diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 4e3294feed..659ce08e59 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -371,7 +371,7 @@ queryCoord: channelExclusiveNodeFactor: 4 # the least node number for enable channel's exclusive mode collectionObserverInterval: 200 # the interval of collection observer checkExecutedFlagInterval: 100 # the interval of check executed flag to force to pull dist - updateCollectionLoadStatusInterval: 5 # 5m, max interval for updating collection loaded status + updateCollectionLoadStatusInterval: 300 # 300s, max interval of updating collection loaded status for check health cleanExcludeSegmentInterval: 60 # the time duration of clean pipeline exclude segment which used for filter invalid data, in seconds ip: # TCP/IP address of queryCoord. If not specified, use the first unicastable address port: 19531 # TCP port of queryCoord diff --git a/internal/datacoord/mock_segment_manager.go b/internal/datacoord/mock_segment_manager.go index 37164ae979..8a61177baa 100644 --- a/internal/datacoord/mock_segment_manager.go +++ b/internal/datacoord/mock_segment_manager.go @@ -53,11 +53,11 @@ type MockManager_AllocSegment_Call struct { } // AllocSegment is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - partitionID int64 -// - channelName string -// - requestRows int64 +// - ctx context.Context +// - collectionID int64 +// - partitionID int64 +// - channelName string +// - requestRows int64 func (_e *MockManager_Expecter) AllocSegment(ctx interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}, requestRows interface{}) *MockManager_AllocSegment_Call { return &MockManager_AllocSegment_Call{Call: _e.mock.On("AllocSegment", ctx, collectionID, partitionID, channelName, requestRows)} } @@ -90,8 +90,8 @@ type MockManager_DropSegment_Call struct { } // DropSegment is a helper method to define mock.On call -// - ctx context.Context -// - segmentID int64 +// - ctx context.Context +// - segmentID int64 func (_e *MockManager_Expecter) DropSegment(ctx interface{}, segmentID interface{}) *MockManager_DropSegment_Call { return &MockManager_DropSegment_Call{Call: _e.mock.On("DropSegment", ctx, segmentID)} } @@ -124,8 +124,8 @@ type MockManager_DropSegmentsOfChannel_Call struct { } // DropSegmentsOfChannel is a helper method to define mock.On call -// - ctx context.Context -// - channel string +// - ctx context.Context +// - channel string func (_e *MockManager_Expecter) DropSegmentsOfChannel(ctx interface{}, channel interface{}) *MockManager_DropSegmentsOfChannel_Call { return &MockManager_DropSegmentsOfChannel_Call{Call: _e.mock.On("DropSegmentsOfChannel", ctx, channel)} } @@ -167,8 +167,8 @@ type MockManager_ExpireAllocations_Call struct { } // ExpireAllocations is a helper method to define mock.On call -// - channel string -// - ts uint64 +// - channel string +// - ts uint64 func (_e *MockManager_Expecter) ExpireAllocations(channel interface{}, ts interface{}) *MockManager_ExpireAllocations_Call { return &MockManager_ExpireAllocations_Call{Call: _e.mock.On("ExpireAllocations", channel, ts)} } @@ -222,9 +222,9 @@ type MockManager_GetFlushableSegments_Call struct { } // GetFlushableSegments is a helper method to define mock.On call -// - ctx context.Context -// - channel string -// - ts uint64 +// - ctx context.Context +// - channel string +// - ts uint64 func (_e *MockManager_Expecter) GetFlushableSegments(ctx interface{}, channel interface{}, ts interface{}) 
*MockManager_GetFlushableSegments_Call { return &MockManager_GetFlushableSegments_Call{Call: _e.mock.On("GetFlushableSegments", ctx, channel, ts)} } @@ -278,9 +278,9 @@ type MockManager_SealAllSegments_Call struct { } // SealAllSegments is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - segIDs []int64 +// - ctx context.Context +// - collectionID int64 +// - segIDs []int64 func (_e *MockManager_Expecter) SealAllSegments(ctx interface{}, collectionID interface{}, segIDs interface{}) *MockManager_SealAllSegments_Call { return &MockManager_SealAllSegments_Call{Call: _e.mock.On("SealAllSegments", ctx, collectionID, segIDs)} } diff --git a/internal/datacoord/mock_session_manager.go b/internal/datacoord/mock_session_manager.go index a63f90ca94..6221db4f1d 100644 --- a/internal/datacoord/mock_session_manager.go +++ b/internal/datacoord/mock_session_manager.go @@ -6,6 +6,8 @@ import ( context "context" datapb "github.com/milvus-io/milvus/internal/proto/datapb" + healthcheck "github.com/milvus-io/milvus/internal/util/healthcheck" + mock "github.com/stretchr/testify/mock" typeutil "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -113,44 +115,44 @@ func (_c *MockSessionManager_CheckChannelOperationProgress_Call) RunAndReturn(ru return _c } -// CheckHealth provides a mock function with given fields: ctx -func (_m *MockSessionManager) CheckHealth(ctx context.Context) error { +// CheckDNHealth provides a mock function with given fields: ctx +func (_m *MockSessionManager) CheckDNHealth(ctx context.Context) *healthcheck.Result { ret := _m.Called(ctx) - var r0 error - if rf, ok := ret.Get(0).(func(context.Context) error); ok { + var r0 *healthcheck.Result + if rf, ok := ret.Get(0).(func(context.Context) *healthcheck.Result); ok { r0 = rf(ctx) } else { - r0 = ret.Error(0) + r0 = ret.Get(0).(*healthcheck.Result) } return r0 } -// MockSessionManager_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' -type MockSessionManager_CheckHealth_Call struct { +// MockSessionManager_CheckDNHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckDNHealth' +type MockSessionManager_CheckDNHealth_Call struct { *mock.Call } -// CheckHealth is a helper method to define mock.On call +// CheckDNHealth is a helper method to define mock.On call // - ctx context.Context -func (_e *MockSessionManager_Expecter) CheckHealth(ctx interface{}) *MockSessionManager_CheckHealth_Call { - return &MockSessionManager_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx)} +func (_e *MockSessionManager_Expecter) CheckDNHealth(ctx interface{}) *MockSessionManager_CheckDNHealth_Call { + return &MockSessionManager_CheckDNHealth_Call{Call: _e.mock.On("CheckDNHealth", ctx)} } -func (_c *MockSessionManager_CheckHealth_Call) Run(run func(ctx context.Context)) *MockSessionManager_CheckHealth_Call { +func (_c *MockSessionManager_CheckDNHealth_Call) Run(run func(ctx context.Context)) *MockSessionManager_CheckDNHealth_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context)) }) return _c } -func (_c *MockSessionManager_CheckHealth_Call) Return(_a0 error) *MockSessionManager_CheckHealth_Call { +func (_c *MockSessionManager_CheckDNHealth_Call) Return(_a0 healthcheck.Result) *MockSessionManager_CheckDNHealth_Call { _c.Call.Return(_a0) return _c } -func (_c *MockSessionManager_CheckHealth_Call) RunAndReturn(run func(context.Context) error) *MockSessionManager_CheckHealth_Call { 
+func (_c *MockSessionManager_CheckDNHealth_Call) RunAndReturn(run func(context.Context) healthcheck.Result) *MockSessionManager_CheckDNHealth_Call { _c.Call.Return(run) return _c } diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 5012e309fc..e513bfe5c4 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -343,6 +343,22 @@ func (c *mockDataNodeClient) Stop() error { return nil } +func (c *mockDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + if c.state == commonpb.StateCode_Healthy { + return &milvuspb.CheckHealthResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + IsHealthy: true, + Reasons: []string{}, + }, nil + } else { + return &milvuspb.CheckHealthResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_NotReadyServe}, + IsHealthy: false, + Reasons: []string{"fails"}, + }, nil + } +} + type mockRootCoordClient struct { state commonpb.StateCode cnt int64 diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index ecff0a56e3..359400f08c 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -46,6 +46,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/log" @@ -162,6 +163,8 @@ type Server struct { // manage ways that data coord access other coord broker broker.Broker + + healthChecker *healthcheck.Checker } type CollectionNameInfo struct { @@ -407,6 +410,8 @@ func (s *Server) initDataCoord() error { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) + interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second) + s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn) log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address)) return nil } @@ -725,6 +730,7 @@ func (s *Server) startServerLoop() { go s.importChecker.Start() s.garbageCollector.start() s.syncSegmentsScheduler.Start() + s.healthChecker.Start() } // startDataNodeTtLoop start a goroutine to recv data node tt msg from msgstream @@ -1107,6 +1113,9 @@ func (s *Server) Stop() error { if !s.stateCode.CompareAndSwap(commonpb.StateCode_Healthy, commonpb.StateCode_Abnormal) { return nil } + if s.healthChecker != nil { + s.healthChecker.Close() + } logutil.Logger(s.ctx).Info("datacoord server shutdown") s.garbageCollector.close() logutil.Logger(s.ctx).Info("datacoord garbage collector stopped") diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index c830d831c8..501aaa7c9c 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -51,6 +51,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -2527,12 +2528,12 @@ func Test_CheckHealth(t *testing.T) { return sm } - getChannelManager := func(t *testing.T, findWatcherOk bool) ChannelManager { + 
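
The datacoord wiring above creates a healthcheck.Checker with a configurable interval, starts it in startServerLoop, and closes it in Stop, so health is probed in the background instead of on every CheckHealth RPC. The sketch below is a minimal, self-contained approximation of that lifecycle; checker, newChecker, and the string result are hypothetical stand-ins, not the internal/util/healthcheck implementation.

// Illustrative sketch only: a periodic checker with a cached latest result,
// mirroring NewChecker(interval, fn) -> Start() -> GetLatestCheckResult() -> Close().
package main

import (
	"fmt"
	"sync"
	"time"
)

type checker struct {
	interval time.Duration
	checkFn  func() string // the real code returns *healthcheck.Result

	mu     sync.RWMutex
	latest string

	closeOnce sync.Once
	closeCh   chan struct{}
	wg        sync.WaitGroup
}

func newChecker(interval time.Duration, fn func() string) *checker {
	return &checker{interval: interval, checkFn: fn, closeCh: make(chan struct{})}
}

func (c *checker) Start() {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		ticker := time.NewTicker(c.interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				r := c.checkFn() // run the check off the RPC path
				c.mu.Lock()
				c.latest = r
				c.mu.Unlock()
			case <-c.closeCh:
				return
			}
		}
	}()
}

// GetLatestCheckResult returns the cached result; a CheckHealth RPC reads this
// instead of probing every node inline.
func (c *checker) GetLatestCheckResult() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.latest
}

func (c *checker) Close() {
	c.closeOnce.Do(func() { close(c.closeCh) })
	c.wg.Wait()
}

func main() {
	c := newChecker(20*time.Millisecond, func() string { return "ok" })
	c.Start()
	time.Sleep(30 * time.Millisecond)
	fmt.Println(c.GetLatestCheckResult())
	c.Close()
}
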
getChannelManager := func(findWatcherOk bool) ChannelManager { channelManager := NewMockChannelManager(t) if findWatcherOk { - channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil) + channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil).Maybe() } else { - channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error")) + channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error")).Maybe() } return channelManager } @@ -2545,6 +2546,21 @@ func Test_CheckHealth(t *testing.T) { 2: nil, } + newServer := func(isHealthy bool, findWatcherOk bool, meta *meta) *Server { + svr := &Server{ + ctx: context.TODO(), + sessionManager: getSessionManager(isHealthy), + channelManager: getChannelManager(findWatcherOk), + meta: meta, + session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}, + } + svr.stateCode.Store(commonpb.StateCode_Healthy) + svr.healthChecker = healthcheck.NewChecker(20*time.Millisecond, svr.healthCheckFn) + svr.healthChecker.Start() + time.Sleep(30 * time.Millisecond) // wait for next cycle for health checker + return svr + } + t.Run("not healthy", func(t *testing.T) { ctx := context.Background() s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} @@ -2556,9 +2572,8 @@ func Test_CheckHealth(t *testing.T) { }) t.Run("data node health check is fail", func(t *testing.T) { - svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} - svr.stateCode.Store(commonpb.StateCode_Healthy) - svr.sessionManager = getSessionManager(false) + svr := newServer(false, true, &meta{channelCPs: newChannelCps()}) + defer svr.healthChecker.Close() ctx := context.Background() resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) @@ -2567,11 +2582,8 @@ func Test_CheckHealth(t *testing.T) { }) t.Run("check channel watched fail", func(t *testing.T) { - svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} - svr.stateCode.Store(commonpb.StateCode_Healthy) - svr.sessionManager = getSessionManager(true) - svr.channelManager = getChannelManager(t, false) - svr.meta = &meta{collections: collections} + svr := newServer(true, false, &meta{collections: collections, channelCPs: newChannelCps()}) + defer svr.healthChecker.Close() ctx := context.Background() resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) @@ -2580,11 +2592,7 @@ func Test_CheckHealth(t *testing.T) { }) t.Run("check checkpoint fail", func(t *testing.T) { - svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} - svr.stateCode.Store(commonpb.StateCode_Healthy) - svr.sessionManager = getSessionManager(true) - svr.channelManager = getChannelManager(t, true) - svr.meta = &meta{ + svr := newServer(true, true, &meta{ collections: collections, channelCPs: &channelCPs{ checkpoints: map[string]*msgpb.MsgPosition{ @@ -2594,8 +2602,8 @@ func Test_CheckHealth(t *testing.T) { }, }, }, - } - + }) + defer svr.healthChecker.Close() ctx := context.Background() resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) @@ -2604,11 +2612,7 @@ func Test_CheckHealth(t *testing.T) { }) t.Run("ok", func(t *testing.T) { - svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} - svr.stateCode.Store(commonpb.StateCode_Healthy) - svr.sessionManager = getSessionManager(true) - svr.channelManager = 
getChannelManager(t, true) - svr.meta = &meta{ + svr := newServer(true, true, &meta{ collections: collections, channelCPs: &channelCPs{ checkpoints: map[string]*msgpb.MsgPosition{ @@ -2626,7 +2630,8 @@ func Test_CheckHealth(t *testing.T) { }, }, }, - } + }) + defer svr.healthChecker.Close() ctx := context.Background() resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 9b77d6a6a4..fd46a5cb2e 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -35,7 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/internal/util/componentutil" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/internal/util/segmentutil" "github.com/milvus-io/milvus/pkg/common" @@ -1550,20 +1550,24 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque }, nil } - err := s.sessionManager.CheckHealth(ctx) - if err != nil { - return componentutil.CheckHealthRespWithErr(err), nil + latestCheckResult := s.healthChecker.GetLatestCheckResult() + return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil +} + +func (s *Server) healthCheckFn() *healthcheck.Result { + timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second) + ctx, cancel := context.WithTimeout(s.ctx, timeout) + defer cancel() + + checkResults := s.sessionManager.CheckDNHealth(ctx) + for collectionID, failReason := range CheckAllChannelsWatched(s.meta, s.channelManager) { + checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.ChannelsWatched)) } - if err = CheckAllChannelsWatched(s.meta, s.channelManager); err != nil { - return componentutil.CheckHealthRespWithErr(err), nil + for collectionID, failReason := range CheckCheckPointsHealth(s.meta) { + checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.CheckpointLagExceed)) } - - if err = CheckCheckPointsHealth(s.meta); err != nil { - return componentutil.CheckHealthRespWithErr(err), nil - } - - return componentutil.CheckHealthRespWithErr(nil), nil + return checkResults } func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) { diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go index c962bf9797..45a698b8a4 100644 --- a/internal/datacoord/session_manager.go +++ b/internal/datacoord/session_manager.go @@ -19,6 +19,7 @@ package datacoord import ( "context" "fmt" + "sync" "time" "github.com/cockroachdb/errors" @@ -31,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/commonpbutil" @@ -69,7 +71,7 @@ type SessionManager interface { QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error) QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error) DropImport(nodeID 
int64, in *datapb.DropImportRequest) error - CheckHealth(ctx context.Context) error + CheckDNHealth(ctx context.Context) *healthcheck.Result QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error) DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error Close() @@ -508,28 +510,44 @@ func (c *SessionManagerImpl) DropImport(nodeID int64, in *datapb.DropImportReque return VerifyResponse(status, err) } -func (c *SessionManagerImpl) CheckHealth(ctx context.Context) error { - group, ctx := errgroup.WithContext(ctx) - +func (c *SessionManagerImpl) CheckDNHealth(ctx context.Context) *healthcheck.Result { + result := healthcheck.NewResult() + wg := sync.WaitGroup{} + wlock := sync.Mutex{} ids := c.GetSessionIDs() + for _, nodeID := range ids { nodeID := nodeID - group.Go(func() error { - cli, err := c.getClient(ctx, nodeID) + wg.Add(1) + go func() { + defer wg.Done() + + datanodeClient, err := c.getClient(ctx, nodeID) if err != nil { - return fmt.Errorf("failed to get DataNode %d: %v", nodeID, err) + err = fmt.Errorf("failed to get node:%d: %v", nodeID, err) + return } - sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) - if err != nil { - return err + checkHealthResp, err := datanodeClient.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + if err = merr.CheckRPCCall(checkHealthResp, err); err != nil && !errors.Is(err, merr.ErrServiceUnimplemented) { + err = fmt.Errorf("CheckHealth fails for datanode:%d, %w", nodeID, err) + wlock.Lock() + result.AppendUnhealthyClusterMsg( + healthcheck.NewUnhealthyClusterMsg(typeutil.DataNodeRole, nodeID, err.Error(), healthcheck.NodeHealthCheck)) + wlock.Unlock() + return } - err = merr.AnalyzeState("DataNode", nodeID, sta) - return err - }) + + if len(checkHealthResp.Reasons) > 0 { + wlock.Lock() + result.AppendResult(healthcheck.GetHealthCheckResultFromResp(checkHealthResp)) + wlock.Unlock() + } + }() } - return group.Wait() + wg.Wait() + return result } func (c *SessionManagerImpl) QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error) { diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 5ceaa9b014..59e2c4ad8e 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -271,7 +271,8 @@ func getCompactionMergeInfo(task *datapb.CompactionTask) *milvuspb.CompactionMer } } -func CheckCheckPointsHealth(meta *meta) error { +func CheckCheckPointsHealth(meta *meta) map[int64]string { + checkResult := make(map[int64]string) for channel, cp := range meta.GetChannelCheckpoints() { collectionID := funcutil.GetCollectionIDFromVChannel(channel) if collectionID == -1 { @@ -285,31 +286,30 @@ func CheckCheckPointsHealth(meta *meta) error { ts, _ := tsoutil.ParseTS(cp.Timestamp) lag := time.Since(ts) if lag > paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.GetAsDuration(time.Second) { - return merr.WrapErrChannelCPExceededMaxLag(channel, fmt.Sprintf("checkpoint lag: %f(min)", lag.Minutes())) + checkResult[collectionID] = fmt.Sprintf("exceeds max lag:%s on channel:%s checkpoint", lag, channel) } } - return nil + return checkResult } -func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) error { +func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) map[int64]string { collIDs := meta.ListCollections() + checkResult := make(map[int64]string) for _, collID := range collIDs { collInfo := meta.GetCollection(collID) if collInfo == nil { - log.Warn("collection info is nil, skip it", zap.Int64("collectionID", collID)) + log.RatedWarn(60, "collection 
info is nil, skip it", zap.Int64("collectionID", collID)) continue } for _, channelName := range collInfo.VChannelNames { _, err := channelManager.FindWatcher(channelName) if err != nil { - log.Warn("find watcher for channel failed", zap.Int64("collectionID", collID), - zap.String("channelName", channelName), zap.Error(err)) - return err + checkResult[collID] = fmt.Sprintf("channel:%s is not watched", channelName) } } } - return nil + return checkResult } func getBinLogIDs(segment *SegmentInfo, fieldID int64) []int64 { diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 24436df1de..4296340139 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -22,6 +22,7 @@ package datanode import ( "context" "fmt" + "time" "github.com/samber/lo" "go.uber.org/zap" @@ -37,6 +38,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -45,7 +47,10 @@ import ( "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/ratelimitutil" "github.com/milvus-io/milvus/pkg/util/tsoutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // WatchDmChannels is not in use @@ -583,3 +588,20 @@ func (node *DataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCo log.Ctx(ctx).Info("DropCompactionPlans success", zap.Int64("planID", req.GetPlanID())) return merr.Success(), nil } + +func (node *DataNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return &milvuspb.CheckHealthResponse{ + Status: merr.Status(err), + Reasons: []string{err.Error()}, + }, nil + } + + maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) + minFGChannel, minFGTt := rateCol.getMinFlowGraphTt() + if err := ratelimitutil.CheckTimeTickDelay(minFGChannel, minFGTt, maxDelay); err != nil { + msg := healthcheck.NewUnhealthyClusterMsg(typeutil.DataNodeRole, node.GetNodeID(), err.Error(), healthcheck.TimeTickLagExceed) + return healthcheck.GetCheckHealthResponseFromClusterMsg(msg), nil + } + return healthcheck.OK(), nil +} diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 1a1820bf80..d69ab10713 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -1114,6 +1114,40 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { }) } +func (s *DataNodeServicesSuite) TestCheckHealth() { + s.Run("node not healthy", func() { + s.SetupTest() + s.node.UpdateStateCode(commonpb.StateCode_Abnormal) + ctx := context.Background() + resp, err := s.node.CheckHealth(ctx, nil) + s.NoError(err) + s.False(merr.Ok(resp.GetStatus())) + s.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) + }) + + s.Run("exceeded timetick lag on pipeline", func() { + s.SetupTest() + rateCol.updateFlowGraphTt("timetick-lag-ch", 1) + ctx := context.Background() + resp, err := s.node.CheckHealth(ctx, nil) + s.NoError(err) + s.True(merr.Ok(resp.GetStatus())) + s.False(resp.GetIsHealthy()) + s.NotEmpty(resp.Reasons) + 
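
The new DataNode CheckHealth handler above first gates on the node's state code and then reports a reason when the slowest flowgraph time tick lags past the configured maximum delay. The sketch below shows just that shape under simplified, assumed types (checkResp, node, minFlowGraphLag); it does not use the real milvuspb, merr, or rate-collector APIs.

// Illustrative sketch only: state gate first, then time-tick-lag check,
// with lag reported as a reason rather than an RPC error.
package main

import (
	"fmt"
	"time"
)

type checkResp struct {
	IsHealthy bool
	Reasons   []string
}

type node struct {
	healthy         bool
	minFlowGraphLag time.Duration // lag of the slowest flowgraph channel
}

func (n *node) CheckHealth(maxDelay time.Duration) *checkResp {
	// 1. Refuse to report healthy while the node itself is not serving.
	if !n.healthy {
		return &checkResp{IsHealthy: false, Reasons: []string{"node is not in healthy state"}}
	}
	// 2. Excessive time-tick lag becomes a per-node reason that the
	// coordinator can aggregate.
	if n.minFlowGraphLag > maxDelay {
		return &checkResp{
			IsHealthy: false,
			Reasons:   []string{fmt.Sprintf("time tick lag %v exceeds limit %v", n.minFlowGraphLag, maxDelay)},
		}
	}
	return &checkResp{IsHealthy: true, Reasons: []string{}}
}

func main() {
	n := &node{healthy: true, minFlowGraphLag: 5 * time.Second}
	fmt.Printf("%+v\n", n.CheckHealth(3*time.Second))
}
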
}) + + s.Run("ok", func() { + s.SetupTest() + rateCol.removeFlowGraphChannel("timetick-lag-ch") + ctx := context.Background() + resp, err := s.node.CheckHealth(ctx, nil) + s.NoError(err) + s.True(merr.Ok(resp.GetStatus())) + s.True(resp.GetIsHealthy()) + s.Empty(resp.Reasons) + }) +} + func (s *DataNodeServicesSuite) TestDropCompactionPlan() { s.Run("node not healthy", func() { s.SetupTest() diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 67d5081a19..3944d4039a 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -267,3 +267,9 @@ func (c *Client) DropCompactionPlan(ctx context.Context, req *datapb.DropCompact return client.DropCompactionPlan(ctx, req) }) } + +func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*milvuspb.CheckHealthResponse, error) { + return client.CheckHealth(ctx, req) + }) +} diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 5e4ae6f009..fe564e5baf 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -407,3 +407,7 @@ func (s *Server) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest) (* func (s *Server) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) { return s.datanode.DropCompactionPlan(ctx, req) } + +func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + return s.datanode.CheckHealth(ctx, req) +} diff --git a/internal/distributed/datanode/service_test.go b/internal/distributed/datanode/service_test.go index 640ee87e28..364347f8c2 100644 --- a/internal/distributed/datanode/service_test.go +++ b/internal/distributed/datanode/service_test.go @@ -185,6 +185,10 @@ func (m *MockDataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropC return m.status, m.err } +func (m *MockDataNode) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + return &milvuspb.CheckHealthResponse{}, m.err +} + // ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// func Test_NewServer(t *testing.T) { paramtable.Init() diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index abd4b714d7..ac200a5afd 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -345,3 +345,9 @@ func (c *Client) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchReques return client.DeleteBatch(ctx, req) }) } + +func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*milvuspb.CheckHealthResponse, error) { + return client.CheckHealth(ctx, req) + }) +} diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 31a3074b12..794fe1ce4a 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -389,3 +389,7 @@ func (s *Server) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commo func 
(s *Server) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) { return s.querynode.DeleteBatch(ctx, req) } + +func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + return s.querynode.CheckHealth(ctx, req) +} diff --git a/internal/metastore/mocks/mock_rootcoord_catalog.go b/internal/metastore/mocks/mock_rootcoord_catalog.go index 646eb849ae..8c35d288c1 100644 --- a/internal/metastore/mocks/mock_rootcoord_catalog.go +++ b/internal/metastore/mocks/mock_rootcoord_catalog.go @@ -1879,7 +1879,7 @@ func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) Return(_a0 *milvuspb.Privileg return _c } -func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) RunAndReturn(run func(context.Context, string) (*milvuspb.PrivilegeGroupInfo,error)) *RootCoordCatalog_GetPrivilegeGroup_Call { +func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) RunAndReturn(run func(context.Context, string) (*milvuspb.PrivilegeGroupInfo, error)) *RootCoordCatalog_GetPrivilegeGroup_Call { _c.Call.Return(run) return _c } diff --git a/internal/mocks/mock_datanode.go b/internal/mocks/mock_datanode.go index 190da75be0..5e2836fae8 100644 --- a/internal/mocks/mock_datanode.go +++ b/internal/mocks/mock_datanode.go @@ -87,6 +87,61 @@ func (_c *MockDataNode_CheckChannelOperationProgress_Call) RunAndReturn(run func return _c } +// CheckHealth provides a mock function with given fields: _a0, _a1 +func (_m *MockDataNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockDataNode_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.CheckHealthRequest +func (_e *MockDataNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockDataNode_CheckHealth_Call { + return &MockDataNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)} +} + +func (_c *MockDataNode_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockDataNode_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest)) + }) + return _c +} + +func (_c *MockDataNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNode_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockDataNode_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // CompactionV2 provides a mock function with given fields: _a0, _a1 func (_m *MockDataNode) 
CompactionV2(_a0 context.Context, _a1 *datapb.CompactionPlan) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_datanode_client.go b/internal/mocks/mock_datanode_client.go index 91661051c3..633cf103d2 100644 --- a/internal/mocks/mock_datanode_client.go +++ b/internal/mocks/mock_datanode_client.go @@ -101,6 +101,76 @@ func (_c *MockDataNodeClient_CheckChannelOperationProgress_Call) RunAndReturn(ru return _c } +// CheckHealth provides a mock function with given fields: ctx, in, opts +func (_m *MockDataNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockDataNodeClient_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - ctx context.Context +// - in *milvuspb.CheckHealthRequest +// - opts ...grpc.CallOption +func (_e *MockDataNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_CheckHealth_Call { + return &MockDataNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockDataNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockDataNodeClient_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...) 
+ }) + return _c +} + +func (_c *MockDataNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNodeClient_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockDataNodeClient_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Close provides a mock function with given fields: func (_m *MockDataNodeClient) Close() error { ret := _m.Called() diff --git a/internal/mocks/mock_querynode.go b/internal/mocks/mock_querynode.go index abcf83fade..0332ef6aec 100644 --- a/internal/mocks/mock_querynode.go +++ b/internal/mocks/mock_querynode.go @@ -30,6 +30,61 @@ func (_m *MockQueryNode) EXPECT() *MockQueryNode_Expecter { return &MockQueryNode_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock function with given fields: _a0, _a1 +func (_m *MockQueryNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockQueryNode_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.CheckHealthRequest +func (_e *MockQueryNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNode_CheckHealth_Call { + return &MockQueryNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)} +} + +func (_c *MockQueryNode_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockQueryNode_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest)) + }) + return _c +} + +func (_c *MockQueryNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNode_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNode_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Delete provides a mock function with given fields: _a0, _a1 func (_m *MockQueryNode) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_querynode_client.go b/internal/mocks/mock_querynode_client.go index e7777eb6ab..e2b04295d9 100644 --- a/internal/mocks/mock_querynode_client.go +++ b/internal/mocks/mock_querynode_client.go @@ -31,6 +31,76 @@ func (_m *MockQueryNodeClient) EXPECT() *MockQueryNodeClient_Expecter { return &MockQueryNodeClient_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock 
function with given fields: ctx, in, opts +func (_m *MockQueryNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockQueryNodeClient_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - ctx context.Context +// - in *milvuspb.CheckHealthRequest +// - opts ...grpc.CallOption +func (_e *MockQueryNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryNodeClient_CheckHealth_Call { + return &MockQueryNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockQueryNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockQueryNodeClient_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...) 
+ }) + return _c +} + +func (_c *MockQueryNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeClient_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeClient_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Close provides a mock function with given fields: func (_m *MockQueryNodeClient) Close() error { ret := _m.Called() diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index 3bd0a0abe9..7294914f03 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -131,6 +131,8 @@ service DataNode { rpc QuerySlot(QuerySlotRequest) returns(QuerySlotResponse) {} rpc DropCompactionPlan(DropCompactionPlanRequest) returns(common.Status) {} + + rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {} } message FlushRequest { diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index 2f52c2ce47..4c3f212599 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -175,7 +175,9 @@ service QueryNode { // DeleteBatch is the API to apply same delete data into multiple segments. // it's basically same as `Delete` but cost less memory pressure. rpc DeleteBatch(DeleteBatchRequest) returns (DeleteBatchResponse) { - } + } + + rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {} } // --------------------QueryCoord grpc request and response proto------------------ diff --git a/internal/querycoordv2/balance/channel_level_score_balancer.go b/internal/querycoordv2/balance/channel_level_score_balancer.go index bee771d58f..ba0a3398bf 100644 --- a/internal/querycoordv2/balance/channel_level_score_balancer.go +++ b/internal/querycoordv2/balance/channel_level_score_balancer.go @@ -42,7 +42,7 @@ func NewChannelLevelScoreBalancer(scheduler task.Scheduler, nodeManager *session.NodeManager, dist *meta.DistributionManager, meta *meta.Meta, - targetMgr *meta.TargetManager, + targetMgr meta.TargetManagerInterface, ) *ChannelLevelScoreBalancer { return &ChannelLevelScoreBalancer{ ScoreBasedBalancer: NewScoreBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr), diff --git a/internal/querycoordv2/balance/multi_target_balance.go b/internal/querycoordv2/balance/multi_target_balance.go index 9f1de2b026..7466c0e782 100644 --- a/internal/querycoordv2/balance/multi_target_balance.go +++ b/internal/querycoordv2/balance/multi_target_balance.go @@ -453,7 +453,7 @@ func (g *randomPlanGenerator) generatePlans() []SegmentAssignPlan { type MultiTargetBalancer struct { *ScoreBasedBalancer dist *meta.DistributionManager - targetMgr *meta.TargetManager + targetMgr meta.TargetManagerInterface } func (b *MultiTargetBalancer) BalanceReplica(replica *meta.Replica) (segmentPlans []SegmentAssignPlan, channelPlans []ChannelAssignPlan) { @@ -561,7 +561,7 @@ func (b *MultiTargetBalancer) genPlanByDistributions(nodeSegments, globalNodeSeg return plans } -func NewMultiTargetBalancer(scheduler task.Scheduler, nodeManager *session.NodeManager, dist *meta.DistributionManager, meta *meta.Meta, targetMgr *meta.TargetManager) *MultiTargetBalancer { +func NewMultiTargetBalancer(scheduler task.Scheduler, nodeManager *session.NodeManager, dist *meta.DistributionManager, meta *meta.Meta, targetMgr meta.TargetManagerInterface) 
*MultiTargetBalancer { return &MultiTargetBalancer{ ScoreBasedBalancer: NewScoreBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr), dist: dist, diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index 53d09d89f3..a664c5885a 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -37,7 +37,7 @@ type RowCountBasedBalancer struct { *RoundRobinBalancer dist *meta.DistributionManager meta *meta.Meta - targetMgr *meta.TargetManager + targetMgr meta.TargetManagerInterface } // AssignSegment, when row count based balancer assign segments, it will assign segment to node with least global row count. @@ -366,7 +366,7 @@ func NewRowCountBasedBalancer( nodeManager *session.NodeManager, dist *meta.DistributionManager, meta *meta.Meta, - targetMgr *meta.TargetManager, + targetMgr meta.TargetManagerInterface, ) *RowCountBasedBalancer { return &RowCountBasedBalancer{ RoundRobinBalancer: NewRoundRobinBalancer(scheduler, nodeManager), diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 0e3aad1f78..872d273c43 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -42,7 +42,7 @@ func NewScoreBasedBalancer(scheduler task.Scheduler, nodeManager *session.NodeManager, dist *meta.DistributionManager, meta *meta.Meta, - targetMgr *meta.TargetManager, + targetMgr meta.TargetManagerInterface, ) *ScoreBasedBalancer { return &ScoreBasedBalancer{ RowCountBasedBalancer: NewRowCountBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr), diff --git a/internal/querycoordv2/dist/dist_controller.go b/internal/querycoordv2/dist/dist_controller.go index 687e16fe5c..5661eaae33 100644 --- a/internal/querycoordv2/dist/dist_controller.go +++ b/internal/querycoordv2/dist/dist_controller.go @@ -99,7 +99,7 @@ func NewDistController( client session.Cluster, nodeManager *session.NodeManager, dist *meta.DistributionManager, - targetMgr *meta.TargetManager, + targetMgr meta.TargetManagerInterface, scheduler task.Scheduler, syncTargetVersionFn TriggerUpdateTargetVersion, ) *ControllerImpl { diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index c441714a99..3ae1fdddee 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -51,7 +51,7 @@ type LoadCollectionJob struct { meta *meta.Meta broker meta.Broker cluster session.Cluster - targetMgr *meta.TargetManager + targetMgr meta.TargetManagerInterface targetObserver *observers.TargetObserver collectionObserver *observers.CollectionObserver nodeMgr *session.NodeManager @@ -64,7 +64,7 @@ func NewLoadCollectionJob( meta *meta.Meta, broker meta.Broker, cluster session.Cluster, - targetMgr *meta.TargetManager, + targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver, collectionObserver *observers.CollectionObserver, nodeMgr *session.NodeManager, @@ -265,7 +265,7 @@ type LoadPartitionJob struct { meta *meta.Meta broker meta.Broker cluster session.Cluster - targetMgr *meta.TargetManager + targetMgr meta.TargetManagerInterface targetObserver *observers.TargetObserver collectionObserver *observers.CollectionObserver nodeMgr *session.NodeManager @@ -278,7 +278,7 @@ func NewLoadPartitionJob( meta *meta.Meta, broker meta.Broker, cluster session.Cluster, - targetMgr 
*meta.TargetManager, + targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver, collectionObserver *observers.CollectionObserver, nodeMgr *session.NodeManager, diff --git a/internal/querycoordv2/job/job_release.go b/internal/querycoordv2/job/job_release.go index ea6289ba8a..ca903159a5 100644 --- a/internal/querycoordv2/job/job_release.go +++ b/internal/querycoordv2/job/job_release.go @@ -42,7 +42,7 @@ type ReleaseCollectionJob struct { meta *meta.Meta broker meta.Broker cluster session.Cluster - targetMgr *meta.TargetManager + targetMgr meta.TargetManagerInterface targetObserver *observers.TargetObserver checkerController *checkers.CheckerController proxyManager proxyutil.ProxyClientManagerInterface @@ -54,7 +54,7 @@ func NewReleaseCollectionJob(ctx context.Context, meta *meta.Meta, broker meta.Broker, cluster session.Cluster, - targetMgr *meta.TargetManager, + targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver, checkerController *checkers.CheckerController, proxyManager proxyutil.ProxyClientManagerInterface, @@ -82,8 +82,6 @@ func (job *ReleaseCollectionJob) Execute() error { return nil } - job.meta.CollectionManager.SetReleasing(req.GetCollectionID()) - loadedPartitions := job.meta.CollectionManager.GetPartitionsByCollection(req.GetCollectionID()) toRelease := lo.Map(loadedPartitions, func(partition *meta.Partition, _ int) int64 { return partition.GetPartitionID() @@ -130,7 +128,7 @@ type ReleasePartitionJob struct { meta *meta.Meta broker meta.Broker cluster session.Cluster - targetMgr *meta.TargetManager + targetMgr meta.TargetManagerInterface targetObserver *observers.TargetObserver checkerController *checkers.CheckerController proxyManager proxyutil.ProxyClientManagerInterface @@ -142,7 +140,7 @@ func NewReleasePartitionJob(ctx context.Context, meta *meta.Meta, broker meta.Broker, cluster session.Cluster, - targetMgr *meta.TargetManager, + targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver, checkerController *checkers.CheckerController, proxyManager proxyutil.ProxyClientManagerInterface, diff --git a/internal/querycoordv2/job/undo.go b/internal/querycoordv2/job/undo.go index 21d2953863..e1314f0aec 100644 --- a/internal/querycoordv2/job/undo.go +++ b/internal/querycoordv2/job/undo.go @@ -38,12 +38,12 @@ type UndoList struct { ctx context.Context meta *meta.Meta cluster session.Cluster - targetMgr *meta.TargetManager + targetMgr meta.TargetManagerInterface targetObserver *observers.TargetObserver } func NewUndoList(ctx context.Context, meta *meta.Meta, - cluster session.Cluster, targetMgr *meta.TargetManager, targetObserver *observers.TargetObserver, + cluster session.Cluster, targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver, ) *UndoList { return &UndoList{ ctx: ctx, diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index 5df4286417..f88c664169 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ b/internal/querycoordv2/meta/collection_manager.go @@ -50,7 +50,6 @@ type Collection struct { mut sync.RWMutex refreshNotifier chan struct{} LoadSpan trace.Span - isReleasing bool } func (collection *Collection) SetRefreshNotifier(notifier chan struct{}) { @@ -60,18 +59,6 @@ func (collection *Collection) SetRefreshNotifier(notifier chan struct{}) { collection.refreshNotifier = notifier } -func (collection *Collection) SetReleasing() { - collection.mut.Lock() - defer collection.mut.Unlock() - 
collection.isReleasing = true -} - -func (collection *Collection) IsReleasing() bool { - collection.mut.RLock() - defer collection.mut.RUnlock() - return collection.isReleasing -} - func (collection *Collection) IsRefreshed() bool { collection.mut.RLock() notifier := collection.refreshNotifier @@ -439,15 +426,6 @@ func (m *CollectionManager) Exist(collectionID typeutil.UniqueID) bool { return ok } -func (m *CollectionManager) SetReleasing(collectionID typeutil.UniqueID) { - m.rwmutex.Lock() - defer m.rwmutex.Unlock() - coll, ok := m.collections[collectionID] - if ok { - coll.SetReleasing() - } -} - // GetAll returns the collection ID of all loaded collections func (m *CollectionManager) GetAll() []int64 { m.rwmutex.RLock() diff --git a/internal/querycoordv2/mocks/mock_querynode.go b/internal/querycoordv2/mocks/mock_querynode.go index 961d0b64f4..ae935b1304 100644 --- a/internal/querycoordv2/mocks/mock_querynode.go +++ b/internal/querycoordv2/mocks/mock_querynode.go @@ -29,6 +29,61 @@ func (_m *MockQueryNodeServer) EXPECT() *MockQueryNodeServer_Expecter { return &MockQueryNodeServer_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock function with given fields: _a0, _a1 +func (_m *MockQueryNodeServer) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryNodeServer_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockQueryNodeServer_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.CheckHealthRequest +func (_e *MockQueryNodeServer_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNodeServer_CheckHealth_Call { + return &MockQueryNodeServer_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)} +} + +func (_c *MockQueryNodeServer_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockQueryNodeServer_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest)) + }) + return _c +} + +func (_c *MockQueryNodeServer_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeServer_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryNodeServer_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeServer_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Delete provides a mock function with given fields: _a0, _a1 func (_m *MockQueryNodeServer) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/querycoordv2/observers/collection_observer.go 
b/internal/querycoordv2/observers/collection_observer.go index e641d1c245..b71c56175b 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -47,7 +47,7 @@ type CollectionObserver struct { dist *meta.DistributionManager meta *meta.Meta - targetMgr *meta.TargetManager + targetMgr meta.TargetManagerInterface targetObserver *TargetObserver checkerController *checkers.CheckerController partitionLoadedCount map[int64]int @@ -69,7 +69,7 @@ type LoadTask struct { func NewCollectionObserver( dist *meta.DistributionManager, meta *meta.Meta, - targetMgr *meta.TargetManager, + targetMgr meta.TargetManagerInterface, targetObserver *TargetObserver, checherController *checkers.CheckerController, proxyManager proxyutil.ProxyClientManagerInterface, diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 3bb1c31924..45e6345488 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -75,7 +75,7 @@ type TargetObserver struct { cancel context.CancelFunc wg sync.WaitGroup meta *meta.Meta - targetMgr *meta.TargetManager + targetMgr meta.TargetManagerInterface distMgr *meta.DistributionManager broker meta.Broker cluster session.Cluster @@ -101,7 +101,7 @@ type TargetObserver struct { func NewTargetObserver( meta *meta.Meta, - targetMgr *meta.TargetManager, + targetMgr meta.TargetManagerInterface, distMgr *meta.DistributionManager, broker meta.Broker, cluster session.Cluster, diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 0b6aa3d36d..c3a6f29502 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -53,6 +53,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/tsoutil" @@ -94,7 +95,7 @@ type Server struct { store metastore.QueryCoordCatalog meta *meta.Meta dist *meta.DistributionManager - targetMgr *meta.TargetManager + targetMgr meta.TargetManagerInterface broker meta.Broker // Session @@ -134,6 +135,8 @@ type Server struct { proxyCreator proxyutil.ProxyCreator proxyWatcher proxyutil.ProxyWatcherInterface proxyClientManager proxyutil.ProxyClientManagerInterface + + healthChecker *healthcheck.Checker } func NewQueryCoord(ctx context.Context) (*Server, error) { @@ -346,6 +349,8 @@ func (s *Server) initQueryCoord() error { // Init load status cache meta.GlobalFailedLoadCache = meta.NewFailedLoadCache() + interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second) + s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn) log.Info("init querycoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address)) return err } @@ -489,6 +494,7 @@ func (s *Server) startQueryCoord() error { s.startServerLoop() s.afterStart() + s.healthChecker.Start() s.UpdateStateCode(commonpb.StateCode_Healthy) sessionutil.SaveServerInfo(typeutil.QueryCoordRole, s.session.GetServerID()) return nil @@ -525,7 +531,9 @@ func (s *Server) Stop() error { // FOLLOW the dependence graph: // job scheduler -> checker controller -> task scheduler -> dist controller -> cluster -> session // 
observers -> dist controller - + if s.healthChecker != nil { + s.healthChecker.Close() + } if s.jobScheduler != nil { log.Info("stop job scheduler...") s.jobScheduler.Stop() diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 5f5ecc8c4b..e97aaa9c72 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -35,7 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/job" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/utils" - "github.com/milvus-io/milvus/internal/util/componentutil" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" @@ -935,16 +936,20 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque return &milvuspb.CheckHealthResponse{Status: merr.Status(err), IsHealthy: false, Reasons: []string{err.Error()}}, nil } - errReasons, err := s.checkNodeHealth(ctx) - if err != nil || len(errReasons) != 0 { - return componentutil.CheckHealthRespWithErrMsg(errReasons...), nil - } + latestCheckResult := s.healthChecker.GetLatestCheckResult() + return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil +} - if err := utils.CheckCollectionsQueryable(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil { - log.Warn("some collection is not queryable during health check", zap.Error(err)) - } +func (s *Server) healthCheckFn() *healthcheck.Result { + timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second) + ctx, cancel := context.WithTimeout(s.ctx, timeout) + defer cancel() - return componentutil.CheckHealthRespWithErr(nil), nil + checkResults := s.broadcastCheckHealth(ctx) + for collectionID, failReason := range utils.CheckCollectionsQueryable(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr) { + checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.CollectionQueryable)) + } + return checkResults } func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) { @@ -975,6 +980,39 @@ func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) { return errReasons, err } +func (s *Server) broadcastCheckHealth(ctx context.Context) *healthcheck.Result { + result := healthcheck.NewResult() + wg := sync.WaitGroup{} + wlock := sync.Mutex{} + + for _, node := range s.nodeMgr.GetAll() { + node := node + wg.Add(1) + go func() { + defer wg.Done() + + checkHealthResp, err := s.cluster.CheckHealth(ctx, node.ID()) + if err = merr.CheckRPCCall(checkHealthResp, err); err != nil && !errors.Is(err, merr.ErrServiceUnimplemented) { + err = fmt.Errorf("CheckHealth fails for querynode:%d, %w", node.ID(), err) + wlock.Lock() + result.AppendUnhealthyClusterMsg( + healthcheck.NewUnhealthyClusterMsg(typeutil.QueryNodeRole, node.ID(), err.Error(), healthcheck.NodeHealthCheck)) + wlock.Unlock() + return + } + + if len(checkHealthResp.Reasons) > 0 { + wlock.Lock() + result.AppendResult(healthcheck.GetHealthCheckResultFromResp(checkHealthResp)) + wlock.Unlock() + } + }() + } + + wg.Wait() + return result +} + func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) { log := log.Ctx(ctx).With( zap.String("rgName", 
req.GetResourceGroup()), diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 7066e22de2..5d1d73e2ff 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -48,6 +48,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/kv" @@ -171,6 +172,11 @@ func (suite *ServiceSuite) SetupTest() { } suite.cluster = session.NewMockCluster(suite.T()) suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() + suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(&milvuspb.CheckHealthResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + IsHealthy: true, + Reasons: []string{}, + }, nil).Maybe() suite.jobScheduler = job.NewScheduler() suite.taskScheduler = task.NewMockScheduler(suite.T()) suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() @@ -1630,42 +1636,82 @@ func (suite *ServiceSuite) TestCheckHealth() { suite.loadAll() ctx := context.Background() server := suite.server + server.healthChecker = healthcheck.NewChecker(40*time.Millisecond, suite.server.healthCheckFn) + server.healthChecker.Start() + defer server.healthChecker.Close() + + assertCheckHealthResult := func(isHealthy bool) { + resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + suite.NoError(err) + suite.Equal(resp.IsHealthy, isHealthy) + if !isHealthy { + suite.NotEmpty(resp.Reasons) + } else { + suite.Empty(resp.Reasons) + } + } + + setNodeSate := func(isHealthy bool, isRPCFail bool) { + var resp *milvuspb.CheckHealthResponse + if isHealthy { + resp = healthcheck.OK() + } else { + resp = healthcheck.GetCheckHealthResponseFromClusterMsg(healthcheck.NewUnhealthyClusterMsg("dn", 1, "check fails", healthcheck.NodeHealthCheck)) + } + resp.Status = &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success} + if isRPCFail { + resp.Status = &commonpb.Status{ErrorCode: commonpb.ErrorCode_ForceDeny} + } + suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Unset() + suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(resp, nil).Maybe() + time.Sleep(50 * time.Millisecond) + } // Test for server is not healthy server.UpdateStateCode(commonpb.StateCode_Initializing) + assertCheckHealthResult(false) + + // Test for check health has some error reasons + setNodeSate(false, false) + server.UpdateStateCode(commonpb.StateCode_Healthy) + assertCheckHealthResult(false) + + // Test for check health rpc fail + setNodeSate(true, true) + server.UpdateStateCode(commonpb.StateCode_Healthy) + assertCheckHealthResult(false) + + // Test for check load percentage fail + setNodeSate(true, false) + assertCheckHealthResult(true) + + // Test for check channel ok + for _, collection := range suite.collections { + suite.updateCollectionStatus(collection, querypb.LoadStatus_Loaded) + suite.updateChannelDist(collection) + } + assertCheckHealthResult(true) + + // Test for check channel fail + tm := meta.NewMockTargetManager(suite.T()) + tm.EXPECT().GetDmChannelsByCollection(mock.Anything, mock.Anything).Return(nil).Maybe() + otm := server.targetMgr + 
server.targetMgr = tm + assertCheckHealthResult(true) + + // Test for get shard leader fail + server.targetMgr = otm + for _, node := range suite.nodes { + suite.nodeMgr.Stopping(node) + } + + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.Key, "1") + defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.Key) + time.Sleep(1500 * time.Millisecond) resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) suite.NoError(err) - suite.Equal(resp.IsHealthy, false) - suite.NotEmpty(resp.Reasons) - - // Test for components state fail - for _, node := range suite.nodes { - suite.cluster.EXPECT().GetComponentStates(mock.Anything, node).Return( - &milvuspb.ComponentStates{ - State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Abnormal}, - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, - }, - nil).Once() - } - server.UpdateStateCode(commonpb.StateCode_Healthy) - resp, err = server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) - suite.NoError(err) - suite.Equal(resp.IsHealthy, false) - suite.NotEmpty(resp.Reasons) - - // Test for server is healthy - for _, node := range suite.nodes { - suite.cluster.EXPECT().GetComponentStates(mock.Anything, node).Return( - &milvuspb.ComponentStates{ - State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Healthy}, - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, - }, - nil).Once() - } - resp, err = server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) - suite.NoError(err) suite.Equal(resp.IsHealthy, true) - suite.Empty(resp.Reasons) + suite.NotEmpty(resp.Reasons) } func (suite *ServiceSuite) TestGetShardLeaders() { diff --git a/internal/querycoordv2/session/cluster.go b/internal/querycoordv2/session/cluster.go index 7b6bc316eb..569dbb0029 100644 --- a/internal/querycoordv2/session/cluster.go +++ b/internal/querycoordv2/session/cluster.go @@ -52,6 +52,7 @@ type Cluster interface { GetMetrics(ctx context.Context, nodeID int64, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) SyncDistribution(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error) + CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) Start() Stop() } @@ -272,6 +273,20 @@ func (c *QueryCluster) send(ctx context.Context, nodeID int64, fn func(cli types return nil } +func (c *QueryCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) { + var ( + resp *milvuspb.CheckHealthResponse + err error + ) + err1 := c.send(ctx, nodeID, func(cli types.QueryNodeClient) { + resp, err = cli.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + }) + if err1 != nil { + return nil, err1 + } + return resp, err +} + type clients struct { sync.RWMutex clients map[int64]types.QueryNodeClient // nodeID -> client diff --git a/internal/querycoordv2/session/mock_cluster.go b/internal/querycoordv2/session/mock_cluster.go index dbc14c720c..136f6c4e23 100644 --- a/internal/querycoordv2/session/mock_cluster.go +++ b/internal/querycoordv2/session/mock_cluster.go @@ -27,6 +27,61 @@ func (_m *MockCluster) EXPECT() *MockCluster_Expecter { return &MockCluster_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock function with given fields: ctx, nodeID +func (_m *MockCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) { + ret 
:= _m.Called(ctx, nodeID) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(ctx, nodeID) + } + if rf, ok := ret.Get(0).(func(context.Context, int64) *milvuspb.CheckHealthResponse); ok { + r0 = rf(ctx, nodeID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, nodeID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockCluster_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockCluster_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - ctx context.Context +// - nodeID int64 +func (_e *MockCluster_Expecter) CheckHealth(ctx interface{}, nodeID interface{}) *MockCluster_CheckHealth_Call { + return &MockCluster_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx, nodeID)} +} + +func (_c *MockCluster_CheckHealth_Call) Run(run func(ctx context.Context, nodeID int64)) *MockCluster_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *MockCluster_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockCluster_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockCluster_CheckHealth_Call) RunAndReturn(run func(context.Context, int64) (*milvuspb.CheckHealthResponse, error)) *MockCluster_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // GetComponentStates provides a mock function with given fields: ctx, nodeID func (_m *MockCluster) GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error) { ret := _m.Called(ctx, nodeID) diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 474dc9a701..db3acf4477 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -58,7 +58,7 @@ type Executor struct { meta *meta.Meta dist *meta.DistributionManager broker meta.Broker - targetMgr *meta.TargetManager + targetMgr meta.TargetManagerInterface cluster session.Cluster nodeMgr *session.NodeManager @@ -70,7 +70,7 @@ type Executor struct { func NewExecutor(meta *meta.Meta, dist *meta.DistributionManager, broker meta.Broker, - targetMgr *meta.TargetManager, + targetMgr meta.TargetManagerInterface, cluster session.Cluster, nodeMgr *session.NodeManager, ) *Executor { diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 4cc328959a..416d3ea1d4 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -157,7 +157,7 @@ type taskScheduler struct { distMgr *meta.DistributionManager meta *meta.Meta - targetMgr *meta.TargetManager + targetMgr meta.TargetManagerInterface broker meta.Broker cluster session.Cluster nodeMgr *session.NodeManager @@ -172,7 +172,7 @@ type taskScheduler struct { func NewScheduler(ctx context.Context, meta *meta.Meta, distMgr *meta.DistributionManager, - targetMgr *meta.TargetManager, + targetMgr meta.TargetManagerInterface, broker meta.Broker, cluster session.Cluster, nodeMgr *session.NodeManager, diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index e37961c8bc..6bcb80677b 100644 --- a/internal/querycoordv2/utils/util.go +++ 
b/internal/querycoordv2/utils/util.go @@ -73,13 +73,13 @@ func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.Target for segmentID, info := range segmentDist { _, exist := leader.Segments[segmentID] if !exist { - log.RatedInfo(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID)) + log.RatedWarn(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID)) return merr.WrapErrSegmentLack(segmentID) } l0WithWrongLocation := info.GetLevel() == datapb.SegmentLevel_L0 && leader.Segments[segmentID].GetNodeID() != leader.ID if l0WithWrongLocation { - log.RatedInfo(10, "leader is not available due to lack of L0 segment", zap.Int64("segmentID", segmentID)) + log.RatedWarn(10, "leader is not available due to lack of L0 segment", zap.Int64("segmentID", segmentID)) return merr.WrapErrSegmentLack(segmentID) } } @@ -108,13 +108,11 @@ func checkLoadStatus(ctx context.Context, m *meta.Meta, collectionID int64) erro return nil } -func GetShardLeadersWithChannels(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, +func GetShardLeadersWithChannels(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64, channels map[string]*meta.DmChannel, ) ([]*querypb.ShardLeadersList, error) { ret := make([]*querypb.ShardLeadersList, 0) for _, channel := range channels { - log := log.With(zap.String("channel", channel.GetChannelName())) - var channelErr error leaders := dist.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName())) if len(leaders) == 0 { @@ -132,7 +130,7 @@ func GetShardLeadersWithChannels(m *meta.Meta, targetMgr *meta.TargetManager, di if len(readableLeaders) == 0 { msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName()) - log.Warn(msg, zap.Error(channelErr)) + log.RatedWarn(60, msg, zap.Error(channelErr)) err := merr.WrapErrChannelNotAvailable(channel.GetChannelName(), channelErr.Error()) return nil, err } @@ -169,7 +167,7 @@ func GetShardLeadersWithChannels(m *meta.Meta, targetMgr *meta.TargetManager, di return ret, nil } -func GetShardLeaders(ctx context.Context, m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) { +func GetShardLeaders(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) { if err := checkLoadStatus(ctx, m, collectionID); err != nil { return nil, err } @@ -185,19 +183,24 @@ func GetShardLeaders(ctx context.Context, m *meta.Meta, targetMgr *meta.TargetMa } // CheckCollectionsQueryable check all channels are watched and all segments are loaded for this collection -func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error { - maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute) +func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) map[int64]string { + maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Second) + checkResult := make(map[int64]string) for _, coll := range 
m.GetAllCollections() { err := checkCollectionQueryable(ctx, m, targetMgr, dist, nodeMgr, coll) - if err != nil && !coll.IsReleasing() && time.Since(coll.UpdatedAt) >= maxInterval { - return err + // the collection is not queryable, if meet following conditions: + // 1. Some segments are not loaded + // 2. Collection is not starting to release + // 3. The load percentage has not been updated in the last 5 minutes. + if err != nil && m.Exist(coll.CollectionID) && time.Since(coll.UpdatedAt) >= maxInterval { + checkResult[coll.CollectionID] = err.Error() } } - return nil + return checkResult } // checkCollectionQueryable check all channels are watched and all segments are loaded for this collection -func checkCollectionQueryable(ctx context.Context, m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, coll *meta.Collection) error { +func checkCollectionQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager, coll *meta.Collection) error { collectionID := coll.GetCollectionID() if err := checkLoadStatus(ctx, m, collectionID); err != nil { return err diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 1fa77d3d4e..2fd75fd950 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tasks" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/streamrpc" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -53,6 +54,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/ratelimitutil" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -1438,6 +1440,25 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) ( return merr.Success(), nil } +func (node *QueryNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + if err := node.lifetime.Add(merr.IsHealthy); err != nil { + return &milvuspb.CheckHealthResponse{ + Status: merr.Status(err), + Reasons: []string{err.Error()}, + }, nil + } + defer node.lifetime.Done() + + maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) + minTsafeChannel, minTsafe := node.tSafeManager.Min() + if err := ratelimitutil.CheckTimeTickDelay(minTsafeChannel, minTsafe, maxDelay); err != nil { + msg := healthcheck.NewUnhealthyClusterMsg(typeutil.QueryNodeRole, node.GetNodeID(), err.Error(), healthcheck.TimeTickLagExceed) + return healthcheck.GetCheckHealthResponseFromClusterMsg(msg), nil + } + + return healthcheck.OK(), nil +} + // DeleteBatch is the API to apply same delete data into multiple segments. // it's basically same as `Delete` but cost less memory pressure. 
func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) { diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 7c94af1ce8..ec5bcf54a2 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -44,6 +44,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/streamrpc" @@ -97,7 +98,7 @@ func (suite *ServiceSuite) SetupSuite() { paramtable.Init() paramtable.Get().Save(paramtable.Get().CommonCfg.GCEnabled.Key, "false") - suite.rootPath = suite.T().Name() + suite.rootPath = path.Join("/tmp/milvus/test", suite.T().Name()) suite.collectionID = 111 suite.collectionName = "test-collection" suite.partitionIDs = []int64{222} @@ -2197,6 +2198,41 @@ func (suite *ServiceSuite) TestLoadPartition() { suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode) } +func (suite *ServiceSuite) TestCheckHealth() { + suite.Run("node not healthy", func() { + suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) + + ctx := context.Background() + resp, err := suite.node.CheckHealth(ctx, nil) + suite.NoError(err) + suite.False(merr.Ok(resp.GetStatus())) + suite.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) + }) + + suite.Run("exceeded timetick lag on pipeline", func() { + suite.node.tSafeManager = tsafe.NewTSafeReplica() + suite.node.tSafeManager.Add(context.TODO(), "timetick-lag-ch", 1) + ctx := context.Background() + suite.node.UpdateStateCode(commonpb.StateCode_Healthy) + resp, err := suite.node.CheckHealth(ctx, nil) + suite.NoError(err) + suite.True(merr.Ok(resp.GetStatus())) + suite.False(resp.GetIsHealthy()) + suite.NotEmpty(resp.Reasons) + }) + + suite.Run("ok", func() { + ctx := context.Background() + suite.node.UpdateStateCode(commonpb.StateCode_Healthy) + suite.node.tSafeManager = tsafe.NewTSafeReplica() + resp, err := suite.node.CheckHealth(ctx, nil) + suite.NoError(err) + suite.True(merr.Ok(resp.GetStatus())) + suite.True(resp.GetIsHealthy()) + suite.Empty(resp.Reasons) + }) +} + func TestQueryNodeService(t *testing.T) { suite.Run(t, new(ServiceSuite)) } diff --git a/internal/rootcoord/mock_test.go b/internal/rootcoord/mock_test.go index 5cc0b97b72..c03a887bf4 100644 --- a/internal/rootcoord/mock_test.go +++ b/internal/rootcoord/mock_test.go @@ -404,6 +404,7 @@ func newMockProxy() *mockProxy { func newTestCore(opts ...Opt) *Core { c := &Core{ + ctx: context.TODO(), session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}}, } executor := newMockStepExecutor() diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 26c5a8596f..612b3838fa 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -31,7 +31,6 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" "go.uber.org/zap" - "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -49,6 +48,7 @@ import ( tso2 "github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" + 
"github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" tsoutil2 "github.com/milvus-io/milvus/internal/util/tsoutil" @@ -127,6 +127,7 @@ type Core struct { enableActiveStandBy bool activateFunc func() error + healthChecker *healthcheck.Checker } // --------------------- function -------------------------- @@ -481,6 +482,8 @@ func (c *Core) initInternal() error { return err } + interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second) + c.healthChecker = healthcheck.NewChecker(interval, c.healthCheckFn) log.Info("init rootcoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", c.address)) return nil } @@ -757,6 +760,7 @@ func (c *Core) startInternal() error { }() c.startServerLoop() + c.healthChecker.Start() c.UpdateStateCode(commonpb.StateCode_Healthy) sessionutil.SaveServerInfo(typeutil.RootCoordRole, c.session.ServerID) logutil.Logger(c.ctx).Info("rootcoord startup successfully") @@ -815,6 +819,10 @@ func (c *Core) revokeSession() { // Stop stops rootCoord. func (c *Core) Stop() error { c.UpdateStateCode(commonpb.StateCode_Abnormal) + if c.healthChecker != nil { + c.healthChecker.Close() + } + c.stopExecutor() c.stopScheduler() if c.proxyWatcher != nil { @@ -3008,53 +3016,40 @@ func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest) }, nil } - group, ctx := errgroup.WithContext(ctx) - errs := typeutil.NewConcurrentSet[error]() + latestCheckResult := c.healthChecker.GetLatestCheckResult() + return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil +} + +func (c *Core) healthCheckFn() *healthcheck.Result { + timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second) + ctx, cancel := context.WithTimeout(c.ctx, timeout) + defer cancel() proxyClients := c.proxyClientManager.GetProxyClients() + wg := sync.WaitGroup{} + lock := sync.Mutex{} + result := healthcheck.NewResult() + proxyClients.Range(func(key int64, value types.ProxyClient) bool { nodeID := key proxyClient := value - group.Go(func() error { - sta, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) - if err != nil { - errs.Insert(err) - return err - } + wg.Add(1) + go func() { + defer wg.Done() + resp, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + err = merr.AnalyzeComponentStateResp(typeutil.ProxyRole, nodeID, resp, err) - err = merr.AnalyzeState("Proxy", nodeID, sta) + lock.Lock() + defer lock.Unlock() if err != nil { - errs.Insert(err) + result.AppendUnhealthyClusterMsg(healthcheck.NewUnhealthyClusterMsg(typeutil.ProxyRole, nodeID, err.Error(), healthcheck.NodeHealthCheck)) } - - return err - }) + }() return true }) - maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) - if maxDelay > 0 { - group.Go(func() error { - err := CheckTimeTickLagExceeded(ctx, c.queryCoord, c.dataCoord, maxDelay) - if err != nil { - errs.Insert(err) - } - return err - }) - } - - err := group.Wait() - if err != nil { - return &milvuspb.CheckHealthResponse{ - Status: merr.Success(), - IsHealthy: false, - Reasons: lo.Map(errs.Collect(), func(e error, i int) string { - return err.Error() - }), - }, nil - } - - return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil + wg.Wait() + return result } func (c *Core) CreatePrivilegeGroup(ctx context.Context, in *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, 
error) { diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 1ea416a4ce..59cd1d9540 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -32,7 +32,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/metastore/model" - "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/proxypb" @@ -40,6 +39,7 @@ import ( mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" "github.com/milvus-io/milvus/internal/util/dependency" kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/util" @@ -49,7 +49,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tikv" - "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -1453,65 +1452,6 @@ func TestRootCoord_AlterCollection(t *testing.T) { } func TestRootCoord_CheckHealth(t *testing.T) { - getQueryCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) { - clusterTopology := metricsinfo.QueryClusterTopology{ - ConnectedNodes: []metricsinfo.QueryNodeInfos{ - { - QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{ - Fgm: metricsinfo.FlowGraphMetric{ - MinFlowGraphChannel: "ch1", - MinFlowGraphTt: tt, - NumFlowGraph: 1, - }, - }, - }, - }, - } - - resp, _ := metricsinfo.MarshalTopology(metricsinfo.QueryCoordTopology{Cluster: clusterTopology}) - return &milvuspb.GetMetricsResponse{ - Status: merr.Success(), - Response: resp, - ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, 0), - }, nil - } - - getDataCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) { - clusterTopology := metricsinfo.DataClusterTopology{ - ConnectedDataNodes: []metricsinfo.DataNodeInfos{ - { - QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{ - Fgm: metricsinfo.FlowGraphMetric{ - MinFlowGraphChannel: "ch1", - MinFlowGraphTt: tt, - NumFlowGraph: 1, - }, - }, - }, - }, - } - - resp, _ := metricsinfo.MarshalTopology(metricsinfo.DataCoordTopology{Cluster: clusterTopology}) - return &milvuspb.GetMetricsResponse{ - Status: merr.Success(), - Response: resp, - ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, 0), - }, nil - } - - querynodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-1*time.Minute), 0) - datanodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-2*time.Minute), 0) - - dcClient := mocks.NewMockDataCoordClient(t) - dcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getDataCoordMetricsFunc(datanodeTT)) - qcClient := mocks.NewMockQueryCoordClient(t) - qcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getQueryCoordMetricsFunc(querynodeTT)) - - errDataCoordClient := mocks.NewMockDataCoordClient(t) - errDataCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("error")) - errQueryCoordClient := mocks.NewMockQueryCoordClient(t) - errQueryCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, 
errors.New("error")) - t.Run("not healthy", func(t *testing.T) { ctx := context.Background() c := newTestCore(withAbnormalCode()) @@ -1521,25 +1461,13 @@ func TestRootCoord_CheckHealth(t *testing.T) { assert.NotEmpty(t, resp.Reasons) }) - t.Run("ok with disabled tt lag configuration", func(t *testing.T) { - v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() - Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "-1") - defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) - - c := newTestCore(withHealthyCode(), withValidProxyManager()) - ctx := context.Background() - resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) - assert.NoError(t, err) - assert.Equal(t, true, resp.IsHealthy) - assert.Empty(t, resp.Reasons) - }) - t.Run("proxy health check fail with invalid proxy", func(t *testing.T) { - v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() - Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000") - defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) + c := newTestCore(withHealthyCode(), withInvalidProxyManager()) + c.healthChecker = healthcheck.NewChecker(40*time.Millisecond, c.healthCheckFn) + c.healthChecker.Start() + defer c.healthChecker.Close() - c := newTestCore(withHealthyCode(), withInvalidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient)) + time.Sleep(50 * time.Millisecond) ctx := context.Background() resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) @@ -1548,55 +1476,14 @@ func TestRootCoord_CheckHealth(t *testing.T) { assert.NotEmpty(t, resp.Reasons) }) - t.Run("proxy health check fail with get metrics error", func(t *testing.T) { - v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() - Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000") - defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) + t.Run("ok", func(t *testing.T) { + c := newTestCore(withHealthyCode(), withValidProxyManager()) + c.healthChecker = healthcheck.NewChecker(40*time.Millisecond, c.healthCheckFn) + c.healthChecker.Start() + defer c.healthChecker.Close() - { - c := newTestCore(withHealthyCode(), - withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(errQueryCoordClient)) + time.Sleep(50 * time.Millisecond) - ctx := context.Background() - resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) - assert.NoError(t, err) - assert.Equal(t, false, resp.IsHealthy) - assert.NotEmpty(t, resp.Reasons) - } - - { - c := newTestCore(withHealthyCode(), - withValidProxyManager(), withDataCoord(errDataCoordClient), withQueryCoord(qcClient)) - - ctx := context.Background() - resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) - assert.NoError(t, err) - assert.Equal(t, false, resp.IsHealthy) - assert.NotEmpty(t, resp.Reasons) - } - }) - - t.Run("ok with tt lag exceeded", func(t *testing.T) { - v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() - Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "90") - defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) - - c := newTestCore(withHealthyCode(), - withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient)) - ctx := context.Background() - resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) - assert.NoError(t, err) - assert.Equal(t, false, resp.IsHealthy) - assert.NotEmpty(t, resp.Reasons) - }) - - t.Run("ok with tt lag checking", func(t *testing.T) { - v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() - Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "600") - defer 
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) - - c := newTestCore(withHealthyCode(), - withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient)) ctx := context.Background() resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) diff --git a/internal/rootcoord/util.go b/internal/rootcoord/util.go index 88a27e758d..c2588bb3b5 100644 --- a/internal/rootcoord/util.go +++ b/internal/rootcoord/util.go @@ -21,10 +21,8 @@ import ( "encoding/json" "fmt" "strconv" - "time" "go.uber.org/zap" - "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/types" @@ -34,7 +32,6 @@ import ( "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" - "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -284,83 +281,3 @@ func getProxyMetrics(ctx context.Context, proxies proxyutil.ProxyClientManagerIn return ret, nil } - -func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordClient, dataCoord types.DataCoordClient, maxDelay time.Duration) error { - ctx, cancel := context.WithTimeout(ctx, GetMetricsTimeout) - defer cancel() - - now := time.Now() - group := &errgroup.Group{} - queryNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]() - dataNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]() - - group.Go(func() error { - queryCoordTopology, err := getQueryCoordMetrics(ctx, queryCoord) - if err != nil { - return err - } - - for _, queryNodeMetric := range queryCoordTopology.Cluster.ConnectedNodes { - qm := queryNodeMetric.QuotaMetrics - if qm != nil { - if qm.Fgm.NumFlowGraph > 0 && qm.Fgm.MinFlowGraphChannel != "" { - minTt, _ := tsoutil.ParseTS(qm.Fgm.MinFlowGraphTt) - delay := now.Sub(minTt) - - if delay.Milliseconds() >= maxDelay.Milliseconds() { - queryNodeTTDelay.Insert(qm.Fgm.MinFlowGraphChannel, delay) - } - } - } - } - return nil - }) - - // get Data cluster metrics - group.Go(func() error { - dataCoordTopology, err := getDataCoordMetrics(ctx, dataCoord) - if err != nil { - return err - } - - for _, dataNodeMetric := range dataCoordTopology.Cluster.ConnectedDataNodes { - dm := dataNodeMetric.QuotaMetrics - if dm != nil { - if dm.Fgm.NumFlowGraph > 0 && dm.Fgm.MinFlowGraphChannel != "" { - minTt, _ := tsoutil.ParseTS(dm.Fgm.MinFlowGraphTt) - delay := now.Sub(minTt) - - if delay.Milliseconds() >= maxDelay.Milliseconds() { - dataNodeTTDelay.Insert(dm.Fgm.MinFlowGraphChannel, delay) - } - } - } - } - return nil - }) - - err := group.Wait() - if err != nil { - return err - } - - var maxLagChannel string - var maxLag time.Duration - findMaxLagChannel := func(params ...*typeutil.ConcurrentMap[string, time.Duration]) { - for _, param := range params { - param.Range(func(k string, v time.Duration) bool { - if v > maxLag { - maxLag = v - maxLagChannel = k - } - return true - }) - } - } - findMaxLagChannel(queryNodeTTDelay, dataNodeTTDelay) - - if maxLag > 0 && len(maxLagChannel) != 0 { - return fmt.Errorf("max timetick lag execced threhold, max timetick lag:%s on channel:%s", maxLag, maxLagChannel) - } - return nil -} diff --git a/internal/util/componentutil/componentutil.go b/internal/util/componentutil/componentutil.go index d89c9db72b..93537d2445 100644 --- a/internal/util/componentutil/componentutil.go +++ b/internal/util/componentutil/componentutil.go @@ -84,17 +84,3 @@ func WaitForComponentHealthy[T 
interface { }](ctx context.Context, client T, serviceName string, attempts uint, sleep time.Duration) error { return WaitForComponentStates(ctx, client, serviceName, []commonpb.StateCode{commonpb.StateCode_Healthy}, attempts, sleep) } - -func CheckHealthRespWithErr(err error) *milvuspb.CheckHealthResponse { - if err != nil { - return CheckHealthRespWithErrMsg(err.Error()) - } - return CheckHealthRespWithErrMsg() -} - -func CheckHealthRespWithErrMsg(errMsg ...string) *milvuspb.CheckHealthResponse { - if len(errMsg) != 0 { - return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: false, Reasons: errMsg} - } - return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}} -} diff --git a/internal/util/healthcheck/checker.go b/internal/util/healthcheck/checker.go new file mode 100644 index 0000000000..c1f06a8e10 --- /dev/null +++ b/internal/util/healthcheck/checker.go @@ -0,0 +1,276 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package healthcheck + +import ( + "fmt" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/json" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +// UnHealthyLevel represents the health level of a system. +type UnHealthyLevel int + +const ( + // Healthy means the system is operating normally. + Healthy UnHealthyLevel = iota + // Warning indicates minor issues that might escalate. + Warning + // Critical indicates major issues that need immediate attention. + Critical + // Fatal indicates system failure. + Fatal +) + +// String returns the string representation of the UnHealthyLevel. 
+func (u UnHealthyLevel) String() string { + switch u { + case Healthy: + return "Healthy" + case Warning: + return "Warning" + case Critical: + return "Critical" + case Fatal: + return "Fatal" + default: + return "Unknown" + } +} + +type Item int + +const ( + ChannelsWatched Item = iota + CheckpointLagExceed + CollectionQueryable + TimeTickLagExceed + NodeHealthCheck +) + +func getUnhealthyLevel(item Item) UnHealthyLevel { + switch item { + case ChannelsWatched: + return Fatal + case CheckpointLagExceed: + return Fatal + case TimeTickLagExceed: + return Fatal + case NodeHealthCheck: + return Fatal + case CollectionQueryable: + return Critical + default: + panic(fmt.Sprintf("unknown health check item: %d", int(item))) + } +} + +type Result struct { + UnhealthyClusterMsgs []*UnhealthyClusterMsg `json:"unhealthy_cluster_msgs"` + UnhealthyCollectionMsgs []*UnhealthyCollectionMsg `json:"unhealthy_collection_msgs"` +} + +func NewResult() *Result { + return &Result{} +} + +func (r *Result) AppendUnhealthyClusterMsg(unm *UnhealthyClusterMsg) { + r.UnhealthyClusterMsgs = append(r.UnhealthyClusterMsgs, unm) +} + +func (r *Result) AppendUnhealthyCollectionMsgs(udm *UnhealthyCollectionMsg) { + r.UnhealthyCollectionMsgs = append(r.UnhealthyCollectionMsgs, udm) +} + +func (r *Result) AppendResult(other *Result) { + if other == nil { + return + } + r.UnhealthyClusterMsgs = append(r.UnhealthyClusterMsgs, other.UnhealthyClusterMsgs...) + r.UnhealthyCollectionMsgs = append(r.UnhealthyCollectionMsgs, other.UnhealthyCollectionMsgs...) +} + +func (r *Result) IsEmpty() bool { + return len(r.UnhealthyClusterMsgs) == 0 && len(r.UnhealthyCollectionMsgs) == 0 +} + +func (r *Result) IsHealthy() bool { + if len(r.UnhealthyClusterMsgs) == 0 && len(r.UnhealthyCollectionMsgs) == 0 { + return true + } + + for _, unm := range r.UnhealthyClusterMsgs { + if unm.Reason.UnhealthyLevel == Fatal { + return false + } + } + + for _, ucm := range r.UnhealthyCollectionMsgs { + if ucm.Reason.UnhealthyLevel == Fatal { + return false + } + } + + return true +} + +type UnhealthyReason struct { + UnhealthyMsg string `json:"unhealthy_msg"` + UnhealthyLevel UnHealthyLevel `json:"unhealthy_level"` +} + +type UnhealthyClusterMsg struct { + Role string `json:"role"` + NodeID int64 `json:"node_id"` + Reason *UnhealthyReason `json:"reason"` +} + +func NewUnhealthyClusterMsg(role string, nodeID int64, unhealthyMsg string, item Item) *UnhealthyClusterMsg { + return &UnhealthyClusterMsg{ + Role: role, + NodeID: nodeID, + Reason: &UnhealthyReason{ + UnhealthyMsg: unhealthyMsg, + UnhealthyLevel: getUnhealthyLevel(item), + }, + } +} + +type UnhealthyCollectionMsg struct { + DatabaseID int64 `json:"database_id"` + CollectionID int64 `json:"collection_id"` + Reason *UnhealthyReason `json:"reason"` +} + +func NewUnhealthyCollectionMsg(collectionID int64, unhealthyMsg string, item Item) *UnhealthyCollectionMsg { + return &UnhealthyCollectionMsg{ + CollectionID: collectionID, + Reason: &UnhealthyReason{ + UnhealthyMsg: unhealthyMsg, + UnhealthyLevel: getUnhealthyLevel(item), + }, + } +} + +type Checker struct { + sync.RWMutex + interval time.Duration + done chan struct{} + checkFn func() *Result + latestResult *Result + once sync.Once +} + +func NewChecker(interval time.Duration, checkFn func() *Result) *Checker { + checker := &Checker{ + interval: interval, + checkFn: checkFn, + latestResult: NewResult(), + done: make(chan struct{}, 1), + once: sync.Once{}, + } + return checker +} + +func (hc *Checker) Start() { + go func() { + ticker := 
time.NewTicker(hc.interval) + defer ticker.Stop() + log.Info("start health checker") + for { + select { + case <-ticker.C: + hc.Lock() + hc.latestResult = hc.checkFn() + hc.Unlock() + case <-hc.done: + log.Info("stop health checker") + return + } + } + }() +} + +func (hc *Checker) GetLatestCheckResult() *Result { + hc.RLock() + defer hc.RUnlock() + return hc.latestResult +} + +func (hc *Checker) Close() { + hc.once.Do(func() { + close(hc.done) + }) +} + +func GetHealthCheckResultFromResp(resp *milvuspb.CheckHealthResponse) *Result { + var r Result + if len(resp.Reasons) == 0 { + return &r + } + if len(resp.Reasons) > 1 { + log.Error("invalid check result", zap.Any("reasons", resp.Reasons)) + return &r + } + + err := json.Unmarshal([]byte(resp.Reasons[0]), &r) + if err != nil { + log.Error("unmarshal check result error", zap.String("error", err.Error())) + } + return &r +} + +func GetCheckHealthResponseFromClusterMsg(msg ...*UnhealthyClusterMsg) *milvuspb.CheckHealthResponse { + r := &Result{UnhealthyClusterMsgs: msg} + reasons, err := json.Marshal(r) + if err != nil { + log.Error("marshal check result error", zap.String("error", err.Error())) + } + return &milvuspb.CheckHealthResponse{ + Status: merr.Success(), + IsHealthy: r.IsHealthy(), + Reasons: []string{string(reasons)}, + } +} + +func GetCheckHealthResponseFromResult(checkResult *Result) *milvuspb.CheckHealthResponse { + if checkResult.IsEmpty() { + return OK() + } + + reason, err := json.Marshal(checkResult) + if err != nil { + log.Error("marshal check result error", zap.String("error", err.Error())) + } + + return &milvuspb.CheckHealthResponse{ + Status: merr.Success(), + IsHealthy: checkResult.IsHealthy(), + Reasons: []string{string(reason)}, + } +} + +func OK() *milvuspb.CheckHealthResponse { + return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}} +} diff --git a/internal/util/healthcheck/checker_test.go b/internal/util/healthcheck/checker_test.go new file mode 100644 index 0000000000..7fdcb8cd6e --- /dev/null +++ b/internal/util/healthcheck/checker_test.go @@ -0,0 +1,60 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
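
For orientation before the unit test that follows, here is a small standalone sketch (illustrative only, not part of the patch) of how the Result severity levels are intended to behave: only Fatal entries flip IsHealthy to false, while Critical entries such as CollectionQueryable are still serialized into the response Reasons. The collection ID, node ID, role string, and messages below are made up; everything else is the healthcheck API introduced above.

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/util/healthcheck"
)

func main() {
	r := healthcheck.NewResult()

	// CollectionQueryable maps to the Critical level: it is surfaced in the
	// response reasons but does not mark the cluster unhealthy by itself.
	r.AppendUnhealthyCollectionMsgs(
		healthcheck.NewUnhealthyCollectionMsg(100, "collection not fully loaded", healthcheck.CollectionQueryable))
	fmt.Println(r.IsHealthy()) // true

	// NodeHealthCheck maps to Fatal, which does flip the overall health flag.
	r.AppendUnhealthyClusterMsg(
		healthcheck.NewUnhealthyClusterMsg("querynode", 1, "rpc unreachable", healthcheck.NodeHealthCheck))
	fmt.Println(r.IsHealthy()) // false

	// Clients receive the JSON-encoded Result as a single entry in Reasons.
	resp := healthcheck.GetCheckHealthResponseFromResult(r)
	fmt.Println(resp.IsHealthy, len(resp.Reasons)) // false 1
}
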
+ +package healthcheck + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/util/merr" +) + +func TestChecker(t *testing.T) { + expected1 := NewResult() + expected1.AppendUnhealthyClusterMsg(NewUnhealthyClusterMsg("role1", 1, "msg1", ChannelsWatched)) + expected1.AppendUnhealthyClusterMsg(NewUnhealthyClusterMsg("role1", 1, "msg1", ChannelsWatched)) + + expected1.AppendUnhealthyCollectionMsgs(&UnhealthyCollectionMsg{ + CollectionID: 1, + Reason: &UnhealthyReason{ + UnhealthyMsg: "msg2", + UnhealthyLevel: Critical, + }, + }) + + checkFn := func() *Result { + return expected1 + } + checker := NewChecker(100*time.Millisecond, checkFn) + go checker.Start() + + time.Sleep(150 * time.Millisecond) + actual1 := checker.GetLatestCheckResult() + assert.Equal(t, expected1, actual1) + assert.False(t, actual1.IsHealthy()) + + chr := GetCheckHealthResponseFromResult(actual1) + assert.Equal(t, merr.Success(), chr.Status) + assert.Equal(t, actual1.IsHealthy(), chr.IsHealthy) + assert.Equal(t, 1, len(chr.Reasons)) + + actualResult := GetHealthCheckResultFromResp(chr) + assert.Equal(t, actual1, actualResult) + checker.Close() +} diff --git a/internal/util/mock/grpc_datanode_client.go b/internal/util/mock/grpc_datanode_client.go index 13ae355738..621a2317e6 100644 --- a/internal/util/mock/grpc_datanode_client.go +++ b/internal/util/mock/grpc_datanode_client.go @@ -112,3 +112,7 @@ func (m *GrpcDataNodeClient) QuerySlot(ctx context.Context, req *datapb.QuerySlo func (m *GrpcDataNodeClient) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { return &commonpb.Status{}, m.Err } + +func (m *GrpcDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return &milvuspb.CheckHealthResponse{}, m.Err +} diff --git a/internal/util/mock/grpc_querynode_client.go b/internal/util/mock/grpc_querynode_client.go index dadfb31578..5db4bee2a4 100644 --- a/internal/util/mock/grpc_querynode_client.go +++ b/internal/util/mock/grpc_querynode_client.go @@ -134,6 +134,10 @@ func (m *GrpcQueryNodeClient) DeleteBatch(ctx context.Context, in *querypb.Delet return &querypb.DeleteBatchResponse{}, m.Err } +func (m *GrpcQueryNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return &milvuspb.CheckHealthResponse{}, m.Err +} + func (m *GrpcQueryNodeClient) Close() error { return m.Err } diff --git a/internal/util/wrappers/qn_wrapper.go b/internal/util/wrappers/qn_wrapper.go index def2e64f01..c186fdf159 100644 --- a/internal/util/wrappers/qn_wrapper.go +++ b/internal/util/wrappers/qn_wrapper.go @@ -152,6 +152,10 @@ func (qn *qnServerWrapper) DeleteBatch(ctx context.Context, in *querypb.DeleteBa return qn.QueryNode.DeleteBatch(ctx, in) } +func (qn *qnServerWrapper) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return qn.QueryNode.CheckHealth(ctx, req) +} + func WrapQueryNodeServerAsClient(qn types.QueryNode) types.QueryNodeClient { return &qnServerWrapper{ QueryNode: qn, diff --git a/pkg/util/funcutil/func.go b/pkg/util/funcutil/func.go index 1a9862c9c3..82f79560cb 100644 --- a/pkg/util/funcutil/func.go +++ b/pkg/util/funcutil/func.go @@ -24,7 +24,6 @@ import ( "fmt" "net" "reflect" - "regexp" "strconv" "strings" "time" @@ -254,13 
+253,18 @@ func ConvertChannelName(chanName string, tokenFrom string, tokenTo string) (stri } func GetCollectionIDFromVChannel(vChannelName string) int64 { - re := regexp.MustCompile(`.*_(\d+)v\d+`) - matches := re.FindStringSubmatch(vChannelName) - if len(matches) > 1 { - number, err := strconv.ParseInt(matches[1], 0, 64) - if err == nil { - return number - } + end := strings.LastIndexByte(vChannelName, 'v') + if end <= 0 { + return -1 + } + start := strings.LastIndexByte(vChannelName, '_') + if start <= 0 { + return -1 + } + + collectionIDStr := vChannelName[start+1 : end] + if collectionID, err := strconv.ParseInt(collectionIDStr, 0, 64); err == nil { + return collectionID } return -1 } diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index e3d9d3db92..4bf2f4a729 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -299,6 +299,13 @@ func IsHealthyOrStopping(stateCode commonpb.StateCode) error { return CheckHealthy(stateCode) } +func AnalyzeComponentStateResp(role string, nodeID int64, resp *milvuspb.ComponentStates, err error) error { + if err != nil { + return errors.Wrap(err, "service is unhealthy") + } + return AnalyzeState(role, nodeID, resp) +} + func AnalyzeState(role string, nodeID int64, state *milvuspb.ComponentStates) error { if err := Error(state.GetStatus()); err != nil { return errors.Wrapf(err, "%s=%d not healthy", role, nodeID) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index af858cc0f4..86467ac474 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -266,6 +266,9 @@ type commonConfig struct { ReadOnlyPrivileges ParamItem `refreshable:"false"` ReadWritePrivileges ParamItem `refreshable:"false"` AdminPrivileges ParamItem `refreshable:"false"` + + HealthCheckInterval ParamItem `refreshable:"true"` + HealthCheckRPCTimeout ParamItem `refreshable:"true"` } func (p *commonConfig) init(base *BaseTable) { @@ -915,6 +918,22 @@ This helps Milvus-CDC synchronize incremental data`, Doc: `use to override the default value of admin privileges, example: "PrivilegeCreateOwnership,PrivilegeDropOwnership"`, } p.AdminPrivileges.Init(base.mgr) + + p.HealthCheckInterval = ParamItem{ + Key: "common.healthcheck.interval.seconds", + Version: "2.4.8", + DefaultValue: "30", + Doc: `health check interval in seconds, default 30s`, + } + p.HealthCheckInterval.Init(base.mgr) + + p.HealthCheckRPCTimeout = ParamItem{ + Key: "common.healthcheck.timeout.seconds", + Version: "2.4.8", + DefaultValue: "10", + Doc: `RPC timeout for health check request`, + } + p.HealthCheckRPCTimeout.Init(base.mgr) } type gpuConfig struct { @@ -2169,9 +2188,9 @@ If this parameter is set false, Milvus simply searches the growing segments with p.UpdateCollectionLoadStatusInterval = ParamItem{ Key: "queryCoord.updateCollectionLoadStatusInterval", Version: "2.4.7", - DefaultValue: "5", + DefaultValue: "300", PanicIfEmpty: true, - Doc: "5m, max interval for updating collection loaded status", + Doc: "300s, max interval of updating collection loaded status for check health", Export: true, } diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index a05f2cabaa..3e5b77504e 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -126,6 +126,11 @@ func TestComponentParam(t *testing.T) { params.Save("common.gchelper.minimumGoGC", "80") assert.Equal(t, 80, Params.MinimumGOGCConfig.GetAsInt()) + 
params.Save("common.healthcheck.interval.seconds", "60") + assert.Equal(t, time.Second*60, Params.HealthCheckInterval.GetAsDuration(time.Second)) + params.Save("common.healthcheck.timeout.seconds", "5") + assert.Equal(t, 5, Params.HealthCheckRPCTimeout.GetAsInt()) + assert.Equal(t, 0, len(Params.ReadOnlyPrivileges.GetAsStrings())) assert.Equal(t, 0, len(Params.ReadWritePrivileges.GetAsStrings())) assert.Equal(t, 0, len(Params.AdminPrivileges.GetAsStrings())) @@ -304,8 +309,8 @@ func TestComponentParam(t *testing.T) { checkHealthRPCTimeout := Params.CheckHealthRPCTimeout.GetAsInt() assert.Equal(t, 2000, checkHealthRPCTimeout) - updateInterval := Params.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute) - assert.Equal(t, updateInterval, time.Minute*5) + updateInterval := Params.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Second) + assert.Equal(t, updateInterval, time.Second*300) assert.Equal(t, 0.1, Params.GlobalRowCountFactor.GetAsFloat()) params.Save("queryCoord.globalRowCountFactor", "0.4") diff --git a/pkg/util/ratelimitutil/utils.go b/pkg/util/ratelimitutil/utils.go index ae65eb14c7..1c70049ca8 100644 --- a/pkg/util/ratelimitutil/utils.go +++ b/pkg/util/ratelimitutil/utils.go @@ -16,7 +16,13 @@ package ratelimitutil -import "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" +import ( + "fmt" + "time" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) var QuotaErrorString = map[commonpb.ErrorCode]string{ commonpb.ErrorCode_ForceDeny: "access has been disabled by the administrator", @@ -28,3 +34,14 @@ var QuotaErrorString = map[commonpb.ErrorCode]string{ func GetQuotaErrorString(errCode commonpb.ErrorCode) string { return QuotaErrorString[errCode] } + +func CheckTimeTickDelay(channel string, minTT uint64, maxDelay time.Duration) error { + if channel != "" && maxDelay > 0 { + minTt, _ := tsoutil.ParseTS(minTT) + delay := time.Since(minTt) + if delay.Milliseconds() >= maxDelay.Milliseconds() { + return fmt.Errorf("max timetick lag execced threhold, lag:%s on channel:%s", delay, channel) + } + } + return nil +}