mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 09:08:43 +08:00
fix: milvus fast fail if any component is not ready (#46115)
issue: #45243 pr: #46069 Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
571295e945
commit
613cd5cb85
@ -247,6 +247,9 @@ func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool) *conc.Fu
|
|||||||
return runComponent(ctx, localMsg, components.NewIndexNode, metrics.RegisterIndexNode)
|
return runComponent(ctx, localMsg, components.NewIndexNode, metrics.RegisterIndexNode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// waitForAllComponentsReady waits for all components to be ready.
|
||||||
|
// It will return an error if any component is not ready before closing with a fast fail strategy.
|
||||||
|
// It will return a map of components that are ready.
|
||||||
func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, componentFutureMap map[string]*conc.Future[component]) (map[string]component, error) {
|
func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, componentFutureMap map[string]*conc.Future[component]) (map[string]component, error) {
|
||||||
roles := make([]string, 0, len(componentFutureMap))
|
roles := make([]string, 0, len(componentFutureMap))
|
||||||
futures := make([]*conc.Future[component], 0, len(componentFutureMap))
|
futures := make([]*conc.Future[component], 0, len(componentFutureMap))
|
||||||
@ -267,22 +270,20 @@ func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, comp
|
|||||||
}
|
}
|
||||||
componentMap := make(map[string]component, len(componentFutureMap))
|
componentMap := make(map[string]component, len(componentFutureMap))
|
||||||
readyCount := 0
|
readyCount := 0
|
||||||
var gerr error
|
|
||||||
for {
|
for {
|
||||||
index, _, _ := reflect.Select(selectCases)
|
index, _, _ := reflect.Select(selectCases)
|
||||||
if index == 0 {
|
if index == 0 {
|
||||||
cancel()
|
cancel()
|
||||||
log.Warn("components are not ready before closing, wait for the start of components to be canceled...")
|
log.Warn("components are not ready before closing, wait for the start of components to be canceled...")
|
||||||
|
return nil, context.Canceled
|
||||||
} else {
|
} else {
|
||||||
role := roles[index-1]
|
role := roles[index-1]
|
||||||
component, err := futures[index-1].Await()
|
component, err := futures[index-1].Await()
|
||||||
readyCount++
|
readyCount++
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if gerr == nil {
|
cancel()
|
||||||
gerr = errors.Wrapf(err, "component %s is not ready before closing", role)
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
log.Warn("component is not ready before closing", zap.String("role", role), zap.Error(err))
|
log.Warn("component is not ready before closing", zap.String("role", role), zap.Error(err))
|
||||||
|
return nil, err
|
||||||
} else {
|
} else {
|
||||||
componentMap[role] = component
|
componentMap[role] = component
|
||||||
log.Info("component is ready", zap.String("role", role))
|
log.Info("component is ready", zap.String("role", role))
|
||||||
@ -296,9 +297,6 @@ func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, comp
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if gerr != nil {
|
|
||||||
return nil, errors.Wrap(gerr, "failed to wait for all components ready")
|
|
||||||
}
|
|
||||||
return componentMap, nil
|
return componentMap, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user