diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 798aa20ad2..577b8fec0e 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -108,7 +108,6 @@ func cleanLocalDir(path string) { func runComponent[T component](ctx context.Context, localMsg bool, - runWg *sync.WaitGroup, creator func(context.Context, dependency.Factory) (T, error), metricRegister func(*prometheus.Registry), ) *conc.Future[component] { @@ -184,15 +183,15 @@ func (mr *MilvusRoles) printLDPreLoad() { } } -func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { - return runComponent(ctx, localMsg, wg, components.NewProxy, metrics.RegisterProxy) +func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool) *conc.Future[component] { + return runComponent(ctx, localMsg, components.NewProxy, metrics.RegisterProxy) } -func (mr *MilvusRoles) runMixCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { - return runComponent(ctx, localMsg, wg, components.NewMixCoord, metrics.RegisterMixCoord) +func (mr *MilvusRoles) runMixCoord(ctx context.Context, localMsg bool) *conc.Future[component] { + return runComponent(ctx, localMsg, components.NewMixCoord, metrics.RegisterMixCoord) } -func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { +func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool) *conc.Future[component] { // clear local storage queryDataLocalPath := pathutil.GetPath(pathutil.RootCachePath, 0) if !paramtable.Get().CommonCfg.EnablePosixMode.GetAsBool() { @@ -200,21 +199,24 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync // under posix mode, this clean task will be done by mixcoord cleanLocalDir(queryDataLocalPath) } - return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode) + return runComponent(ctx, localMsg, components.NewQueryNode, 
metrics.RegisterQueryNode) } -func (mr *MilvusRoles) runStreamingNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { - return runComponent(ctx, localMsg, wg, components.NewStreamingNode, metrics.RegisterStreamingNode) +func (mr *MilvusRoles) runStreamingNode(ctx context.Context, localMsg bool) *conc.Future[component] { + return runComponent(ctx, localMsg, components.NewStreamingNode, metrics.RegisterStreamingNode) } -func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { - return runComponent(ctx, localMsg, wg, components.NewDataNode, metrics.RegisterDataNode) +func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool) *conc.Future[component] { + return runComponent(ctx, localMsg, components.NewDataNode, metrics.RegisterDataNode) } -func (mr *MilvusRoles) runCDC(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { - return runComponent(ctx, localMsg, wg, components.NewCDC, metrics.RegisterCDC) +func (mr *MilvusRoles) runCDC(ctx context.Context, localMsg bool) *conc.Future[component] { + return runComponent(ctx, localMsg, components.NewCDC, metrics.RegisterCDC) } +// waitForAllComponentsReady waits for all components to be ready. +// It fails fast: it cancels the startup and returns an error as soon as any component fails to become ready, or closing begins first. +// On success, it returns a map from role name to the ready component; on failure, the returned map is nil. 
func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, componentFutureMap map[string]*conc.Future[component]) (map[string]component, error) { roles := make([]string, 0, len(componentFutureMap)) futures := make([]*conc.Future[component], 0, len(componentFutureMap)) @@ -235,22 +237,20 @@ func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, comp } componentMap := make(map[string]component, len(componentFutureMap)) readyCount := 0 - var gerr error for { index, _, _ := reflect.Select(selectCases) if index == 0 { cancel() log.Warn("components are not ready before closing, wait for the start of components to be canceled...") + return nil, context.Canceled } else { role := roles[index-1] component, err := futures[index-1].Await() readyCount++ if err != nil { - if gerr == nil { - gerr = errors.Wrapf(err, "component %s is not ready before closing", role) - cancel() - } + cancel() log.Warn("component is not ready before closing", zap.String("role", role), zap.Error(err)) + return nil, err } else { componentMap[role] = component log.Info("component is ready", zap.String("role", role)) @@ -264,9 +264,6 @@ func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, comp break } } - if gerr != nil { - return nil, errors.Wrap(gerr, "failed to wait for all components ready") - } return componentMap, nil } @@ -468,45 +465,43 @@ func (mr *MilvusRoles) Run() { defer streaming.Release() } - var wg sync.WaitGroup local := mr.Local - componentFutureMap := make(map[string]*conc.Future[component]) if (mr.EnableRootCoord && mr.EnableDataCoord && mr.EnableQueryCoord) || mr.EnableMixCoord { paramtable.SetLocalComponentEnabled(typeutil.MixCoordRole) - mixCoord := mr.runMixCoord(ctx, local, &wg) + mixCoord := mr.runMixCoord(ctx, local) componentFutureMap[typeutil.MixCoordRole] = mixCoord } if mr.EnableQueryNode { paramtable.SetLocalComponentEnabled(typeutil.QueryNodeRole) - queryNode := mr.runQueryNode(ctx, local, &wg) + queryNode := 
mr.runQueryNode(ctx, local) componentFutureMap[typeutil.QueryNodeRole] = queryNode } if mr.EnableDataNode { paramtable.SetLocalComponentEnabled(typeutil.DataNodeRole) - dataNode := mr.runDataNode(ctx, local, &wg) + dataNode := mr.runDataNode(ctx, local) componentFutureMap[typeutil.DataNodeRole] = dataNode } if mr.EnableProxy { paramtable.SetLocalComponentEnabled(typeutil.ProxyRole) - proxy := mr.runProxy(ctx, local, &wg) + proxy := mr.runProxy(ctx, local) componentFutureMap[typeutil.ProxyRole] = proxy } if mr.EnableStreamingNode { // Before initializing the local streaming node, make sure the local registry is ready. paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole) - streamingNode := mr.runStreamingNode(ctx, local, &wg) + streamingNode := mr.runStreamingNode(ctx, local) componentFutureMap[typeutil.StreamingNodeRole] = streamingNode } if mr.EnableCDC { paramtable.SetLocalComponentEnabled(typeutil.CDCRole) - cdc := mr.runCDC(ctx, local, &wg) + cdc := mr.runCDC(ctx, local) componentFutureMap[typeutil.CDCRole] = cdc }