diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 0f1a54cede..b288e9e36a 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -21,12 +21,14 @@ import ( "fmt" "os" "os/signal" + "reflect" "runtime/debug" "strings" "sync" "syscall" "time" + "github.com/cockroachdb/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/samber/lo" @@ -50,10 +52,10 @@ import ( rocksmqimpl "github.com/milvus-io/milvus/pkg/v2/mq/mqimpl/rocksmq/server" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/tracer" + "github.com/milvus-io/milvus/pkg/v2/util/conc" "github.com/milvus-io/milvus/pkg/v2/util/etcd" "github.com/milvus-io/milvus/pkg/v2/util/expr" "github.com/milvus-io/milvus/pkg/v2/util/gc" - "github.com/milvus-io/milvus/pkg/v2/util/generic" "github.com/milvus-io/milvus/pkg/v2/util/logutil" "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -109,35 +111,29 @@ func runComponent[T component](ctx context.Context, runWg *sync.WaitGroup, creator func(context.Context, dependency.Factory) (T, error), metricRegister func(*prometheus.Registry), -) component { - var role T - +) *conc.Future[component] { sign := make(chan struct{}) - go func() { + future := conc.Go(func() (component, error) { factory := dependency.NewFactory(localMsg) var err error - role, err = creator(ctx, factory) + role, err := creator(ctx, factory) if err != nil { - panic(err) + return nil, errors.Wrap(err, "create component failed") } if err := role.Prepare(); err != nil { - panic(err) + return nil, errors.Wrap(err, "prepare component failed") } close(sign) + healthz.Register(role) + metricRegister(Registry.GoRegistry) if err := role.Run(); err != nil { - panic(err) + return nil, errors.Wrap(err, "run component failed") } - runWg.Done() - }() + return role, nil + }) <-sign - - healthz.Register(role) - metricRegister(Registry.GoRegistry) - if generic.IsZero(role) { - return nil - } - return role + return future } // MilvusRoles decides which components are brought up with Milvus. @@ -177,18 +173,15 @@ func (mr *MilvusRoles) printLDPreLoad() { } } -func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { - wg.Add(1) +func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { return runComponent(ctx, localMsg, wg, components.NewProxy, metrics.RegisterProxy) } -func (mr *MilvusRoles) runMixCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { - wg.Add(1) +func (mr *MilvusRoles) runMixCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { return runComponent(ctx, localMsg, wg, components.NewMixCoord, metrics.RegisterMixCoord) } -func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { - wg.Add(1) +func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { // clear local storage queryDataLocalPath := pathutil.GetPath(pathutil.RootCachePath, 0) if !paramtable.Get().CommonCfg.EnablePosixMode.GetAsBool() { @@ -199,21 +192,73 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode) } -func (mr *MilvusRoles) runStreamingNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { - wg.Add(1) +func (mr *MilvusRoles) runStreamingNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { return runComponent(ctx, localMsg, wg, components.NewStreamingNode, metrics.RegisterStreamingNode) } -func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { - wg.Add(1) +func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { return runComponent(ctx, localMsg, wg, components.NewDataNode, metrics.RegisterDataNode) } -func (mr *MilvusRoles) runCDC(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { - wg.Add(1) +func (mr *MilvusRoles) runCDC(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] { return runComponent(ctx, localMsg, wg, components.NewCDC, metrics.RegisterCDC) } +func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, componentFutureMap map[string]*conc.Future[component]) (map[string]component, error) { + roles := make([]string, 0, len(componentFutureMap)) + futures := make([]*conc.Future[component], 0, len(componentFutureMap)) + for role, future := range componentFutureMap { + roles = append(roles, role) + futures = append(futures, future) + } + selectCases := make([]reflect.SelectCase, 1+len(componentFutureMap)) + selectCases[0] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(mr.closed), + } + for i, future := range futures { + selectCases[i+1] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(future.Inner()), + } + } + componentMap := make(map[string]component, len(componentFutureMap)) + readyCount := 0 + var gerr error + for { + index, _, _ := reflect.Select(selectCases) + if index == 0 { + cancel() + log.Warn("components are not ready before closing, wait for the start of components to be canceled...") + } else { + role := roles[index-1] + component, err := futures[index-1].Await() + readyCount++ + if err != nil { + if gerr == nil { + gerr = errors.Wrapf(err, "component %s is not ready before closing", role) + cancel() + } + log.Warn("component is not ready before closing", zap.String("role", role), zap.Error(err)) + } else { + componentMap[role] = component + log.Info("component is ready", zap.String("role", role)) + } + } + selectCases[index] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(nil), + } + if readyCount == len(componentFutureMap) { + break + } + } + if gerr != nil { + return nil, errors.Wrap(gerr, "failed to wait for all components ready") + } + return componentMap, nil +} + func (mr *MilvusRoles) setupLogger() { params := paramtable.Get() logConfig := log.Config{ @@ -406,48 +451,51 @@ func (mr *MilvusRoles) Run() { var wg sync.WaitGroup local := mr.Local - componentMap := make(map[string]component) - var mixCoord component - var proxy, dataNode, queryNode, streamingNode, cdc component + componentFutureMap := make(map[string]*conc.Future[component]) if (mr.EnableRootCoord && mr.EnableDataCoord && mr.EnableQueryCoord) || mr.EnableMixCoord { paramtable.SetLocalComponentEnabled(typeutil.MixCoordRole) - mixCoord = mr.runMixCoord(ctx, local, &wg) - componentMap[typeutil.MixCoordRole] = mixCoord + mixCoord := mr.runMixCoord(ctx, local, &wg) + componentFutureMap[typeutil.MixCoordRole] = mixCoord } if mr.EnableQueryNode { paramtable.SetLocalComponentEnabled(typeutil.QueryNodeRole) - queryNode = mr.runQueryNode(ctx, local, &wg) - componentMap[typeutil.QueryNodeRole] = queryNode + queryNode := mr.runQueryNode(ctx, local, &wg) + componentFutureMap[typeutil.QueryNodeRole] = queryNode } if mr.EnableDataNode { paramtable.SetLocalComponentEnabled(typeutil.DataNodeRole) - dataNode = mr.runDataNode(ctx, local, &wg) - componentMap[typeutil.DataNodeRole] = dataNode + dataNode := mr.runDataNode(ctx, local, &wg) + componentFutureMap[typeutil.DataNodeRole] = dataNode } if mr.EnableProxy { paramtable.SetLocalComponentEnabled(typeutil.ProxyRole) - proxy = mr.runProxy(ctx, local, &wg) - componentMap[typeutil.ProxyRole] = proxy + proxy := mr.runProxy(ctx, local, &wg) + componentFutureMap[typeutil.ProxyRole] = proxy } if mr.EnableStreamingNode { // Before initializing the local streaming node, make sure the local registry is ready. paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole) - streamingNode = mr.runStreamingNode(ctx, local, &wg) - componentMap[typeutil.StreamingNodeRole] = streamingNode + streamingNode := mr.runStreamingNode(ctx, local, &wg) + componentFutureMap[typeutil.StreamingNodeRole] = streamingNode } if mr.EnableCDC { paramtable.SetLocalComponentEnabled(typeutil.CDCRole) - cdc = mr.runCDC(ctx, local, &wg) - componentMap[typeutil.CDCRole] = cdc + cdc := mr.runCDC(ctx, local, &wg) + componentFutureMap[typeutil.CDCRole] = cdc } - wg.Wait() + componentMap, err := mr.waitForAllComponentsReady(cancel, componentFutureMap) + if err != nil { + log.Warn("Failed to wait for all components ready", zap.Error(err)) + return + } + log.Info("All components are ready", zap.Strings("roles", lo.Keys(componentMap))) http.RegisterStopComponent(func(role string) error { if len(role) == 0 || componentMap[role] == nil { @@ -505,6 +553,13 @@ func (mr *MilvusRoles) Run() { <-mr.closed + mixCoord := componentMap[typeutil.MixCoordRole] + streamingNode := componentMap[typeutil.StreamingNodeRole] + queryNode := componentMap[typeutil.QueryNodeRole] + dataNode := componentMap[typeutil.DataNodeRole] + cdc := componentMap[typeutil.CDCRole] + proxy := componentMap[typeutil.ProxyRole] + // stop coordinators first coordinators := []component{mixCoord} for idx, coord := range coordinators {