fix: milvus role cannot stop at initializing state (#45244)

issue: #45243

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-11-04 10:47:32 +08:00 committed by GitHub
parent e6be590b97
commit d320ccab99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -21,12 +21,14 @@ import (
"fmt" "fmt"
"os" "os"
"os/signal" "os/signal"
"reflect"
"runtime/debug" "runtime/debug"
"strings" "strings"
"sync" "sync"
"syscall" "syscall"
"time" "time"
"github.com/cockroachdb/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/samber/lo" "github.com/samber/lo"
@ -50,10 +52,10 @@ import (
rocksmqimpl "github.com/milvus-io/milvus/pkg/v2/mq/mqimpl/rocksmq/server" rocksmqimpl "github.com/milvus-io/milvus/pkg/v2/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/tracer" "github.com/milvus-io/milvus/pkg/v2/tracer"
"github.com/milvus-io/milvus/pkg/v2/util/conc"
"github.com/milvus-io/milvus/pkg/v2/util/etcd" "github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/expr" "github.com/milvus-io/milvus/pkg/v2/util/expr"
"github.com/milvus-io/milvus/pkg/v2/util/gc" "github.com/milvus-io/milvus/pkg/v2/util/gc"
"github.com/milvus-io/milvus/pkg/v2/util/generic"
"github.com/milvus-io/milvus/pkg/v2/util/logutil" "github.com/milvus-io/milvus/pkg/v2/util/logutil"
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo" "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
@ -109,35 +111,29 @@ func runComponent[T component](ctx context.Context,
runWg *sync.WaitGroup, runWg *sync.WaitGroup,
creator func(context.Context, dependency.Factory) (T, error), creator func(context.Context, dependency.Factory) (T, error),
metricRegister func(*prometheus.Registry), metricRegister func(*prometheus.Registry),
) component { ) *conc.Future[component] {
var role T
sign := make(chan struct{}) sign := make(chan struct{})
go func() { future := conc.Go(func() (component, error) {
factory := dependency.NewFactory(localMsg) factory := dependency.NewFactory(localMsg)
var err error var err error
role, err = creator(ctx, factory) role, err := creator(ctx, factory)
if err != nil { if err != nil {
panic(err) return nil, errors.Wrap(err, "create component failed")
} }
if err := role.Prepare(); err != nil { if err := role.Prepare(); err != nil {
panic(err) return nil, errors.Wrap(err, "prepare component failed")
} }
close(sign) close(sign)
if err := role.Run(); err != nil {
panic(err)
}
runWg.Done()
}()
<-sign
healthz.Register(role) healthz.Register(role)
metricRegister(Registry.GoRegistry) metricRegister(Registry.GoRegistry)
if generic.IsZero(role) { if err := role.Run(); err != nil {
return nil return nil, errors.Wrap(err, "run component failed")
} }
return role return role, nil
})
<-sign
return future
} }
// MilvusRoles decides which components are brought up with Milvus. // MilvusRoles decides which components are brought up with Milvus.
@ -177,18 +173,15 @@ func (mr *MilvusRoles) printLDPreLoad() {
} }
} }
func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
wg.Add(1)
return runComponent(ctx, localMsg, wg, components.NewProxy, metrics.RegisterProxy) return runComponent(ctx, localMsg, wg, components.NewProxy, metrics.RegisterProxy)
} }
func (mr *MilvusRoles) runMixCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { func (mr *MilvusRoles) runMixCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
wg.Add(1)
return runComponent(ctx, localMsg, wg, components.NewMixCoord, metrics.RegisterMixCoord) return runComponent(ctx, localMsg, wg, components.NewMixCoord, metrics.RegisterMixCoord)
} }
func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
wg.Add(1)
// clear local storage // clear local storage
queryDataLocalPath := pathutil.GetPath(pathutil.RootCachePath, 0) queryDataLocalPath := pathutil.GetPath(pathutil.RootCachePath, 0)
if !paramtable.Get().CommonCfg.EnablePosixMode.GetAsBool() { if !paramtable.Get().CommonCfg.EnablePosixMode.GetAsBool() {
@ -199,21 +192,73 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync
return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode) return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode)
} }
func (mr *MilvusRoles) runStreamingNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { func (mr *MilvusRoles) runStreamingNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
wg.Add(1)
return runComponent(ctx, localMsg, wg, components.NewStreamingNode, metrics.RegisterStreamingNode) return runComponent(ctx, localMsg, wg, components.NewStreamingNode, metrics.RegisterStreamingNode)
} }
func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
wg.Add(1)
return runComponent(ctx, localMsg, wg, components.NewDataNode, metrics.RegisterDataNode) return runComponent(ctx, localMsg, wg, components.NewDataNode, metrics.RegisterDataNode)
} }
func (mr *MilvusRoles) runCDC(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { func (mr *MilvusRoles) runCDC(ctx context.Context, localMsg bool, wg *sync.WaitGroup) *conc.Future[component] {
wg.Add(1)
return runComponent(ctx, localMsg, wg, components.NewCDC, metrics.RegisterCDC) return runComponent(ctx, localMsg, wg, components.NewCDC, metrics.RegisterCDC)
} }
func (mr *MilvusRoles) waitForAllComponentsReady(cancel context.CancelFunc, componentFutureMap map[string]*conc.Future[component]) (map[string]component, error) {
roles := make([]string, 0, len(componentFutureMap))
futures := make([]*conc.Future[component], 0, len(componentFutureMap))
for role, future := range componentFutureMap {
roles = append(roles, role)
futures = append(futures, future)
}
selectCases := make([]reflect.SelectCase, 1+len(componentFutureMap))
selectCases[0] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(mr.closed),
}
for i, future := range futures {
selectCases[i+1] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(future.Inner()),
}
}
componentMap := make(map[string]component, len(componentFutureMap))
readyCount := 0
var gerr error
for {
index, _, _ := reflect.Select(selectCases)
if index == 0 {
cancel()
log.Warn("components are not ready before closing, wait for the start of components to be canceled...")
} else {
role := roles[index-1]
component, err := futures[index-1].Await()
readyCount++
if err != nil {
if gerr == nil {
gerr = errors.Wrapf(err, "component %s is not ready before closing", role)
cancel()
}
log.Warn("component is not ready before closing", zap.String("role", role), zap.Error(err))
} else {
componentMap[role] = component
log.Info("component is ready", zap.String("role", role))
}
}
selectCases[index] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(nil),
}
if readyCount == len(componentFutureMap) {
break
}
}
if gerr != nil {
return nil, errors.Wrap(gerr, "failed to wait for all components ready")
}
return componentMap, nil
}
func (mr *MilvusRoles) setupLogger() { func (mr *MilvusRoles) setupLogger() {
params := paramtable.Get() params := paramtable.Get()
logConfig := log.Config{ logConfig := log.Config{
@ -406,48 +451,51 @@ func (mr *MilvusRoles) Run() {
var wg sync.WaitGroup var wg sync.WaitGroup
local := mr.Local local := mr.Local
componentMap := make(map[string]component) componentFutureMap := make(map[string]*conc.Future[component])
var mixCoord component
var proxy, dataNode, queryNode, streamingNode, cdc component
if (mr.EnableRootCoord && mr.EnableDataCoord && mr.EnableQueryCoord) || mr.EnableMixCoord { if (mr.EnableRootCoord && mr.EnableDataCoord && mr.EnableQueryCoord) || mr.EnableMixCoord {
paramtable.SetLocalComponentEnabled(typeutil.MixCoordRole) paramtable.SetLocalComponentEnabled(typeutil.MixCoordRole)
mixCoord = mr.runMixCoord(ctx, local, &wg) mixCoord := mr.runMixCoord(ctx, local, &wg)
componentMap[typeutil.MixCoordRole] = mixCoord componentFutureMap[typeutil.MixCoordRole] = mixCoord
} }
if mr.EnableQueryNode { if mr.EnableQueryNode {
paramtable.SetLocalComponentEnabled(typeutil.QueryNodeRole) paramtable.SetLocalComponentEnabled(typeutil.QueryNodeRole)
queryNode = mr.runQueryNode(ctx, local, &wg) queryNode := mr.runQueryNode(ctx, local, &wg)
componentMap[typeutil.QueryNodeRole] = queryNode componentFutureMap[typeutil.QueryNodeRole] = queryNode
} }
if mr.EnableDataNode { if mr.EnableDataNode {
paramtable.SetLocalComponentEnabled(typeutil.DataNodeRole) paramtable.SetLocalComponentEnabled(typeutil.DataNodeRole)
dataNode = mr.runDataNode(ctx, local, &wg) dataNode := mr.runDataNode(ctx, local, &wg)
componentMap[typeutil.DataNodeRole] = dataNode componentFutureMap[typeutil.DataNodeRole] = dataNode
} }
if mr.EnableProxy { if mr.EnableProxy {
paramtable.SetLocalComponentEnabled(typeutil.ProxyRole) paramtable.SetLocalComponentEnabled(typeutil.ProxyRole)
proxy = mr.runProxy(ctx, local, &wg) proxy := mr.runProxy(ctx, local, &wg)
componentMap[typeutil.ProxyRole] = proxy componentFutureMap[typeutil.ProxyRole] = proxy
} }
if mr.EnableStreamingNode { if mr.EnableStreamingNode {
// Before initializing the local streaming node, make sure the local registry is ready. // Before initializing the local streaming node, make sure the local registry is ready.
paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole) paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole)
streamingNode = mr.runStreamingNode(ctx, local, &wg) streamingNode := mr.runStreamingNode(ctx, local, &wg)
componentMap[typeutil.StreamingNodeRole] = streamingNode componentFutureMap[typeutil.StreamingNodeRole] = streamingNode
} }
if mr.EnableCDC { if mr.EnableCDC {
paramtable.SetLocalComponentEnabled(typeutil.CDCRole) paramtable.SetLocalComponentEnabled(typeutil.CDCRole)
cdc = mr.runCDC(ctx, local, &wg) cdc := mr.runCDC(ctx, local, &wg)
componentMap[typeutil.CDCRole] = cdc componentFutureMap[typeutil.CDCRole] = cdc
} }
wg.Wait() componentMap, err := mr.waitForAllComponentsReady(cancel, componentFutureMap)
if err != nil {
log.Warn("Failed to wait for all components ready", zap.Error(err))
return
}
log.Info("All components are ready", zap.Strings("roles", lo.Keys(componentMap)))
http.RegisterStopComponent(func(role string) error { http.RegisterStopComponent(func(role string) error {
if len(role) == 0 || componentMap[role] == nil { if len(role) == 0 || componentMap[role] == nil {
@ -505,6 +553,13 @@ func (mr *MilvusRoles) Run() {
<-mr.closed <-mr.closed
mixCoord := componentMap[typeutil.MixCoordRole]
streamingNode := componentMap[typeutil.StreamingNodeRole]
queryNode := componentMap[typeutil.QueryNodeRole]
dataNode := componentMap[typeutil.DataNodeRole]
cdc := componentMap[typeutil.CDCRole]
proxy := componentMap[typeutil.ProxyRole]
// stop coordinators first // stop coordinators first
coordinators := []component{mixCoord} coordinators := []component{mixCoord}
for idx, coord := range coordinators { for idx, coord := range coordinators {