fix: milvus fast fail if any component is not ready (#46070)

issue: #45243
pr: #46069

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-12-04 15:17:15 +08:00 committed by GitHub
parent d5eb240f3b
commit 397c3edd06
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -108,7 +108,6 @@ func cleanLocalDir(path string) {
func runComponent[T component](ctx context.Context, func runComponent[T component](ctx context.Context,
localMsg bool, localMsg bool,
runWg *sync.WaitGroup,
creator func(context.Context, dependency.Factory) (T, error), creator func(context.Context, dependency.Factory) (T, error),
metricRegister func(*prometheus.Registry), metricRegister func(*prometheus.Registry),
) *conc.Future[component] { ) *conc.Future[component] {
@ -184,15 +183,15 @@ func (mr *MilvusRoles) printLDPreLoad() {
} }
} }
func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool) *conc.Future[component] {
return runComponent(ctx, localMsg, wg, components.NewProxy, metrics.RegisterProxy) return runComponent(ctx, localMsg, components.NewProxy, metrics.RegisterProxy)
} }
func (mr *MilvusRoles) runMixCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { func (mr *MilvusRoles) runMixCoord(ctx context.Context, localMsg bool) *conc.Future[component] {
return runComponent(ctx, localMsg, wg, components.NewMixCoord, metrics.RegisterMixCoord) return runComponent(ctx, localMsg, components.NewMixCoord, metrics.RegisterMixCoord)
} }
func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool) *conc.Future[component] {
// clear local storage // clear local storage
queryDataLocalPath := pathutil.GetPath(pathutil.RootCachePath, 0) queryDataLocalPath := pathutil.GetPath(pathutil.RootCachePath, 0)
if !paramtable.Get().CommonCfg.EnablePosixMode.GetAsBool() { if !paramtable.Get().CommonCfg.EnablePosixMode.GetAsBool() {
@ -200,21 +199,24 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync
// under posix mode, this clean task will be done by mixcoord // under posix mode, this clean task will be done by mixcoord
cleanLocalDir(queryDataLocalPath) cleanLocalDir(queryDataLocalPath)
} }
return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode) return runComponent(ctx, localMsg, components.NewQueryNode, metrics.RegisterQueryNode)
} }
func (mr *MilvusRoles) runStreamingNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { func (mr *MilvusRoles) runStreamingNode(ctx context.Context, localMsg bool) *conc.Future[component] {
return runComponent(ctx, localMsg, wg, components.NewStreamingNode, metrics.RegisterStreamingNode) return runComponent(ctx, localMsg, components.NewStreamingNode, metrics.RegisterStreamingNode)
} }
func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool) *conc.Future[component] {
return runComponent(ctx, localMsg, wg, components.NewDataNode, metrics.RegisterDataNode) return runComponent(ctx, localMsg, components.NewDataNode, metrics.RegisterDataNode)
} }
func (mr *MilvusRoles) runCDC(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { func (mr *MilvusRoles) runCDC(ctx context.Context, localMsg bool) *conc.Future[component] {
return runComponent(ctx, localMsg, wg, components.NewCDC, metrics.RegisterCDC) return runComponent(ctx, localMsg, components.NewCDC, metrics.RegisterCDC)
} }
// waitForAllComponentsReady waits for all components to be ready.
// It will return an error if any component is not ready before closing with a fast fail strategy.
// It will return a map of components that are ready.
func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, componentFutureMap map[string]*conc.Future[component]) (map[string]component, error) { func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, componentFutureMap map[string]*conc.Future[component]) (map[string]component, error) {
roles := make([]string, 0, len(componentFutureMap)) roles := make([]string, 0, len(componentFutureMap))
futures := make([]*conc.Future[component], 0, len(componentFutureMap)) futures := make([]*conc.Future[component], 0, len(componentFutureMap))
@ -235,22 +237,20 @@ func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, comp
} }
componentMap := make(map[string]component, len(componentFutureMap)) componentMap := make(map[string]component, len(componentFutureMap))
readyCount := 0 readyCount := 0
var gerr error
for { for {
index, _, _ := reflect.Select(selectCases) index, _, _ := reflect.Select(selectCases)
if index == 0 { if index == 0 {
cancel() cancel()
log.Warn("components are not ready before closing, wait for the start of components to be canceled...") log.Warn("components are not ready before closing, wait for the start of components to be canceled...")
return nil, context.Canceled
} else { } else {
role := roles[index-1] role := roles[index-1]
component, err := futures[index-1].Await() component, err := futures[index-1].Await()
readyCount++ readyCount++
if err != nil { if err != nil {
if gerr == nil {
gerr = errors.Wrapf(err, "component %s is not ready before closing", role)
cancel() cancel()
}
log.Warn("component is not ready before closing", zap.String("role", role), zap.Error(err)) log.Warn("component is not ready before closing", zap.String("role", role), zap.Error(err))
return nil, err
} else { } else {
componentMap[role] = component componentMap[role] = component
log.Info("component is ready", zap.String("role", role)) log.Info("component is ready", zap.String("role", role))
@ -264,9 +264,6 @@ func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, comp
break break
} }
} }
if gerr != nil {
return nil, errors.Wrap(gerr, "failed to wait for all components ready")
}
return componentMap, nil return componentMap, nil
} }
@ -468,45 +465,43 @@ func (mr *MilvusRoles) Run() {
defer streaming.Release() defer streaming.Release()
} }
var wg sync.WaitGroup
local := mr.Local local := mr.Local
componentFutureMap := make(map[string]*conc.Future[component]) componentFutureMap := make(map[string]*conc.Future[component])
if (mr.EnableRootCoord && mr.EnableDataCoord && mr.EnableQueryCoord) || mr.EnableMixCoord { if (mr.EnableRootCoord && mr.EnableDataCoord && mr.EnableQueryCoord) || mr.EnableMixCoord {
paramtable.SetLocalComponentEnabled(typeutil.MixCoordRole) paramtable.SetLocalComponentEnabled(typeutil.MixCoordRole)
mixCoord := mr.runMixCoord(ctx, local, &wg) mixCoord := mr.runMixCoord(ctx, local)
componentFutureMap[typeutil.MixCoordRole] = mixCoord componentFutureMap[typeutil.MixCoordRole] = mixCoord
} }
if mr.EnableQueryNode { if mr.EnableQueryNode {
paramtable.SetLocalComponentEnabled(typeutil.QueryNodeRole) paramtable.SetLocalComponentEnabled(typeutil.QueryNodeRole)
queryNode := mr.runQueryNode(ctx, local, &wg) queryNode := mr.runQueryNode(ctx, local)
componentFutureMap[typeutil.QueryNodeRole] = queryNode componentFutureMap[typeutil.QueryNodeRole] = queryNode
} }
if mr.EnableDataNode { if mr.EnableDataNode {
paramtable.SetLocalComponentEnabled(typeutil.DataNodeRole) paramtable.SetLocalComponentEnabled(typeutil.DataNodeRole)
dataNode := mr.runDataNode(ctx, local, &wg) dataNode := mr.runDataNode(ctx, local)
componentFutureMap[typeutil.DataNodeRole] = dataNode componentFutureMap[typeutil.DataNodeRole] = dataNode
} }
if mr.EnableProxy { if mr.EnableProxy {
paramtable.SetLocalComponentEnabled(typeutil.ProxyRole) paramtable.SetLocalComponentEnabled(typeutil.ProxyRole)
proxy := mr.runProxy(ctx, local, &wg) proxy := mr.runProxy(ctx, local)
componentFutureMap[typeutil.ProxyRole] = proxy componentFutureMap[typeutil.ProxyRole] = proxy
} }
if mr.EnableStreamingNode { if mr.EnableStreamingNode {
// Before initializing the local streaming node, make sure the local registry is ready. // Before initializing the local streaming node, make sure the local registry is ready.
paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole) paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole)
streamingNode := mr.runStreamingNode(ctx, local, &wg) streamingNode := mr.runStreamingNode(ctx, local)
componentFutureMap[typeutil.StreamingNodeRole] = streamingNode componentFutureMap[typeutil.StreamingNodeRole] = streamingNode
} }
if mr.EnableCDC { if mr.EnableCDC {
paramtable.SetLocalComponentEnabled(typeutil.CDCRole) paramtable.SetLocalComponentEnabled(typeutil.CDCRole)
cdc := mr.runCDC(ctx, local, &wg) cdc := mr.runCDC(ctx, local)
componentFutureMap[typeutil.CDCRole] = cdc componentFutureMap[typeutil.CDCRole] = cdc
} }