enhance: optimize CPU usage for CheckHealth requests (#35595)
issue: #35563
pr: #35589
Signed-off-by: jaime <yun.zhang@zilliz.com>

This commit is contained in:
parent 0fe4b2c7b7
commit 319f5494cd
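The gist of the optimization: instead of fanning out RPCs to every node on each incoming CheckHealth request, each coordinator now runs a background healthcheck.Checker that probes the cluster once per Params.CommonCfg.HealthCheckInterval and caches the latest result; the CheckHealth handler then just returns the cached result. Below is a minimal, self-contained sketch of that pattern. The method names (NewChecker, Start, GetLatestCheckResult, Close) mirror what the diff calls, but the body is an assumption for illustration, not the actual internal/util/healthcheck implementation.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Result stands in for healthcheck.Result; only the healthy/unhealthy split matters here.
type Result struct {
	IsHealthy bool
	Reasons   []string
}

// Checker runs checkFn once per interval in the background and caches the
// latest Result, so answering a CheckHealth RPC costs a struct read instead
// of a fresh fan-out of RPCs to every node.
type Checker struct {
	interval time.Duration
	checkFn  func() *Result

	mu     sync.RWMutex
	latest *Result
	closeC chan struct{}
	wg     sync.WaitGroup
}

func NewChecker(interval time.Duration, checkFn func() *Result) *Checker {
	return &Checker{
		interval: interval,
		checkFn:  checkFn,
		latest:   &Result{IsHealthy: true},
		closeC:   make(chan struct{}),
	}
}

func (c *Checker) Start() {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		ticker := time.NewTicker(c.interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				r := c.checkFn()
				c.mu.Lock()
				c.latest = r
				c.mu.Unlock()
			case <-c.closeC:
				return
			}
		}
	}()
}

// GetLatestCheckResult returns the cached result without doing any RPC.
func (c *Checker) GetLatestCheckResult() *Result {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.latest
}

func (c *Checker) Close() {
	close(c.closeC)
	c.wg.Wait()
}

func main() {
	// Mirrors the timing used in the updated datacoord test: a 20ms interval
	// and a 30ms wait for the first background cycle to land.
	checker := NewChecker(20*time.Millisecond, func() *Result {
		return &Result{IsHealthy: false, Reasons: []string{"channel dml_0 is not watched"}}
	})
	checker.Start()
	defer checker.Close()
	time.Sleep(30 * time.Millisecond)
	fmt.Println(checker.GetLatestCheckResult().Reasons)
}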
@@ -371,7 +371,7 @@ queryCoord:
channelExclusiveNodeFactor: 4 # the least node number for enable channel's exclusive mode
collectionObserverInterval: 200 # the interval of collection observer
checkExecutedFlagInterval: 100 # the interval of check executed flag to force to pull dist
updateCollectionLoadStatusInterval: 5 # 5m, max interval for updating collection loaded status
updateCollectionLoadStatusInterval: 300 # 300s, max interval of updating collection loaded status for check health
cleanExcludeSegmentInterval: 60 # the time duration of clean pipeline exclude segment which used for filter invalid data, in seconds
ip: # TCP/IP address of queryCoord. If not specified, use the first unicastable address
port: 19531 # TCP port of queryCoord
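Note that updateCollectionLoadStatusInterval is re-expressed in seconds (300) rather than minutes (5). Intervals read from milvus.yaml are converted with paramtable's GetAsDuration(time.Second) in the Go code later in this diff; a tiny runnable sketch of the same conversion:

package main

import (
	"fmt"
	"time"
)

// Sketch: the interval is stored as a plain number of seconds in milvus.yaml,
// so consumers multiply by time.Second (the real code uses GetAsDuration).
func main() {
	updateCollectionLoadStatusInterval := 300 // seconds, as configured above
	interval := time.Duration(updateCollectionLoadStatusInterval) * time.Second
	fmt.Println(interval) // 5m0s
}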
@ -53,11 +53,11 @@ type MockManager_AllocSegment_Call struct {
|
||||
}
|
||||
|
||||
// AllocSegment is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - collectionID int64
|
||||
// - partitionID int64
|
||||
// - channelName string
|
||||
// - requestRows int64
|
||||
// - ctx context.Context
|
||||
// - collectionID int64
|
||||
// - partitionID int64
|
||||
// - channelName string
|
||||
// - requestRows int64
|
||||
func (_e *MockManager_Expecter) AllocSegment(ctx interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}, requestRows interface{}) *MockManager_AllocSegment_Call {
|
||||
return &MockManager_AllocSegment_Call{Call: _e.mock.On("AllocSegment", ctx, collectionID, partitionID, channelName, requestRows)}
|
||||
}
|
||||
@ -90,8 +90,8 @@ type MockManager_DropSegment_Call struct {
|
||||
}
|
||||
|
||||
// DropSegment is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - segmentID int64
|
||||
// - ctx context.Context
|
||||
// - segmentID int64
|
||||
func (_e *MockManager_Expecter) DropSegment(ctx interface{}, segmentID interface{}) *MockManager_DropSegment_Call {
|
||||
return &MockManager_DropSegment_Call{Call: _e.mock.On("DropSegment", ctx, segmentID)}
|
||||
}
|
||||
@ -124,8 +124,8 @@ type MockManager_DropSegmentsOfChannel_Call struct {
|
||||
}
|
||||
|
||||
// DropSegmentsOfChannel is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - channel string
|
||||
// - ctx context.Context
|
||||
// - channel string
|
||||
func (_e *MockManager_Expecter) DropSegmentsOfChannel(ctx interface{}, channel interface{}) *MockManager_DropSegmentsOfChannel_Call {
|
||||
return &MockManager_DropSegmentsOfChannel_Call{Call: _e.mock.On("DropSegmentsOfChannel", ctx, channel)}
|
||||
}
|
||||
@ -167,8 +167,8 @@ type MockManager_ExpireAllocations_Call struct {
|
||||
}
|
||||
|
||||
// ExpireAllocations is a helper method to define mock.On call
|
||||
// - channel string
|
||||
// - ts uint64
|
||||
// - channel string
|
||||
// - ts uint64
|
||||
func (_e *MockManager_Expecter) ExpireAllocations(channel interface{}, ts interface{}) *MockManager_ExpireAllocations_Call {
|
||||
return &MockManager_ExpireAllocations_Call{Call: _e.mock.On("ExpireAllocations", channel, ts)}
|
||||
}
|
||||
@ -222,9 +222,9 @@ type MockManager_GetFlushableSegments_Call struct {
|
||||
}
|
||||
|
||||
// GetFlushableSegments is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - channel string
|
||||
// - ts uint64
|
||||
// - ctx context.Context
|
||||
// - channel string
|
||||
// - ts uint64
|
||||
func (_e *MockManager_Expecter) GetFlushableSegments(ctx interface{}, channel interface{}, ts interface{}) *MockManager_GetFlushableSegments_Call {
|
||||
return &MockManager_GetFlushableSegments_Call{Call: _e.mock.On("GetFlushableSegments", ctx, channel, ts)}
|
||||
}
|
||||
@ -278,9 +278,9 @@ type MockManager_SealAllSegments_Call struct {
|
||||
}
|
||||
|
||||
// SealAllSegments is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - collectionID int64
|
||||
// - segIDs []int64
|
||||
// - ctx context.Context
|
||||
// - collectionID int64
|
||||
// - segIDs []int64
|
||||
func (_e *MockManager_Expecter) SealAllSegments(ctx interface{}, collectionID interface{}, segIDs interface{}) *MockManager_SealAllSegments_Call {
|
||||
return &MockManager_SealAllSegments_Call{Call: _e.mock.On("SealAllSegments", ctx, collectionID, segIDs)}
|
||||
}
|
||||
|
||||
@ -6,6 +6,8 @@ import (
|
||||
context "context"
|
||||
|
||||
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
healthcheck "github.com/milvus-io/milvus/internal/util/healthcheck"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
typeutil "github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
@ -113,44 +115,44 @@ func (_c *MockSessionManager_CheckChannelOperationProgress_Call) RunAndReturn(ru
|
||||
return _c
|
||||
}
|
||||
|
||||
// CheckHealth provides a mock function with given fields: ctx
|
||||
func (_m *MockSessionManager) CheckHealth(ctx context.Context) error {
|
||||
// CheckDNHealth provides a mock function with given fields: ctx
|
||||
func (_m *MockSessionManager) CheckDNHealth(ctx context.Context) *healthcheck.Result {
|
||||
ret := _m.Called(ctx)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
|
||||
var r0 *healthcheck.Result
|
||||
if rf, ok := ret.Get(0).(func(context.Context) *healthcheck.Result); ok {
|
||||
r0 = rf(ctx)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
r0 = ret.Get(0).(*healthcheck.Result)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// MockSessionManager_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
|
||||
type MockSessionManager_CheckHealth_Call struct {
|
||||
// MockSessionManager_CheckDNHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckDNHealth'
|
||||
type MockSessionManager_CheckDNHealth_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// CheckHealth is a helper method to define mock.On call
|
||||
// CheckDNHealth is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
func (_e *MockSessionManager_Expecter) CheckHealth(ctx interface{}) *MockSessionManager_CheckHealth_Call {
|
||||
return &MockSessionManager_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx)}
|
||||
func (_e *MockSessionManager_Expecter) CheckDNHealth(ctx interface{}) *MockSessionManager_CheckDNHealth_Call {
|
||||
return &MockSessionManager_CheckDNHealth_Call{Call: _e.mock.On("CheckDNHealth", ctx)}
|
||||
}
|
||||
|
||||
func (_c *MockSessionManager_CheckHealth_Call) Run(run func(ctx context.Context)) *MockSessionManager_CheckHealth_Call {
|
||||
func (_c *MockSessionManager_CheckDNHealth_Call) Run(run func(ctx context.Context)) *MockSessionManager_CheckDNHealth_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockSessionManager_CheckHealth_Call) Return(_a0 error) *MockSessionManager_CheckHealth_Call {
|
||||
func (_c *MockSessionManager_CheckDNHealth_Call) Return(_a0 healthcheck.Result) *MockSessionManager_CheckDNHealth_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockSessionManager_CheckHealth_Call) RunAndReturn(run func(context.Context) error) *MockSessionManager_CheckHealth_Call {
|
||||
func (_c *MockSessionManager_CheckDNHealth_Call) RunAndReturn(run func(context.Context) healthcheck.Result) *MockSessionManager_CheckDNHealth_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
@ -343,6 +343,22 @@ func (c *mockDataNodeClient) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *mockDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
|
||||
if c.state == commonpb.StateCode_Healthy {
|
||||
return &milvuspb.CheckHealthResponse{
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
|
||||
IsHealthy: true,
|
||||
Reasons: []string{},
|
||||
}, nil
|
||||
} else {
|
||||
return &milvuspb.CheckHealthResponse{
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_NotReadyServe},
|
||||
IsHealthy: false,
|
||||
Reasons: []string{"fails"},
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
type mockRootCoordClient struct {
|
||||
state commonpb.StateCode
|
||||
cnt int64
|
||||
|
||||
@@ -46,6 +46,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/log"
@@ -162,6 +163,8 @@ type Server struct {
// manage ways that data coord access other coord
broker broker.Broker
healthChecker *healthcheck.Checker
}
type CollectionNameInfo struct {
@@ -407,6 +410,8 @@ func (s *Server) initDataCoord() error {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn)
log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))
return nil
}
@@ -725,6 +730,7 @@ func (s *Server) startServerLoop() {
go s.importChecker.Start()
s.garbageCollector.start()
s.syncSegmentsScheduler.Start()
s.healthChecker.Start()
}
// startDataNodeTtLoop start a goroutine to recv data node tt msg from msgstream
@@ -1107,6 +1113,9 @@ func (s *Server) Stop() error {
if !s.stateCode.CompareAndSwap(commonpb.StateCode_Healthy, commonpb.StateCode_Abnormal) {
return nil
}
if s.healthChecker != nil {
s.healthChecker.Close()
}
logutil.Logger(s.ctx).Info("datacoord server shutdown")
s.garbageCollector.close()
logutil.Logger(s.ctx).Info("datacoord garbage collector stopped")
@ -51,6 +51,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||
"github.com/milvus-io/milvus/internal/util/healthcheck"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
@ -2527,12 +2528,12 @@ func Test_CheckHealth(t *testing.T) {
|
||||
return sm
|
||||
}
|
||||
|
||||
getChannelManager := func(t *testing.T, findWatcherOk bool) ChannelManager {
|
||||
getChannelManager := func(findWatcherOk bool) ChannelManager {
|
||||
channelManager := NewMockChannelManager(t)
|
||||
if findWatcherOk {
|
||||
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil)
|
||||
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil).Maybe()
|
||||
} else {
|
||||
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error"))
|
||||
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error")).Maybe()
|
||||
}
|
||||
return channelManager
|
||||
}
|
||||
@ -2545,6 +2546,21 @@ func Test_CheckHealth(t *testing.T) {
|
||||
2: nil,
|
||||
}
|
||||
|
||||
newServer := func(isHealthy bool, findWatcherOk bool, meta *meta) *Server {
|
||||
svr := &Server{
|
||||
ctx: context.TODO(),
|
||||
sessionManager: getSessionManager(isHealthy),
|
||||
channelManager: getChannelManager(findWatcherOk),
|
||||
meta: meta,
|
||||
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
|
||||
}
|
||||
svr.stateCode.Store(commonpb.StateCode_Healthy)
|
||||
svr.healthChecker = healthcheck.NewChecker(20*time.Millisecond, svr.healthCheckFn)
|
||||
svr.healthChecker.Start()
|
||||
time.Sleep(30 * time.Millisecond) // wait for next cycle for health checker
|
||||
return svr
|
||||
}
|
||||
|
||||
t.Run("not healthy", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
|
||||
@ -2556,9 +2572,8 @@ func Test_CheckHealth(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("data node health check is fail", func(t *testing.T) {
|
||||
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
|
||||
svr.stateCode.Store(commonpb.StateCode_Healthy)
|
||||
svr.sessionManager = getSessionManager(false)
|
||||
svr := newServer(false, true, &meta{channelCPs: newChannelCps()})
|
||||
defer svr.healthChecker.Close()
|
||||
ctx := context.Background()
|
||||
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
|
||||
assert.NoError(t, err)
|
||||
@ -2567,11 +2582,8 @@ func Test_CheckHealth(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("check channel watched fail", func(t *testing.T) {
|
||||
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
|
||||
svr.stateCode.Store(commonpb.StateCode_Healthy)
|
||||
svr.sessionManager = getSessionManager(true)
|
||||
svr.channelManager = getChannelManager(t, false)
|
||||
svr.meta = &meta{collections: collections}
|
||||
svr := newServer(true, false, &meta{collections: collections, channelCPs: newChannelCps()})
|
||||
defer svr.healthChecker.Close()
|
||||
ctx := context.Background()
|
||||
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
|
||||
assert.NoError(t, err)
|
||||
@ -2580,11 +2592,7 @@ func Test_CheckHealth(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("check checkpoint fail", func(t *testing.T) {
|
||||
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
|
||||
svr.stateCode.Store(commonpb.StateCode_Healthy)
|
||||
svr.sessionManager = getSessionManager(true)
|
||||
svr.channelManager = getChannelManager(t, true)
|
||||
svr.meta = &meta{
|
||||
svr := newServer(true, true, &meta{
|
||||
collections: collections,
|
||||
channelCPs: &channelCPs{
|
||||
checkpoints: map[string]*msgpb.MsgPosition{
|
||||
@ -2594,8 +2602,8 @@ func Test_CheckHealth(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
})
|
||||
defer svr.healthChecker.Close()
|
||||
ctx := context.Background()
|
||||
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
|
||||
assert.NoError(t, err)
|
||||
@ -2604,11 +2612,7 @@ func Test_CheckHealth(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("ok", func(t *testing.T) {
|
||||
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
|
||||
svr.stateCode.Store(commonpb.StateCode_Healthy)
|
||||
svr.sessionManager = getSessionManager(true)
|
||||
svr.channelManager = getChannelManager(t, true)
|
||||
svr.meta = &meta{
|
||||
svr := newServer(true, true, &meta{
|
||||
collections: collections,
|
||||
channelCPs: &channelCPs{
|
||||
checkpoints: map[string]*msgpb.MsgPosition{
|
||||
@ -2626,7 +2630,8 @@ func Test_CheckHealth(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
})
|
||||
defer svr.healthChecker.Close()
|
||||
ctx := context.Background()
|
||||
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
|
||||
assert.NoError(t, err)
|
||||
|
||||
@@ -35,7 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/pkg/common"
@@ -1550,20 +1550,24 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
}, nil
}
err := s.sessionManager.CheckHealth(ctx)
if err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
latestCheckResult := s.healthChecker.GetLatestCheckResult()
return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil
}
func (s *Server) healthCheckFn() *healthcheck.Result {
timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(s.ctx, timeout)
defer cancel()
checkResults := s.sessionManager.CheckDNHealth(ctx)
for collectionID, failReason := range CheckAllChannelsWatched(s.meta, s.channelManager) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.ChannelsWatched))
}
if err = CheckAllChannelsWatched(s.meta, s.channelManager); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
for collectionID, failReason := range CheckCheckPointsHealth(s.meta) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.CheckpointLagExceed))
}
if err = CheckCheckPointsHealth(s.meta); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}
return componentutil.CheckHealthRespWithErr(nil), nil
return checkResults
}
func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
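The healthCheckFn above accumulates findings into a healthcheck.Result, whose definition is not part of this diff. A sketch consistent with the calls used here (NewResult, AppendUnhealthyClusterMsg, AppendUnhealthyCollectionMsgs); the field layout is an assumption:

package healthcheck // sketch only; the real type lives in internal/util/healthcheck

// UnhealthyClusterMsg records a node-level finding (role, node ID, reason).
type UnhealthyClusterMsg struct {
	Role   string
	NodeID int64
	Reason string
	Item   string // e.g. NodeHealthCheck, TimeTickLagExceed
}

// UnhealthyCollectionMsg records a collection-level finding.
type UnhealthyCollectionMsg struct {
	CollectionID int64
	Reason       string
	Item         string // e.g. ChannelsWatched, CheckpointLagExceed
}

// Result accumulates everything found during one background check cycle.
type Result struct {
	UnhealthyClusterMsgs    []*UnhealthyClusterMsg
	UnhealthyCollectionMsgs []*UnhealthyCollectionMsg
}

func NewResult() *Result { return &Result{} }

func (r *Result) AppendUnhealthyClusterMsg(msg *UnhealthyClusterMsg) {
	r.UnhealthyClusterMsgs = append(r.UnhealthyClusterMsgs, msg)
}

func (r *Result) AppendUnhealthyCollectionMsgs(msg *UnhealthyCollectionMsg) {
	r.UnhealthyCollectionMsgs = append(r.UnhealthyCollectionMsgs, msg)
}

// IsHealthy is true only when no node-level or collection-level finding was recorded.
func (r *Result) IsHealthy() bool {
	return len(r.UnhealthyClusterMsgs) == 0 && len(r.UnhealthyCollectionMsgs) == 0
}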
@@ -19,6 +19,7 @@ package datacoord
import (
"context"
"fmt"
"sync"
"time"
"github.com/cockroachdb/errors"
@@ -31,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
@@ -69,7 +71,7 @@ type SessionManager interface {
QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error)
QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error)
DropImport(nodeID int64, in *datapb.DropImportRequest) error
CheckHealth(ctx context.Context) error
CheckDNHealth(ctx context.Context) *healthcheck.Result
QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error)
DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error
Close()
@@ -508,28 +510,44 @@ func (c *SessionManagerImpl) DropImport(nodeID int64, in *datapb.DropImportReque
return VerifyResponse(status, err)
}
func (c *SessionManagerImpl) CheckHealth(ctx context.Context) error {
group, ctx := errgroup.WithContext(ctx)
func (c *SessionManagerImpl) CheckDNHealth(ctx context.Context) *healthcheck.Result {
result := healthcheck.NewResult()
wg := sync.WaitGroup{}
wlock := sync.Mutex{}
ids := c.GetSessionIDs()
for _, nodeID := range ids {
nodeID := nodeID
group.Go(func() error {
cli, err := c.getClient(ctx, nodeID)
wg.Add(1)
go func() {
defer wg.Done()
datanodeClient, err := c.getClient(ctx, nodeID)
if err != nil {
return fmt.Errorf("failed to get DataNode %d: %v", nodeID, err)
err = fmt.Errorf("failed to get node:%d: %v", nodeID, err)
return
}
sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
return err
checkHealthResp, err := datanodeClient.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
if err = merr.CheckRPCCall(checkHealthResp, err); err != nil && !errors.Is(err, merr.ErrServiceUnimplemented) {
err = fmt.Errorf("CheckHealth fails for datanode:%d, %w", nodeID, err)
wlock.Lock()
result.AppendUnhealthyClusterMsg(
healthcheck.NewUnhealthyClusterMsg(typeutil.DataNodeRole, nodeID, err.Error(), healthcheck.NodeHealthCheck))
wlock.Unlock()
return
}
err = merr.AnalyzeState("DataNode", nodeID, sta)
return err
})
if len(checkHealthResp.Reasons) > 0 {
wlock.Lock()
result.AppendResult(healthcheck.GetHealthCheckResultFromResp(checkHealthResp))
wlock.Unlock()
}
}()
}
return group.Wait()
wg.Wait()
return result
}
func (c *SessionManagerImpl) QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error) {
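CheckDNHealth switches from errgroup's fail-fast semantics (the first error cancels the remaining probes and is the only one returned) to a plain WaitGroup with a mutex-guarded accumulator, so one slow or unhealthy DataNode no longer hides the state of the others and every finding is reported. A self-contained sketch of that fan-out shape, where checkNode stands in for the per-node CheckHealth RPC:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// checkAll probes every node concurrently and collects all findings instead
// of stopping at the first failure.
func checkAll(ctx context.Context, nodeIDs []int64, checkNode func(context.Context, int64) error) []string {
	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		findings []string
	)
	for _, nodeID := range nodeIDs {
		nodeID := nodeID
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := checkNode(ctx, nodeID); err != nil {
				mu.Lock()
				findings = append(findings, fmt.Sprintf("node %d: %v", nodeID, err))
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return findings
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	reasons := checkAll(ctx, []int64{1, 2, 3}, func(ctx context.Context, id int64) error {
		if id == 2 {
			return fmt.Errorf("time tick lag exceeded")
		}
		return nil
	})
	fmt.Println(reasons) // [node 2: time tick lag exceeded]
}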
@@ -271,7 +271,8 @@ func getCompactionMergeInfo(task *datapb.CompactionTask) *milvuspb.CompactionMer
}
}
func CheckCheckPointsHealth(meta *meta) error {
func CheckCheckPointsHealth(meta *meta) map[int64]string {
checkResult := make(map[int64]string)
for channel, cp := range meta.GetChannelCheckpoints() {
collectionID := funcutil.GetCollectionIDFromVChannel(channel)
if collectionID == -1 {
@@ -285,31 +286,30 @@ func CheckCheckPointsHealth(meta *meta) error {
ts, _ := tsoutil.ParseTS(cp.Timestamp)
lag := time.Since(ts)
if lag > paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.GetAsDuration(time.Second) {
return merr.WrapErrChannelCPExceededMaxLag(channel, fmt.Sprintf("checkpoint lag: %f(min)", lag.Minutes()))
checkResult[collectionID] = fmt.Sprintf("exceeds max lag:%s on channel:%s checkpoint", lag, channel)
}
}
return nil
return checkResult
}
func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) error {
func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) map[int64]string {
collIDs := meta.ListCollections()
checkResult := make(map[int64]string)
for _, collID := range collIDs {
collInfo := meta.GetCollection(collID)
if collInfo == nil {
log.Warn("collection info is nil, skip it", zap.Int64("collectionID", collID))
log.RatedWarn(60, "collection info is nil, skip it", zap.Int64("collectionID", collID))
continue
}
for _, channelName := range collInfo.VChannelNames {
_, err := channelManager.FindWatcher(channelName)
if err != nil {
log.Warn("find watcher for channel failed", zap.Int64("collectionID", collID),
zap.String("channelName", channelName), zap.Error(err))
return err
checkResult[collID] = fmt.Sprintf("channel:%s is not watched", channelName)
}
}
}
return nil
return checkResult
}
func getBinLogIDs(segment *SegmentInfo, fieldID int64) []int64 {
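CheckCheckPointsHealth and CheckAllChannelsWatched now return a map keyed by collection ID instead of bailing out on the first error, so a single pass reports every lagging channel. A runnable sketch of the lag rule, using time.Time in place of the tsoutil-encoded checkpoint timestamp:

package main

import (
	"fmt"
	"time"
)

// lateCheckpoints flags every collection whose channel checkpoint trails
// "now" by more than maxLag; all offenders are reported, not just the first.
func lateCheckpoints(checkpoints map[string]time.Time, collectionOf func(channel string) int64, maxLag time.Duration) map[int64]string {
	result := make(map[int64]string)
	for channel, cpTime := range checkpoints {
		if lag := time.Since(cpTime); lag > maxLag {
			result[collectionOf(channel)] = fmt.Sprintf("exceeds max lag:%s on channel:%s checkpoint", lag, channel)
		}
	}
	return result
}

func main() {
	cps := map[string]time.Time{
		"by-dev-rootcoord-dml_0_100v0": time.Now().Add(-10 * time.Minute), // stale
		"by-dev-rootcoord-dml_1_101v0": time.Now(),                        // fresh
	}
	fmt.Println(lateCheckpoints(cps, func(string) int64 { return 100 }, 5*time.Minute))
}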
@@ -22,6 +22,7 @@ package datanode
import (
"context"
"fmt"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
@@ -37,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@@ -45,7 +47,10 @@ import (
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// WatchDmChannels is not in use
@@ -583,3 +588,20 @@ func (node *DataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCo
log.Ctx(ctx).Info("DropCompactionPlans success", zap.Int64("planID", req.GetPlanID()))
return merr.Success(), nil
}
func (node *DataNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.CheckHealthResponse{
Status: merr.Status(err),
Reasons: []string{err.Error()},
}, nil
}
maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
minFGChannel, minFGTt := rateCol.getMinFlowGraphTt()
if err := ratelimitutil.CheckTimeTickDelay(minFGChannel, minFGTt, maxDelay); err != nil {
msg := healthcheck.NewUnhealthyClusterMsg(typeutil.DataNodeRole, node.GetNodeID(), err.Error(), healthcheck.TimeTickLagExceed)
return healthcheck.GetCheckHealthResponseFromClusterMsg(msg), nil
}
return healthcheck.OK(), nil
}
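The new DataNode CheckHealth endpoint reports unhealthiness when the oldest flow-graph time tick lags too far behind wall-clock time. ratelimitutil.CheckTimeTickDelay itself is not part of this diff; the sketch below captures the assumed rule in runnable form:

package main

import (
	"fmt"
	"time"
)

// checkTimeTickDelay flags the node when the slowest flow graph's time tick
// trails now by more than maxDelay (assumed semantics of the real helper,
// which takes a tsoutil-encoded timestamp rather than a time.Time).
func checkTimeTickDelay(channel string, minTt time.Time, maxDelay time.Duration) error {
	if maxDelay > 0 && !minTt.IsZero() {
		if delay := time.Since(minTt); delay > maxDelay {
			return fmt.Errorf("max timetick lag exceeded on channel %s: %s > %s", channel, delay, maxDelay)
		}
	}
	return nil
}

func main() {
	err := checkTimeTickDelay("by-dev-dml_0", time.Now().Add(-2*time.Minute), time.Minute)
	fmt.Println(err)
}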
@ -1114,6 +1114,40 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
|
||||
})
|
||||
}
|
||||
|
||||
func (s *DataNodeServicesSuite) TestCheckHealth() {
|
||||
s.Run("node not healthy", func() {
|
||||
s.SetupTest()
|
||||
s.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
ctx := context.Background()
|
||||
resp, err := s.node.CheckHealth(ctx, nil)
|
||||
s.NoError(err)
|
||||
s.False(merr.Ok(resp.GetStatus()))
|
||||
s.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
|
||||
})
|
||||
|
||||
s.Run("exceeded timetick lag on pipeline", func() {
|
||||
s.SetupTest()
|
||||
rateCol.updateFlowGraphTt("timetick-lag-ch", 1)
|
||||
ctx := context.Background()
|
||||
resp, err := s.node.CheckHealth(ctx, nil)
|
||||
s.NoError(err)
|
||||
s.True(merr.Ok(resp.GetStatus()))
|
||||
s.False(resp.GetIsHealthy())
|
||||
s.NotEmpty(resp.Reasons)
|
||||
})
|
||||
|
||||
s.Run("ok", func() {
|
||||
s.SetupTest()
|
||||
rateCol.removeFlowGraphChannel("timetick-lag-ch")
|
||||
ctx := context.Background()
|
||||
resp, err := s.node.CheckHealth(ctx, nil)
|
||||
s.NoError(err)
|
||||
s.True(merr.Ok(resp.GetStatus()))
|
||||
s.True(resp.GetIsHealthy())
|
||||
s.Empty(resp.Reasons)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *DataNodeServicesSuite) TestDropCompactionPlan() {
|
||||
s.Run("node not healthy", func() {
|
||||
s.SetupTest()
|
||||
|
||||
@ -267,3 +267,9 @@ func (c *Client) DropCompactionPlan(ctx context.Context, req *datapb.DropCompact
|
||||
return client.DropCompactionPlan(ctx, req)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
|
||||
return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*milvuspb.CheckHealthResponse, error) {
|
||||
return client.CheckHealth(ctx, req)
|
||||
})
|
||||
}
|
||||
|
||||
@ -407,3 +407,7 @@ func (s *Server) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest) (*
|
||||
func (s *Server) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) {
|
||||
return s.datanode.DropCompactionPlan(ctx, req)
|
||||
}
|
||||
|
||||
func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
|
||||
return s.datanode.CheckHealth(ctx, req)
|
||||
}
|
||||
|
||||
@ -185,6 +185,10 @@ func (m *MockDataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropC
|
||||
return m.status, m.err
|
||||
}
|
||||
|
||||
func (m *MockDataNode) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
|
||||
return &milvuspb.CheckHealthResponse{}, m.err
|
||||
}
|
||||
|
||||
// /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
func Test_NewServer(t *testing.T) {
|
||||
paramtable.Init()
|
||||
|
||||
@ -345,3 +345,9 @@ func (c *Client) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchReques
|
||||
return client.DeleteBatch(ctx, req)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*milvuspb.CheckHealthResponse, error) {
|
||||
return client.CheckHealth(ctx, req)
|
||||
})
|
||||
}
|
||||
|
||||
@ -389,3 +389,7 @@ func (s *Server) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commo
|
||||
func (s *Server) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
|
||||
return s.querynode.DeleteBatch(ctx, req)
|
||||
}
|
||||
|
||||
func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
|
||||
return s.querynode.CheckHealth(ctx, req)
|
||||
}
|
||||
|
||||
@ -1879,7 +1879,7 @@ func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) Return(_a0 *milvuspb.Privileg
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) RunAndReturn(run func(context.Context, string) (*milvuspb.PrivilegeGroupInfo,error)) *RootCoordCatalog_GetPrivilegeGroup_Call {
|
||||
func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) RunAndReturn(run func(context.Context, string) (*milvuspb.PrivilegeGroupInfo, error)) *RootCoordCatalog_GetPrivilegeGroup_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
@ -87,6 +87,61 @@ func (_c *MockDataNode_CheckChannelOperationProgress_Call) RunAndReturn(run func
|
||||
return _c
|
||||
}
|
||||
|
||||
// CheckHealth provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockDataNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *milvuspb.CheckHealthResponse
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok {
|
||||
return rf(_a0, _a1)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockDataNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
|
||||
type MockDataNode_CheckHealth_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// CheckHealth is a helper method to define mock.On call
|
||||
// - _a0 context.Context
|
||||
// - _a1 *milvuspb.CheckHealthRequest
|
||||
func (_e *MockDataNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockDataNode_CheckHealth_Call {
|
||||
return &MockDataNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)}
|
||||
}
|
||||
|
||||
func (_c *MockDataNode_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockDataNode_CheckHealth_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockDataNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNode_CheckHealth_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockDataNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockDataNode_CheckHealth_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// CompactionV2 provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockDataNode) CompactionV2(_a0 context.Context, _a1 *datapb.CompactionPlan) (*commonpb.Status, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
@ -101,6 +101,76 @@ func (_c *MockDataNodeClient_CheckChannelOperationProgress_Call) RunAndReturn(ru
|
||||
return _c
|
||||
}
|
||||
|
||||
// CheckHealth provides a mock function with given fields: ctx, in, opts
|
||||
func (_m *MockDataNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
|
||||
_va := make([]interface{}, len(opts))
|
||||
for _i := range opts {
|
||||
_va[_i] = opts[_i]
|
||||
}
|
||||
var _ca []interface{}
|
||||
_ca = append(_ca, ctx, in)
|
||||
_ca = append(_ca, _va...)
|
||||
ret := _m.Called(_ca...)
|
||||
|
||||
var r0 *milvuspb.CheckHealthResponse
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok {
|
||||
return rf(ctx, in, opts...)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok {
|
||||
r0 = rf(ctx, in, opts...)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok {
|
||||
r1 = rf(ctx, in, opts...)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockDataNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
|
||||
type MockDataNodeClient_CheckHealth_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// CheckHealth is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - in *milvuspb.CheckHealthRequest
|
||||
// - opts ...grpc.CallOption
|
||||
func (_e *MockDataNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_CheckHealth_Call {
|
||||
return &MockDataNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth",
|
||||
append([]interface{}{ctx, in}, opts...)...)}
|
||||
}
|
||||
|
||||
func (_c *MockDataNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockDataNodeClient_CheckHealth_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
variadicArgs := make([]grpc.CallOption, len(args)-2)
|
||||
for i, a := range args[2:] {
|
||||
if a != nil {
|
||||
variadicArgs[i] = a.(grpc.CallOption)
|
||||
}
|
||||
}
|
||||
run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockDataNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNodeClient_CheckHealth_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockDataNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockDataNodeClient_CheckHealth_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// Close provides a mock function with given fields:
|
||||
func (_m *MockDataNodeClient) Close() error {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -30,6 +30,61 @@ func (_m *MockQueryNode) EXPECT() *MockQueryNode_Expecter {
|
||||
return &MockQueryNode_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// CheckHealth provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockQueryNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *milvuspb.CheckHealthResponse
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok {
|
||||
return rf(_a0, _a1)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockQueryNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
|
||||
type MockQueryNode_CheckHealth_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// CheckHealth is a helper method to define mock.On call
|
||||
// - _a0 context.Context
|
||||
// - _a1 *milvuspb.CheckHealthRequest
|
||||
func (_e *MockQueryNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNode_CheckHealth_Call {
|
||||
return &MockQueryNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)}
|
||||
}
|
||||
|
||||
func (_c *MockQueryNode_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockQueryNode_CheckHealth_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockQueryNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNode_CheckHealth_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockQueryNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNode_CheckHealth_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// Delete provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockQueryNode) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
@ -31,6 +31,76 @@ func (_m *MockQueryNodeClient) EXPECT() *MockQueryNodeClient_Expecter {
|
||||
return &MockQueryNodeClient_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// CheckHealth provides a mock function with given fields: ctx, in, opts
|
||||
func (_m *MockQueryNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
|
||||
_va := make([]interface{}, len(opts))
|
||||
for _i := range opts {
|
||||
_va[_i] = opts[_i]
|
||||
}
|
||||
var _ca []interface{}
|
||||
_ca = append(_ca, ctx, in)
|
||||
_ca = append(_ca, _va...)
|
||||
ret := _m.Called(_ca...)
|
||||
|
||||
var r0 *milvuspb.CheckHealthResponse
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok {
|
||||
return rf(ctx, in, opts...)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok {
|
||||
r0 = rf(ctx, in, opts...)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok {
|
||||
r1 = rf(ctx, in, opts...)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockQueryNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
|
||||
type MockQueryNodeClient_CheckHealth_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// CheckHealth is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - in *milvuspb.CheckHealthRequest
|
||||
// - opts ...grpc.CallOption
|
||||
func (_e *MockQueryNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryNodeClient_CheckHealth_Call {
|
||||
return &MockQueryNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth",
|
||||
append([]interface{}{ctx, in}, opts...)...)}
|
||||
}
|
||||
|
||||
func (_c *MockQueryNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockQueryNodeClient_CheckHealth_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
variadicArgs := make([]grpc.CallOption, len(args)-2)
|
||||
for i, a := range args[2:] {
|
||||
if a != nil {
|
||||
variadicArgs[i] = a.(grpc.CallOption)
|
||||
}
|
||||
}
|
||||
run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockQueryNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeClient_CheckHealth_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockQueryNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeClient_CheckHealth_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// Close provides a mock function with given fields:
|
||||
func (_m *MockQueryNodeClient) Close() error {
|
||||
ret := _m.Called()
|
||||
|
||||
@@ -131,6 +131,8 @@ service DataNode {
rpc QuerySlot(QuerySlotRequest) returns(QuerySlotResponse) {}
rpc DropCompactionPlan(DropCompactionPlanRequest) returns(common.Status) {}
rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {}
}
message FlushRequest {
@@ -175,7 +175,9 @@ service QueryNode {
// DeleteBatch is the API to apply same delete data into multiple segments.
// it's basically same as `Delete` but cost less memory pressure.
rpc DeleteBatch(DeleteBatchRequest) returns (DeleteBatchResponse) {
}
}
rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {}
}
// --------------------QueryCoord grpc request and response proto------------------
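With the CheckHealth RPC added to both the DataNode and QueryNode services, a coordinator can probe each worker directly. A hedged sketch of such a call, tolerating older nodes that do not implement the RPC yet; the 5-second timeout and helper name are assumptions, and the real code path uses merr.CheckRPCCall with merr.ErrServiceUnimplemented as shown in the SessionManager hunk above:

package probe

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
)

// probeNode asks one DataNode for its health over the new RPC. Nodes built
// before this commit answer Unimplemented, which is treated as healthy so a
// rolling upgrade does not flip the whole cluster to unhealthy.
func probeNode(ctx context.Context, cli datapb.DataNodeClient) (bool, []string, error) {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // timeout value is an assumption
	defer cancel()
	resp, err := cli.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
	if status.Code(err) == codes.Unimplemented {
		return true, nil, nil
	}
	if err != nil {
		return false, nil, fmt.Errorf("CheckHealth fails for datanode: %w", err)
	}
	return resp.GetIsHealthy(), resp.GetReasons(), nil
}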
@ -42,7 +42,7 @@ func NewChannelLevelScoreBalancer(scheduler task.Scheduler,
|
||||
nodeManager *session.NodeManager,
|
||||
dist *meta.DistributionManager,
|
||||
meta *meta.Meta,
|
||||
targetMgr *meta.TargetManager,
|
||||
targetMgr meta.TargetManagerInterface,
|
||||
) *ChannelLevelScoreBalancer {
|
||||
return &ChannelLevelScoreBalancer{
|
||||
ScoreBasedBalancer: NewScoreBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr),
|
||||
|
||||
@ -453,7 +453,7 @@ func (g *randomPlanGenerator) generatePlans() []SegmentAssignPlan {
|
||||
type MultiTargetBalancer struct {
|
||||
*ScoreBasedBalancer
|
||||
dist *meta.DistributionManager
|
||||
targetMgr *meta.TargetManager
|
||||
targetMgr meta.TargetManagerInterface
|
||||
}
|
||||
|
||||
func (b *MultiTargetBalancer) BalanceReplica(replica *meta.Replica) (segmentPlans []SegmentAssignPlan, channelPlans []ChannelAssignPlan) {
|
||||
@ -561,7 +561,7 @@ func (b *MultiTargetBalancer) genPlanByDistributions(nodeSegments, globalNodeSeg
|
||||
return plans
|
||||
}
|
||||
|
||||
func NewMultiTargetBalancer(scheduler task.Scheduler, nodeManager *session.NodeManager, dist *meta.DistributionManager, meta *meta.Meta, targetMgr *meta.TargetManager) *MultiTargetBalancer {
|
||||
func NewMultiTargetBalancer(scheduler task.Scheduler, nodeManager *session.NodeManager, dist *meta.DistributionManager, meta *meta.Meta, targetMgr meta.TargetManagerInterface) *MultiTargetBalancer {
|
||||
return &MultiTargetBalancer{
|
||||
ScoreBasedBalancer: NewScoreBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr),
|
||||
dist: dist,
|
||||
|
||||
@ -37,7 +37,7 @@ type RowCountBasedBalancer struct {
|
||||
*RoundRobinBalancer
|
||||
dist *meta.DistributionManager
|
||||
meta *meta.Meta
|
||||
targetMgr *meta.TargetManager
|
||||
targetMgr meta.TargetManagerInterface
|
||||
}
|
||||
|
||||
// AssignSegment, when row count based balancer assign segments, it will assign segment to node with least global row count.
|
||||
@ -366,7 +366,7 @@ func NewRowCountBasedBalancer(
|
||||
nodeManager *session.NodeManager,
|
||||
dist *meta.DistributionManager,
|
||||
meta *meta.Meta,
|
||||
targetMgr *meta.TargetManager,
|
||||
targetMgr meta.TargetManagerInterface,
|
||||
) *RowCountBasedBalancer {
|
||||
return &RowCountBasedBalancer{
|
||||
RoundRobinBalancer: NewRoundRobinBalancer(scheduler, nodeManager),
|
||||
|
||||
@ -42,7 +42,7 @@ func NewScoreBasedBalancer(scheduler task.Scheduler,
|
||||
nodeManager *session.NodeManager,
|
||||
dist *meta.DistributionManager,
|
||||
meta *meta.Meta,
|
||||
targetMgr *meta.TargetManager,
|
||||
targetMgr meta.TargetManagerInterface,
|
||||
) *ScoreBasedBalancer {
|
||||
return &ScoreBasedBalancer{
|
||||
RowCountBasedBalancer: NewRowCountBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr),
|
||||
|
||||
@ -99,7 +99,7 @@ func NewDistController(
|
||||
client session.Cluster,
|
||||
nodeManager *session.NodeManager,
|
||||
dist *meta.DistributionManager,
|
||||
targetMgr *meta.TargetManager,
|
||||
targetMgr meta.TargetManagerInterface,
|
||||
scheduler task.Scheduler,
|
||||
syncTargetVersionFn TriggerUpdateTargetVersion,
|
||||
) *ControllerImpl {
|
||||
|
||||
@ -51,7 +51,7 @@ type LoadCollectionJob struct {
|
||||
meta *meta.Meta
|
||||
broker meta.Broker
|
||||
cluster session.Cluster
|
||||
targetMgr *meta.TargetManager
|
||||
targetMgr meta.TargetManagerInterface
|
||||
targetObserver *observers.TargetObserver
|
||||
collectionObserver *observers.CollectionObserver
|
||||
nodeMgr *session.NodeManager
|
||||
@ -64,7 +64,7 @@ func NewLoadCollectionJob(
|
||||
meta *meta.Meta,
|
||||
broker meta.Broker,
|
||||
cluster session.Cluster,
|
||||
targetMgr *meta.TargetManager,
|
||||
targetMgr meta.TargetManagerInterface,
|
||||
targetObserver *observers.TargetObserver,
|
||||
collectionObserver *observers.CollectionObserver,
|
||||
nodeMgr *session.NodeManager,
|
||||
@ -265,7 +265,7 @@ type LoadPartitionJob struct {
|
||||
meta *meta.Meta
|
||||
broker meta.Broker
|
||||
cluster session.Cluster
|
||||
targetMgr *meta.TargetManager
|
||||
targetMgr meta.TargetManagerInterface
|
||||
targetObserver *observers.TargetObserver
|
||||
collectionObserver *observers.CollectionObserver
|
||||
nodeMgr *session.NodeManager
|
||||
@ -278,7 +278,7 @@ func NewLoadPartitionJob(
|
||||
meta *meta.Meta,
|
||||
broker meta.Broker,
|
||||
cluster session.Cluster,
|
||||
targetMgr *meta.TargetManager,
|
||||
targetMgr meta.TargetManagerInterface,
|
||||
targetObserver *observers.TargetObserver,
|
||||
collectionObserver *observers.CollectionObserver,
|
||||
nodeMgr *session.NodeManager,
|
||||
|
||||
@ -42,7 +42,7 @@ type ReleaseCollectionJob struct {
|
||||
meta *meta.Meta
|
||||
broker meta.Broker
|
||||
cluster session.Cluster
|
||||
targetMgr *meta.TargetManager
|
||||
targetMgr meta.TargetManagerInterface
|
||||
targetObserver *observers.TargetObserver
|
||||
checkerController *checkers.CheckerController
|
||||
proxyManager proxyutil.ProxyClientManagerInterface
|
||||
@ -54,7 +54,7 @@ func NewReleaseCollectionJob(ctx context.Context,
|
||||
meta *meta.Meta,
|
||||
broker meta.Broker,
|
||||
cluster session.Cluster,
|
||||
targetMgr *meta.TargetManager,
|
||||
targetMgr meta.TargetManagerInterface,
|
||||
targetObserver *observers.TargetObserver,
|
||||
checkerController *checkers.CheckerController,
|
||||
proxyManager proxyutil.ProxyClientManagerInterface,
|
||||
@ -82,8 +82,6 @@ func (job *ReleaseCollectionJob) Execute() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
job.meta.CollectionManager.SetReleasing(req.GetCollectionID())
|
||||
|
||||
loadedPartitions := job.meta.CollectionManager.GetPartitionsByCollection(req.GetCollectionID())
|
||||
toRelease := lo.Map(loadedPartitions, func(partition *meta.Partition, _ int) int64 {
|
||||
return partition.GetPartitionID()
|
||||
@ -130,7 +128,7 @@ type ReleasePartitionJob struct {
|
||||
meta *meta.Meta
|
||||
broker meta.Broker
|
||||
cluster session.Cluster
|
||||
targetMgr *meta.TargetManager
|
||||
targetMgr meta.TargetManagerInterface
|
||||
targetObserver *observers.TargetObserver
|
||||
checkerController *checkers.CheckerController
|
||||
proxyManager proxyutil.ProxyClientManagerInterface
|
||||
@ -142,7 +140,7 @@ func NewReleasePartitionJob(ctx context.Context,
|
||||
meta *meta.Meta,
|
||||
broker meta.Broker,
|
||||
cluster session.Cluster,
|
||||
targetMgr *meta.TargetManager,
|
||||
targetMgr meta.TargetManagerInterface,
|
||||
targetObserver *observers.TargetObserver,
|
||||
checkerController *checkers.CheckerController,
|
||||
proxyManager proxyutil.ProxyClientManagerInterface,
|
||||
|
||||
@ -38,12 +38,12 @@ type UndoList struct {
|
||||
ctx context.Context
|
||||
meta *meta.Meta
|
||||
cluster session.Cluster
|
||||
targetMgr *meta.TargetManager
|
||||
targetMgr meta.TargetManagerInterface
|
||||
targetObserver *observers.TargetObserver
|
||||
}
|
||||
|
||||
func NewUndoList(ctx context.Context, meta *meta.Meta,
|
||||
cluster session.Cluster, targetMgr *meta.TargetManager, targetObserver *observers.TargetObserver,
|
||||
cluster session.Cluster, targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver,
|
||||
) *UndoList {
|
||||
return &UndoList{
|
||||
ctx: ctx,
|
||||
|
||||
@ -50,7 +50,6 @@ type Collection struct {
|
||||
mut sync.RWMutex
|
||||
refreshNotifier chan struct{}
|
||||
LoadSpan trace.Span
|
||||
isReleasing bool
|
||||
}
|
||||
|
||||
func (collection *Collection) SetRefreshNotifier(notifier chan struct{}) {
|
||||
@ -60,18 +59,6 @@ func (collection *Collection) SetRefreshNotifier(notifier chan struct{}) {
|
||||
collection.refreshNotifier = notifier
|
||||
}
|
||||
|
||||
func (collection *Collection) SetReleasing() {
|
||||
collection.mut.Lock()
|
||||
defer collection.mut.Unlock()
|
||||
collection.isReleasing = true
|
||||
}
|
||||
|
||||
func (collection *Collection) IsReleasing() bool {
|
||||
collection.mut.RLock()
|
||||
defer collection.mut.RUnlock()
|
||||
return collection.isReleasing
|
||||
}
|
||||
|
||||
func (collection *Collection) IsRefreshed() bool {
|
||||
collection.mut.RLock()
|
||||
notifier := collection.refreshNotifier
|
||||
@ -439,15 +426,6 @@ func (m *CollectionManager) Exist(collectionID typeutil.UniqueID) bool {
|
||||
return ok
|
||||
}
|
||||
|
||||
func (m *CollectionManager) SetReleasing(collectionID typeutil.UniqueID) {
|
||||
m.rwmutex.Lock()
|
||||
defer m.rwmutex.Unlock()
|
||||
coll, ok := m.collections[collectionID]
|
||||
if ok {
|
||||
coll.SetReleasing()
|
||||
}
|
||||
}
|
||||
|
||||
// GetAll returns the collection ID of all loaded collections
|
||||
func (m *CollectionManager) GetAll() []int64 {
|
||||
m.rwmutex.RLock()
|
||||
|
||||
@ -29,6 +29,61 @@ func (_m *MockQueryNodeServer) EXPECT() *MockQueryNodeServer_Expecter {
|
||||
return &MockQueryNodeServer_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// CheckHealth provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockQueryNodeServer) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *milvuspb.CheckHealthResponse
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok {
|
||||
return rf(_a0, _a1)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockQueryNodeServer_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
|
||||
type MockQueryNodeServer_CheckHealth_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// CheckHealth is a helper method to define mock.On call
|
||||
// - _a0 context.Context
|
||||
// - _a1 *milvuspb.CheckHealthRequest
|
||||
func (_e *MockQueryNodeServer_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNodeServer_CheckHealth_Call {
|
||||
return &MockQueryNodeServer_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)}
|
||||
}
|
||||
|
||||
func (_c *MockQueryNodeServer_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockQueryNodeServer_CheckHealth_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockQueryNodeServer_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeServer_CheckHealth_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockQueryNodeServer_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeServer_CheckHealth_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// Delete provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockQueryNodeServer) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
@ -47,7 +47,7 @@ type CollectionObserver struct {
|
||||
|
||||
dist *meta.DistributionManager
|
||||
meta *meta.Meta
|
||||
targetMgr *meta.TargetManager
|
||||
targetMgr meta.TargetManagerInterface
|
||||
targetObserver *TargetObserver
|
||||
checkerController *checkers.CheckerController
|
||||
partitionLoadedCount map[int64]int
|
||||
@ -69,7 +69,7 @@ type LoadTask struct {
|
||||
func NewCollectionObserver(
|
||||
dist *meta.DistributionManager,
|
||||
meta *meta.Meta,
|
||||
targetMgr *meta.TargetManager,
|
||||
targetMgr meta.TargetManagerInterface,
|
||||
targetObserver *TargetObserver,
|
||||
checherController *checkers.CheckerController,
|
||||
proxyManager proxyutil.ProxyClientManagerInterface,
|
||||
|
||||
@ -75,7 +75,7 @@ type TargetObserver struct {
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
meta *meta.Meta
|
||||
targetMgr *meta.TargetManager
|
||||
targetMgr meta.TargetManagerInterface
|
||||
distMgr *meta.DistributionManager
|
||||
broker meta.Broker
|
||||
cluster session.Cluster
|
||||
@ -101,7 +101,7 @@ type TargetObserver struct {
|
||||
|
||||
func NewTargetObserver(
|
||||
meta *meta.Meta,
|
||||
targetMgr *meta.TargetManager,
|
||||
targetMgr meta.TargetManagerInterface,
|
||||
distMgr *meta.DistributionManager,
|
||||
broker meta.Broker,
|
||||
cluster session.Cluster,
|
||||
|
||||
@ -53,6 +53,7 @@ import (
    "github.com/milvus-io/milvus/internal/querycoordv2/session"
    "github.com/milvus-io/milvus/internal/querycoordv2/task"
    "github.com/milvus-io/milvus/internal/types"
    "github.com/milvus-io/milvus/internal/util/healthcheck"
    "github.com/milvus-io/milvus/internal/util/proxyutil"
    "github.com/milvus-io/milvus/internal/util/sessionutil"
    "github.com/milvus-io/milvus/internal/util/tsoutil"
@ -94,7 +95,7 @@ type Server struct {
    store     metastore.QueryCoordCatalog
    meta      *meta.Meta
    dist      *meta.DistributionManager
    targetMgr *meta.TargetManager
    targetMgr meta.TargetManagerInterface
    broker    meta.Broker

    // Session
@ -134,6 +135,8 @@ type Server struct {
    proxyCreator       proxyutil.ProxyCreator
    proxyWatcher       proxyutil.ProxyWatcherInterface
    proxyClientManager proxyutil.ProxyClientManagerInterface

    healthChecker *healthcheck.Checker
}

func NewQueryCoord(ctx context.Context) (*Server, error) {
@ -346,6 +349,8 @@ func (s *Server) initQueryCoord() error {
    // Init load status cache
    meta.GlobalFailedLoadCache = meta.NewFailedLoadCache()

    interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
    s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn)
    log.Info("init querycoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))
    return err
}
@ -489,6 +494,7 @@ func (s *Server) startQueryCoord() error {

    s.startServerLoop()
    s.afterStart()
    s.healthChecker.Start()
    s.UpdateStateCode(commonpb.StateCode_Healthy)
    sessionutil.SaveServerInfo(typeutil.QueryCoordRole, s.session.GetServerID())
    return nil
@ -525,7 +531,9 @@ func (s *Server) Stop() error {
    // FOLLOW the dependence graph:
    // job scheduler -> checker controller -> task scheduler -> dist controller -> cluster -> session
    // observers -> dist controller

    if s.healthChecker != nil {
        s.healthChecker.Close()
    }
    if s.jobScheduler != nil {
        log.Info("stop job scheduler...")
        s.jobScheduler.Stop()

@ -20,6 +20,7 @@ import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/cockroachdb/errors"
    "github.com/samber/lo"
@ -35,7 +36,7 @@ import (
    "github.com/milvus-io/milvus/internal/querycoordv2/job"
    "github.com/milvus-io/milvus/internal/querycoordv2/meta"
    "github.com/milvus-io/milvus/internal/querycoordv2/utils"
    "github.com/milvus-io/milvus/internal/util/componentutil"
    "github.com/milvus-io/milvus/internal/util/healthcheck"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/metrics"
    "github.com/milvus-io/milvus/pkg/util/merr"
@ -935,16 +936,20 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
        return &milvuspb.CheckHealthResponse{Status: merr.Status(err), IsHealthy: false, Reasons: []string{err.Error()}}, nil
    }

    errReasons, err := s.checkNodeHealth(ctx)
    if err != nil || len(errReasons) != 0 {
        return componentutil.CheckHealthRespWithErrMsg(errReasons...), nil
    }
    latestCheckResult := s.healthChecker.GetLatestCheckResult()
    return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil
}

    if err := utils.CheckCollectionsQueryable(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil {
        log.Warn("some collection is not queryable during health check", zap.Error(err))
    }
func (s *Server) healthCheckFn() *healthcheck.Result {
    timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
    ctx, cancel := context.WithTimeout(s.ctx, timeout)
    defer cancel()

    return componentutil.CheckHealthRespWithErr(nil), nil
    checkResults := s.broadcastCheckHealth(ctx)
    for collectionID, failReason := range utils.CheckCollectionsQueryable(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr) {
        checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.CollectionQueryable))
    }
    return checkResults
}

func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) {
@ -975,6 +980,39 @@ func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) {
    return errReasons, err
}

func (s *Server) broadcastCheckHealth(ctx context.Context) *healthcheck.Result {
    result := healthcheck.NewResult()
    wg := sync.WaitGroup{}
    wlock := sync.Mutex{}

    for _, node := range s.nodeMgr.GetAll() {
        node := node
        wg.Add(1)
        go func() {
            defer wg.Done()

            checkHealthResp, err := s.cluster.CheckHealth(ctx, node.ID())
            if err = merr.CheckRPCCall(checkHealthResp, err); err != nil && !errors.Is(err, merr.ErrServiceUnimplemented) {
                err = fmt.Errorf("CheckHealth fails for querynode:%d, %w", node.ID(), err)
                wlock.Lock()
                result.AppendUnhealthyClusterMsg(
                    healthcheck.NewUnhealthyClusterMsg(typeutil.QueryNodeRole, node.ID(), err.Error(), healthcheck.NodeHealthCheck))
                wlock.Unlock()
                return
            }

            if len(checkHealthResp.Reasons) > 0 {
                wlock.Lock()
                result.AppendResult(healthcheck.GetHealthCheckResultFromResp(checkHealthResp))
                wlock.Unlock()
            }
        }()
    }

    wg.Wait()
    return result
}

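The hunks above capture the core of this change: instead of fanning out an RPC to every query node on each CheckHealth request, the coordinator runs `healthCheckFn` on a background interval and serves the cached result from `healthChecker.GetLatestCheckResult()`. The sketch below is a minimal, self-contained illustration of that caching pattern; it is not code from this repository, and `cachedChecker`/`probe` are made-up names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cachedChecker refreshes an expensive health probe on a fixed interval
// and serves the most recent result to callers without re-probing.
type cachedChecker struct {
	mu     sync.RWMutex
	latest string
	done   chan struct{}
}

func newCachedChecker(interval time.Duration, probe func() string) *cachedChecker {
	c := &cachedChecker{latest: probe(), done: make(chan struct{})}
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				result := probe() // the expensive fan-out runs here, off the request path
				c.mu.Lock()
				c.latest = result
				c.mu.Unlock()
			case <-c.done:
				return
			}
		}
	}()
	return c
}

// Latest is what a CheckHealth handler would call: O(1), no RPCs.
func (c *cachedChecker) Latest() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.latest
}

func (c *cachedChecker) Close() { close(c.done) }

func main() {
	probe := func() string { return "healthy@" + time.Now().Format(time.RFC3339) }
	checker := newCachedChecker(200*time.Millisecond, probe)
	defer checker.Close()

	time.Sleep(250 * time.Millisecond)
	fmt.Println(checker.Latest()) // served from cache, not recomputed per call
}
```

The trade-off is freshness: a CheckHealth response can be up to one interval stale (30 s by default with the new `common.healthcheck.interval.seconds`), which is acceptable for liveness reporting and removes the per-request fan-out cost.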
func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) {
|
||||
log := log.Ctx(ctx).With(
|
||||
zap.String("rgName", req.GetResourceGroup()),
|
||||
|
||||
@ -48,6 +48,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
"github.com/milvus-io/milvus/internal/util/healthcheck"
|
||||
"github.com/milvus-io/milvus/internal/util/proxyutil"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/pkg/kv"
|
||||
@ -171,6 +172,11 @@ func (suite *ServiceSuite) SetupTest() {
|
||||
}
|
||||
suite.cluster = session.NewMockCluster(suite.T())
|
||||
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
|
||||
suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(&milvuspb.CheckHealthResponse{
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
|
||||
IsHealthy: true,
|
||||
Reasons: []string{},
|
||||
}, nil).Maybe()
|
||||
suite.jobScheduler = job.NewScheduler()
|
||||
suite.taskScheduler = task.NewMockScheduler(suite.T())
|
||||
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
|
||||
@ -1630,42 +1636,82 @@ func (suite *ServiceSuite) TestCheckHealth() {
|
||||
suite.loadAll()
|
||||
ctx := context.Background()
|
||||
server := suite.server
|
||||
server.healthChecker = healthcheck.NewChecker(40*time.Millisecond, suite.server.healthCheckFn)
|
||||
server.healthChecker.Start()
|
||||
defer server.healthChecker.Close()
|
||||
|
||||
assertCheckHealthResult := func(isHealthy bool) {
|
||||
resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
|
||||
suite.NoError(err)
|
||||
suite.Equal(resp.IsHealthy, isHealthy)
|
||||
if !isHealthy {
|
||||
suite.NotEmpty(resp.Reasons)
|
||||
} else {
|
||||
suite.Empty(resp.Reasons)
|
||||
}
|
||||
}
|
||||
|
||||
setNodeSate := func(isHealthy bool, isRPCFail bool) {
|
||||
var resp *milvuspb.CheckHealthResponse
|
||||
if isHealthy {
|
||||
resp = healthcheck.OK()
|
||||
} else {
|
||||
resp = healthcheck.GetCheckHealthResponseFromClusterMsg(healthcheck.NewUnhealthyClusterMsg("dn", 1, "check fails", healthcheck.NodeHealthCheck))
|
||||
}
|
||||
resp.Status = &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
|
||||
if isRPCFail {
|
||||
resp.Status = &commonpb.Status{ErrorCode: commonpb.ErrorCode_ForceDeny}
|
||||
}
|
||||
suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Unset()
|
||||
suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(resp, nil).Maybe()
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Test for server is not healthy
|
||||
server.UpdateStateCode(commonpb.StateCode_Initializing)
|
||||
assertCheckHealthResult(false)
|
||||
|
||||
// Test for check health has some error reasons
|
||||
setNodeSate(false, false)
|
||||
server.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
assertCheckHealthResult(false)
|
||||
|
||||
// Test for check health rpc fail
|
||||
setNodeSate(true, true)
|
||||
server.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
assertCheckHealthResult(false)
|
||||
|
||||
// Test for check load percentage fail
|
||||
setNodeSate(true, false)
|
||||
assertCheckHealthResult(true)
|
||||
|
||||
// Test for check channel ok
|
||||
for _, collection := range suite.collections {
|
||||
suite.updateCollectionStatus(collection, querypb.LoadStatus_Loaded)
|
||||
suite.updateChannelDist(collection)
|
||||
}
|
||||
assertCheckHealthResult(true)
|
||||
|
||||
// Test for check channel fail
|
||||
tm := meta.NewMockTargetManager(suite.T())
|
||||
tm.EXPECT().GetDmChannelsByCollection(mock.Anything, mock.Anything).Return(nil).Maybe()
|
||||
otm := server.targetMgr
|
||||
server.targetMgr = tm
|
||||
assertCheckHealthResult(true)
|
||||
|
||||
// Test for get shard leader fail
|
||||
server.targetMgr = otm
|
||||
for _, node := range suite.nodes {
|
||||
suite.nodeMgr.Stopping(node)
|
||||
}
|
||||
|
||||
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.Key, "1")
|
||||
defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.Key)
|
||||
time.Sleep(1500 * time.Millisecond)
|
||||
resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
|
||||
suite.NoError(err)
|
||||
suite.Equal(resp.IsHealthy, false)
|
||||
suite.NotEmpty(resp.Reasons)
|
||||
|
||||
// Test for components state fail
|
||||
for _, node := range suite.nodes {
|
||||
suite.cluster.EXPECT().GetComponentStates(mock.Anything, node).Return(
|
||||
&milvuspb.ComponentStates{
|
||||
State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Abnormal},
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
|
||||
},
|
||||
nil).Once()
|
||||
}
|
||||
server.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
resp, err = server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
|
||||
suite.NoError(err)
|
||||
suite.Equal(resp.IsHealthy, false)
|
||||
suite.NotEmpty(resp.Reasons)
|
||||
|
||||
// Test for server is healthy
|
||||
for _, node := range suite.nodes {
|
||||
suite.cluster.EXPECT().GetComponentStates(mock.Anything, node).Return(
|
||||
&milvuspb.ComponentStates{
|
||||
State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Healthy},
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
|
||||
},
|
||||
nil).Once()
|
||||
}
|
||||
resp, err = server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
|
||||
suite.NoError(err)
|
||||
suite.Equal(resp.IsHealthy, true)
|
||||
suite.Empty(resp.Reasons)
|
||||
suite.NotEmpty(resp.Reasons)
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestGetShardLeaders() {
|
||||
|
||||
@ -52,6 +52,7 @@ type Cluster interface {
    GetMetrics(ctx context.Context, nodeID int64, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
    SyncDistribution(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) (*commonpb.Status, error)
    GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error)
    CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error)
    Start()
    Stop()
}
@ -272,6 +273,20 @@ func (c *QueryCluster) send(ctx context.Context, nodeID int64, fn func(cli types
    return nil
}

func (c *QueryCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) {
    var (
        resp *milvuspb.CheckHealthResponse
        err  error
    )
    err1 := c.send(ctx, nodeID, func(cli types.QueryNodeClient) {
        resp, err = cli.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
    })
    if err1 != nil {
        return nil, err1
    }
    return resp, err
}

type clients struct {
|
||||
sync.RWMutex
|
||||
clients map[int64]types.QueryNodeClient // nodeID -> client
|
||||
|
||||
@ -27,6 +27,61 @@ func (_m *MockCluster) EXPECT() *MockCluster_Expecter {
|
||||
return &MockCluster_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// CheckHealth provides a mock function with given fields: ctx, nodeID
|
||||
func (_m *MockCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) {
|
||||
ret := _m.Called(ctx, nodeID)
|
||||
|
||||
var r0 *milvuspb.CheckHealthResponse
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, int64) (*milvuspb.CheckHealthResponse, error)); ok {
|
||||
return rf(ctx, nodeID)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, int64) *milvuspb.CheckHealthResponse); ok {
|
||||
r0 = rf(ctx, nodeID)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok {
|
||||
r1 = rf(ctx, nodeID)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockCluster_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
|
||||
type MockCluster_CheckHealth_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// CheckHealth is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - nodeID int64
|
||||
func (_e *MockCluster_Expecter) CheckHealth(ctx interface{}, nodeID interface{}) *MockCluster_CheckHealth_Call {
|
||||
return &MockCluster_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx, nodeID)}
|
||||
}
|
||||
|
||||
func (_c *MockCluster_CheckHealth_Call) Run(run func(ctx context.Context, nodeID int64)) *MockCluster_CheckHealth_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(int64))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockCluster_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockCluster_CheckHealth_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockCluster_CheckHealth_Call) RunAndReturn(run func(context.Context, int64) (*milvuspb.CheckHealthResponse, error)) *MockCluster_CheckHealth_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetComponentStates provides a mock function with given fields: ctx, nodeID
|
||||
func (_m *MockCluster) GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error) {
|
||||
ret := _m.Called(ctx, nodeID)
|
||||
|
||||
@ -58,7 +58,7 @@ type Executor struct {
|
||||
meta *meta.Meta
|
||||
dist *meta.DistributionManager
|
||||
broker meta.Broker
|
||||
targetMgr *meta.TargetManager
|
||||
targetMgr meta.TargetManagerInterface
|
||||
cluster session.Cluster
|
||||
nodeMgr *session.NodeManager
|
||||
|
||||
@ -70,7 +70,7 @@ type Executor struct {
|
||||
func NewExecutor(meta *meta.Meta,
|
||||
dist *meta.DistributionManager,
|
||||
broker meta.Broker,
|
||||
targetMgr *meta.TargetManager,
|
||||
targetMgr meta.TargetManagerInterface,
|
||||
cluster session.Cluster,
|
||||
nodeMgr *session.NodeManager,
|
||||
) *Executor {
|
||||
|
||||
@ -157,7 +157,7 @@ type taskScheduler struct {
|
||||
|
||||
distMgr *meta.DistributionManager
|
||||
meta *meta.Meta
|
||||
targetMgr *meta.TargetManager
|
||||
targetMgr meta.TargetManagerInterface
|
||||
broker meta.Broker
|
||||
cluster session.Cluster
|
||||
nodeMgr *session.NodeManager
|
||||
@ -172,7 +172,7 @@ type taskScheduler struct {
|
||||
func NewScheduler(ctx context.Context,
|
||||
meta *meta.Meta,
|
||||
distMgr *meta.DistributionManager,
|
||||
targetMgr *meta.TargetManager,
|
||||
targetMgr meta.TargetManagerInterface,
|
||||
broker meta.Broker,
|
||||
cluster session.Cluster,
|
||||
nodeMgr *session.NodeManager,
|
||||
|
||||
@ -73,13 +73,13 @@ func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.Target
|
||||
for segmentID, info := range segmentDist {
|
||||
_, exist := leader.Segments[segmentID]
|
||||
if !exist {
|
||||
log.RatedInfo(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
|
||||
log.RatedWarn(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
|
||||
return merr.WrapErrSegmentLack(segmentID)
|
||||
}
|
||||
|
||||
l0WithWrongLocation := info.GetLevel() == datapb.SegmentLevel_L0 && leader.Segments[segmentID].GetNodeID() != leader.ID
|
||||
if l0WithWrongLocation {
|
||||
log.RatedInfo(10, "leader is not available due to lack of L0 segment", zap.Int64("segmentID", segmentID))
|
||||
log.RatedWarn(10, "leader is not available due to lack of L0 segment", zap.Int64("segmentID", segmentID))
|
||||
return merr.WrapErrSegmentLack(segmentID)
|
||||
}
|
||||
}
|
||||
@ -108,13 +108,11 @@ func checkLoadStatus(ctx context.Context, m *meta.Meta, collectionID int64) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetShardLeadersWithChannels(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager,
|
||||
func GetShardLeadersWithChannels(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager,
|
||||
nodeMgr *session.NodeManager, collectionID int64, channels map[string]*meta.DmChannel,
|
||||
) ([]*querypb.ShardLeadersList, error) {
|
||||
ret := make([]*querypb.ShardLeadersList, 0)
|
||||
for _, channel := range channels {
|
||||
log := log.With(zap.String("channel", channel.GetChannelName()))
|
||||
|
||||
var channelErr error
|
||||
leaders := dist.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName()))
|
||||
if len(leaders) == 0 {
|
||||
@ -132,7 +130,7 @@ func GetShardLeadersWithChannels(m *meta.Meta, targetMgr *meta.TargetManager, di
|
||||
|
||||
if len(readableLeaders) == 0 {
|
||||
msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName())
|
||||
log.Warn(msg, zap.Error(channelErr))
|
||||
log.RatedWarn(60, msg, zap.Error(channelErr))
|
||||
err := merr.WrapErrChannelNotAvailable(channel.GetChannelName(), channelErr.Error())
|
||||
return nil, err
|
||||
}
|
||||
@ -169,7 +167,7 @@ func GetShardLeadersWithChannels(m *meta.Meta, targetMgr *meta.TargetManager, di
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func GetShardLeaders(ctx context.Context, m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) {
|
||||
func GetShardLeaders(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) {
|
||||
if err := checkLoadStatus(ctx, m, collectionID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -185,19 +183,24 @@ func GetShardLeaders(ctx context.Context, m *meta.Meta, targetMgr *meta.TargetMa
}

// CheckCollectionsQueryable check all channels are watched and all segments are loaded for this collection
func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error {
    maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute)
func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) map[int64]string {
    maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Second)
    checkResult := make(map[int64]string)
    for _, coll := range m.GetAllCollections() {
        err := checkCollectionQueryable(ctx, m, targetMgr, dist, nodeMgr, coll)
        if err != nil && !coll.IsReleasing() && time.Since(coll.UpdatedAt) >= maxInterval {
            return err
        // the collection is not queryable, if meet following conditions:
        // 1. Some segments are not loaded
        // 2. Collection is not starting to release
        // 3. The load percentage has not been updated in the last 5 minutes.
        if err != nil && m.Exist(coll.CollectionID) && time.Since(coll.UpdatedAt) >= maxInterval {
            checkResult[coll.CollectionID] = err.Error()
        }
    }
    return nil
    return checkResult
}
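CheckCollectionsQueryable now returns a map of per-collection failure reasons instead of aborting on the first error, and it only reports a collection once its load status has been stale for longer than `queryCoord.updateCollectionLoadStatusInterval` (now read in seconds, 300 s by default). Below is a simplified sketch of that grace-period condition; it leaves out the `m.Exist` check from the real code and the helper name is invented.

```go
package main

import (
	"fmt"
	"time"
)

// shouldReport mirrors the grace-period condition used above: a collection is
// only reported as not queryable if the check failed AND its load status has
// not advanced within maxInterval. Fresh failures are treated as transient.
func shouldReport(checkErr error, updatedAt time.Time, maxInterval time.Duration) bool {
	return checkErr != nil && time.Since(updatedAt) >= maxInterval
}

func main() {
	maxInterval := 300 * time.Second // queryCoord.updateCollectionLoadStatusInterval default

	// A collection that started loading 10 seconds ago is not reported yet.
	fmt.Println(shouldReport(fmt.Errorf("segment not loaded"), time.Now().Add(-10*time.Second), maxInterval)) // false
	// One that has been stuck for 10 minutes is.
	fmt.Println(shouldReport(fmt.Errorf("segment not loaded"), time.Now().Add(-10*time.Minute), maxInterval)) // true
}
```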

// checkCollectionQueryable check all channels are watched and all segments are loaded for this collection
func checkCollectionQueryable(ctx context.Context, m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, coll *meta.Collection) error {
func checkCollectionQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager, coll *meta.Collection) error {
    collectionID := coll.GetCollectionID()
    if err := checkLoadStatus(ctx, m, collectionID); err != nil {
        return err

@ -42,6 +42,7 @@ import (
    "github.com/milvus-io/milvus/internal/querynodev2/segments"
    "github.com/milvus-io/milvus/internal/querynodev2/tasks"
    "github.com/milvus-io/milvus/internal/storage"
    "github.com/milvus-io/milvus/internal/util/healthcheck"
    "github.com/milvus-io/milvus/internal/util/streamrpc"
    "github.com/milvus-io/milvus/pkg/common"
    "github.com/milvus-io/milvus/pkg/log"
@ -53,6 +54,7 @@ import (
    "github.com/milvus-io/milvus/pkg/util/merr"
    "github.com/milvus-io/milvus/pkg/util/metricsinfo"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
    "github.com/milvus-io/milvus/pkg/util/ratelimitutil"
    "github.com/milvus-io/milvus/pkg/util/timerecord"
    "github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -1438,6 +1440,25 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
    return merr.Success(), nil
}

func (node *QueryNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
    if err := node.lifetime.Add(merr.IsHealthy); err != nil {
        return &milvuspb.CheckHealthResponse{
            Status:  merr.Status(err),
            Reasons: []string{err.Error()},
        }, nil
    }
    defer node.lifetime.Done()

    maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
    minTsafeChannel, minTsafe := node.tSafeManager.Min()
    if err := ratelimitutil.CheckTimeTickDelay(minTsafeChannel, minTsafe, maxDelay); err != nil {
        msg := healthcheck.NewUnhealthyClusterMsg(typeutil.QueryNodeRole, node.GetNodeID(), err.Error(), healthcheck.TimeTickLagExceed)
        return healthcheck.GetCheckHealthResponseFromClusterMsg(msg), nil
    }

    return healthcheck.OK(), nil
}

// DeleteBatch is the API to apply same delete data into multiple segments.
|
||||
// it's basically same as `Delete` but cost less memory pressure.
|
||||
func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
|
||||
|
||||
@ -44,6 +44,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
|
||||
"github.com/milvus-io/milvus/internal/querynodev2/segments"
|
||||
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||
"github.com/milvus-io/milvus/internal/util/streamrpc"
|
||||
@ -97,7 +98,7 @@ func (suite *ServiceSuite) SetupSuite() {
|
||||
paramtable.Init()
|
||||
paramtable.Get().Save(paramtable.Get().CommonCfg.GCEnabled.Key, "false")
|
||||
|
||||
suite.rootPath = suite.T().Name()
|
||||
suite.rootPath = path.Join("/tmp/milvus/test", suite.T().Name())
|
||||
suite.collectionID = 111
|
||||
suite.collectionName = "test-collection"
|
||||
suite.partitionIDs = []int64{222}
|
||||
@ -2197,6 +2198,41 @@ func (suite *ServiceSuite) TestLoadPartition() {
|
||||
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestCheckHealth() {
|
||||
suite.Run("node not healthy", func() {
|
||||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
|
||||
ctx := context.Background()
|
||||
resp, err := suite.node.CheckHealth(ctx, nil)
|
||||
suite.NoError(err)
|
||||
suite.False(merr.Ok(resp.GetStatus()))
|
||||
suite.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
|
||||
})
|
||||
|
||||
suite.Run("exceeded timetick lag on pipeline", func() {
|
||||
suite.node.tSafeManager = tsafe.NewTSafeReplica()
|
||||
suite.node.tSafeManager.Add(context.TODO(), "timetick-lag-ch", 1)
|
||||
ctx := context.Background()
|
||||
suite.node.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
resp, err := suite.node.CheckHealth(ctx, nil)
|
||||
suite.NoError(err)
|
||||
suite.True(merr.Ok(resp.GetStatus()))
|
||||
suite.False(resp.GetIsHealthy())
|
||||
suite.NotEmpty(resp.Reasons)
|
||||
})
|
||||
|
||||
suite.Run("ok", func() {
|
||||
ctx := context.Background()
|
||||
suite.node.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
suite.node.tSafeManager = tsafe.NewTSafeReplica()
|
||||
resp, err := suite.node.CheckHealth(ctx, nil)
|
||||
suite.NoError(err)
|
||||
suite.True(merr.Ok(resp.GetStatus()))
|
||||
suite.True(resp.GetIsHealthy())
|
||||
suite.Empty(resp.Reasons)
|
||||
})
|
||||
}
|
||||
|
||||
func TestQueryNodeService(t *testing.T) {
|
||||
suite.Run(t, new(ServiceSuite))
|
||||
}
|
||||
|
||||
@ -404,6 +404,7 @@ func newMockProxy() *mockProxy {
|
||||
|
||||
func newTestCore(opts ...Opt) *Core {
|
||||
c := &Core{
|
||||
ctx: context.TODO(),
|
||||
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}},
|
||||
}
|
||||
executor := newMockStepExecutor()
|
||||
|
||||
@ -31,7 +31,6 @@ import (
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
@ -49,6 +48,7 @@ import (
|
||||
tso2 "github.com/milvus-io/milvus/internal/tso"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||
"github.com/milvus-io/milvus/internal/util/healthcheck"
|
||||
"github.com/milvus-io/milvus/internal/util/proxyutil"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
tsoutil2 "github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||
@ -127,6 +127,7 @@ type Core struct {
|
||||
|
||||
enableActiveStandBy bool
|
||||
activateFunc func() error
|
||||
healthChecker *healthcheck.Checker
|
||||
}
|
||||
|
||||
// --------------------- function --------------------------
|
||||
@ -481,6 +482,8 @@ func (c *Core) initInternal() error {
|
||||
return err
|
||||
}
|
||||
|
||||
interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
|
||||
c.healthChecker = healthcheck.NewChecker(interval, c.healthCheckFn)
|
||||
log.Info("init rootcoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", c.address))
|
||||
return nil
|
||||
}
|
||||
@ -757,6 +760,7 @@ func (c *Core) startInternal() error {
|
||||
}()
|
||||
|
||||
c.startServerLoop()
|
||||
c.healthChecker.Start()
|
||||
c.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
sessionutil.SaveServerInfo(typeutil.RootCoordRole, c.session.ServerID)
|
||||
logutil.Logger(c.ctx).Info("rootcoord startup successfully")
|
||||
@ -815,6 +819,10 @@ func (c *Core) revokeSession() {
|
||||
// Stop stops rootCoord.
|
||||
func (c *Core) Stop() error {
|
||||
c.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
if c.healthChecker != nil {
|
||||
c.healthChecker.Close()
|
||||
}
|
||||
|
||||
c.stopExecutor()
|
||||
c.stopScheduler()
|
||||
if c.proxyWatcher != nil {
|
||||
@ -3008,53 +3016,40 @@ func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest)
        }, nil
    }

    group, ctx := errgroup.WithContext(ctx)
    errs := typeutil.NewConcurrentSet[error]()
    latestCheckResult := c.healthChecker.GetLatestCheckResult()
    return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil
}

func (c *Core) healthCheckFn() *healthcheck.Result {
    timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
    ctx, cancel := context.WithTimeout(c.ctx, timeout)
    defer cancel()

    proxyClients := c.proxyClientManager.GetProxyClients()
    wg := sync.WaitGroup{}
    lock := sync.Mutex{}
    result := healthcheck.NewResult()

    proxyClients.Range(func(key int64, value types.ProxyClient) bool {
        nodeID := key
        proxyClient := value
        group.Go(func() error {
            sta, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
            if err != nil {
                errs.Insert(err)
                return err
            }
        wg.Add(1)
        go func() {
            defer wg.Done()
            resp, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
            err = merr.AnalyzeComponentStateResp(typeutil.ProxyRole, nodeID, resp, err)

            err = merr.AnalyzeState("Proxy", nodeID, sta)
            lock.Lock()
            defer lock.Unlock()
            if err != nil {
                errs.Insert(err)
                result.AppendUnhealthyClusterMsg(healthcheck.NewUnhealthyClusterMsg(typeutil.ProxyRole, nodeID, err.Error(), healthcheck.NodeHealthCheck))
            }

            return err
        })
        }()
        return true
    })

    maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
    if maxDelay > 0 {
        group.Go(func() error {
            err := CheckTimeTickLagExceeded(ctx, c.queryCoord, c.dataCoord, maxDelay)
            if err != nil {
                errs.Insert(err)
            }
            return err
        })
    }

    err := group.Wait()
    if err != nil {
        return &milvuspb.CheckHealthResponse{
            Status:    merr.Success(),
            IsHealthy: false,
            Reasons: lo.Map(errs.Collect(), func(e error, i int) string {
                return err.Error()
            }),
        }, nil
    }

    return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil
    wg.Wait()
    return result
}

func (c *Core) CreatePrivilegeGroup(ctx context.Context, in *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error) {
|
||||
|
||||
@ -32,7 +32,6 @@ import (
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
"github.com/milvus-io/milvus/internal/mocks"
|
||||
"github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
||||
@ -40,6 +39,7 @@ import (
|
||||
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
|
||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
|
||||
"github.com/milvus-io/milvus/internal/util/healthcheck"
|
||||
"github.com/milvus-io/milvus/internal/util/proxyutil"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/pkg/util"
|
||||
@ -49,7 +49,6 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/tikv"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
@ -1453,65 +1452,6 @@ func TestRootCoord_AlterCollection(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRootCoord_CheckHealth(t *testing.T) {
|
||||
getQueryCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) {
|
||||
clusterTopology := metricsinfo.QueryClusterTopology{
|
||||
ConnectedNodes: []metricsinfo.QueryNodeInfos{
|
||||
{
|
||||
QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{
|
||||
Fgm: metricsinfo.FlowGraphMetric{
|
||||
MinFlowGraphChannel: "ch1",
|
||||
MinFlowGraphTt: tt,
|
||||
NumFlowGraph: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
resp, _ := metricsinfo.MarshalTopology(metricsinfo.QueryCoordTopology{Cluster: clusterTopology})
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: merr.Success(),
|
||||
Response: resp,
|
||||
ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, 0),
|
||||
}, nil
|
||||
}
|
||||
|
||||
getDataCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) {
|
||||
clusterTopology := metricsinfo.DataClusterTopology{
|
||||
ConnectedDataNodes: []metricsinfo.DataNodeInfos{
|
||||
{
|
||||
QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{
|
||||
Fgm: metricsinfo.FlowGraphMetric{
|
||||
MinFlowGraphChannel: "ch1",
|
||||
MinFlowGraphTt: tt,
|
||||
NumFlowGraph: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
resp, _ := metricsinfo.MarshalTopology(metricsinfo.DataCoordTopology{Cluster: clusterTopology})
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: merr.Success(),
|
||||
Response: resp,
|
||||
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, 0),
|
||||
}, nil
|
||||
}
|
||||
|
||||
querynodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-1*time.Minute), 0)
|
||||
datanodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-2*time.Minute), 0)
|
||||
|
||||
dcClient := mocks.NewMockDataCoordClient(t)
|
||||
dcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getDataCoordMetricsFunc(datanodeTT))
|
||||
qcClient := mocks.NewMockQueryCoordClient(t)
|
||||
qcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getQueryCoordMetricsFunc(querynodeTT))
|
||||
|
||||
errDataCoordClient := mocks.NewMockDataCoordClient(t)
|
||||
errDataCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("error"))
|
||||
errQueryCoordClient := mocks.NewMockQueryCoordClient(t)
|
||||
errQueryCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("error"))
|
||||
|
||||
t.Run("not healthy", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
c := newTestCore(withAbnormalCode())
|
||||
@ -1521,25 +1461,13 @@ func TestRootCoord_CheckHealth(t *testing.T) {
|
||||
assert.NotEmpty(t, resp.Reasons)
|
||||
})
|
||||
|
||||
t.Run("ok with disabled tt lag configuration", func(t *testing.T) {
|
||||
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
|
||||
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "-1")
|
||||
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
|
||||
|
||||
c := newTestCore(withHealthyCode(), withValidProxyManager())
|
||||
ctx := context.Background()
|
||||
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, true, resp.IsHealthy)
|
||||
assert.Empty(t, resp.Reasons)
|
||||
})
|
||||
|
||||
t.Run("proxy health check fail with invalid proxy", func(t *testing.T) {
|
||||
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
|
||||
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000")
|
||||
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
|
||||
c := newTestCore(withHealthyCode(), withInvalidProxyManager())
|
||||
c.healthChecker = healthcheck.NewChecker(40*time.Millisecond, c.healthCheckFn)
|
||||
c.healthChecker.Start()
|
||||
defer c.healthChecker.Close()
|
||||
|
||||
c := newTestCore(withHealthyCode(), withInvalidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient))
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
ctx := context.Background()
|
||||
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
|
||||
@ -1548,55 +1476,14 @@ func TestRootCoord_CheckHealth(t *testing.T) {
|
||||
assert.NotEmpty(t, resp.Reasons)
|
||||
})
|
||||
|
||||
t.Run("proxy health check fail with get metrics error", func(t *testing.T) {
|
||||
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
|
||||
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000")
|
||||
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
|
||||
t.Run("ok", func(t *testing.T) {
|
||||
c := newTestCore(withHealthyCode(), withValidProxyManager())
|
||||
c.healthChecker = healthcheck.NewChecker(40*time.Millisecond, c.healthCheckFn)
|
||||
c.healthChecker.Start()
|
||||
defer c.healthChecker.Close()
|
||||
|
||||
{
|
||||
c := newTestCore(withHealthyCode(),
|
||||
withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(errQueryCoordClient))
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
ctx := context.Background()
|
||||
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, false, resp.IsHealthy)
|
||||
assert.NotEmpty(t, resp.Reasons)
|
||||
}
|
||||
|
||||
{
|
||||
c := newTestCore(withHealthyCode(),
|
||||
withValidProxyManager(), withDataCoord(errDataCoordClient), withQueryCoord(qcClient))
|
||||
|
||||
ctx := context.Background()
|
||||
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, false, resp.IsHealthy)
|
||||
assert.NotEmpty(t, resp.Reasons)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ok with tt lag exceeded", func(t *testing.T) {
|
||||
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
|
||||
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "90")
|
||||
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
|
||||
|
||||
c := newTestCore(withHealthyCode(),
|
||||
withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient))
|
||||
ctx := context.Background()
|
||||
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, false, resp.IsHealthy)
|
||||
assert.NotEmpty(t, resp.Reasons)
|
||||
})
|
||||
|
||||
t.Run("ok with tt lag checking", func(t *testing.T) {
|
||||
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
|
||||
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "600")
|
||||
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
|
||||
|
||||
c := newTestCore(withHealthyCode(),
|
||||
withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient))
|
||||
ctx := context.Background()
|
||||
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
|
||||
assert.NoError(t, err)
|
||||
|
||||
@ -21,10 +21,8 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
@ -34,7 +32,6 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
@ -284,83 +281,3 @@ func getProxyMetrics(ctx context.Context, proxies proxyutil.ProxyClientManagerIn
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordClient, dataCoord types.DataCoordClient, maxDelay time.Duration) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, GetMetricsTimeout)
|
||||
defer cancel()
|
||||
|
||||
now := time.Now()
|
||||
group := &errgroup.Group{}
|
||||
queryNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]()
|
||||
dataNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]()
|
||||
|
||||
group.Go(func() error {
|
||||
queryCoordTopology, err := getQueryCoordMetrics(ctx, queryCoord)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, queryNodeMetric := range queryCoordTopology.Cluster.ConnectedNodes {
|
||||
qm := queryNodeMetric.QuotaMetrics
|
||||
if qm != nil {
|
||||
if qm.Fgm.NumFlowGraph > 0 && qm.Fgm.MinFlowGraphChannel != "" {
|
||||
minTt, _ := tsoutil.ParseTS(qm.Fgm.MinFlowGraphTt)
|
||||
delay := now.Sub(minTt)
|
||||
|
||||
if delay.Milliseconds() >= maxDelay.Milliseconds() {
|
||||
queryNodeTTDelay.Insert(qm.Fgm.MinFlowGraphChannel, delay)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// get Data cluster metrics
|
||||
group.Go(func() error {
|
||||
dataCoordTopology, err := getDataCoordMetrics(ctx, dataCoord)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, dataNodeMetric := range dataCoordTopology.Cluster.ConnectedDataNodes {
|
||||
dm := dataNodeMetric.QuotaMetrics
|
||||
if dm != nil {
|
||||
if dm.Fgm.NumFlowGraph > 0 && dm.Fgm.MinFlowGraphChannel != "" {
|
||||
minTt, _ := tsoutil.ParseTS(dm.Fgm.MinFlowGraphTt)
|
||||
delay := now.Sub(minTt)
|
||||
|
||||
if delay.Milliseconds() >= maxDelay.Milliseconds() {
|
||||
dataNodeTTDelay.Insert(dm.Fgm.MinFlowGraphChannel, delay)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
err := group.Wait()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var maxLagChannel string
|
||||
var maxLag time.Duration
|
||||
findMaxLagChannel := func(params ...*typeutil.ConcurrentMap[string, time.Duration]) {
|
||||
for _, param := range params {
|
||||
param.Range(func(k string, v time.Duration) bool {
|
||||
if v > maxLag {
|
||||
maxLag = v
|
||||
maxLagChannel = k
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
findMaxLagChannel(queryNodeTTDelay, dataNodeTTDelay)
|
||||
|
||||
if maxLag > 0 && len(maxLagChannel) != 0 {
|
||||
return fmt.Errorf("max timetick lag execced threhold, max timetick lag:%s on channel:%s", maxLag, maxLagChannel)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -84,17 +84,3 @@ func WaitForComponentHealthy[T interface {
}](ctx context.Context, client T, serviceName string, attempts uint, sleep time.Duration) error {
    return WaitForComponentStates(ctx, client, serviceName, []commonpb.StateCode{commonpb.StateCode_Healthy}, attempts, sleep)
}

func CheckHealthRespWithErr(err error) *milvuspb.CheckHealthResponse {
    if err != nil {
        return CheckHealthRespWithErrMsg(err.Error())
    }
    return CheckHealthRespWithErrMsg()
}

func CheckHealthRespWithErrMsg(errMsg ...string) *milvuspb.CheckHealthResponse {
    if len(errMsg) != 0 {
        return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: false, Reasons: errMsg}
    }
    return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}
}

internal/util/healthcheck/checker.go (new file, 276 lines)
@ -0,0 +1,276 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package healthcheck

import (
    "fmt"
    "sync"
    "time"

    "go.uber.org/zap"

    "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
    "github.com/milvus-io/milvus/internal/json"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/util/merr"
)

// UnHealthyLevel represents the health level of a system.
type UnHealthyLevel int

const (
    // Healthy means the system is operating normally.
    Healthy UnHealthyLevel = iota
    // Warning indicates minor issues that might escalate.
    Warning
    // Critical indicates major issues that need immediate attention.
    Critical
    // Fatal indicates system failure.
    Fatal
)

// String returns the string representation of the UnHealthyLevel.
func (u UnHealthyLevel) String() string {
    switch u {
    case Healthy:
        return "Healthy"
    case Warning:
        return "Warning"
    case Critical:
        return "Critical"
    case Fatal:
        return "Fatal"
    default:
        return "Unknown"
    }
}

type Item int

const (
    ChannelsWatched Item = iota
    CheckpointLagExceed
    CollectionQueryable
    TimeTickLagExceed
    NodeHealthCheck
)

func getUnhealthyLevel(item Item) UnHealthyLevel {
    switch item {
    case ChannelsWatched:
        return Fatal
    case CheckpointLagExceed:
        return Fatal
    case TimeTickLagExceed:
        return Fatal
    case NodeHealthCheck:
        return Fatal
    case CollectionQueryable:
        return Critical
    default:
        panic(fmt.Sprintf("unknown health check item: %d", int(item)))
    }
}

type Result struct {
    UnhealthyClusterMsgs    []*UnhealthyClusterMsg    `json:"unhealthy_cluster_msgs"`
    UnhealthyCollectionMsgs []*UnhealthyCollectionMsg `json:"unhealthy_collection_msgs"`
}

func NewResult() *Result {
    return &Result{}
}

func (r *Result) AppendUnhealthyClusterMsg(unm *UnhealthyClusterMsg) {
    r.UnhealthyClusterMsgs = append(r.UnhealthyClusterMsgs, unm)
}

func (r *Result) AppendUnhealthyCollectionMsgs(udm *UnhealthyCollectionMsg) {
    r.UnhealthyCollectionMsgs = append(r.UnhealthyCollectionMsgs, udm)
}

func (r *Result) AppendResult(other *Result) {
    if other == nil {
        return
    }
    r.UnhealthyClusterMsgs = append(r.UnhealthyClusterMsgs, other.UnhealthyClusterMsgs...)
    r.UnhealthyCollectionMsgs = append(r.UnhealthyCollectionMsgs, other.UnhealthyCollectionMsgs...)
}

func (r *Result) IsEmpty() bool {
    return len(r.UnhealthyClusterMsgs) == 0 && len(r.UnhealthyCollectionMsgs) == 0
}

func (r *Result) IsHealthy() bool {
    if len(r.UnhealthyClusterMsgs) == 0 && len(r.UnhealthyCollectionMsgs) == 0 {
        return true
    }

    for _, unm := range r.UnhealthyClusterMsgs {
        if unm.Reason.UnhealthyLevel == Fatal {
            return false
        }
    }

    for _, ucm := range r.UnhealthyCollectionMsgs {
        if ucm.Reason.UnhealthyLevel == Fatal {
            return false
        }
    }

    return true
}

type UnhealthyReason struct {
    UnhealthyMsg   string         `json:"unhealthy_msg"`
    UnhealthyLevel UnHealthyLevel `json:"unhealthy_level"`
}

type UnhealthyClusterMsg struct {
    Role   string           `json:"role"`
    NodeID int64            `json:"node_id"`
    Reason *UnhealthyReason `json:"reason"`
}

func NewUnhealthyClusterMsg(role string, nodeID int64, unhealthyMsg string, item Item) *UnhealthyClusterMsg {
    return &UnhealthyClusterMsg{
        Role:   role,
        NodeID: nodeID,
        Reason: &UnhealthyReason{
            UnhealthyMsg:   unhealthyMsg,
            UnhealthyLevel: getUnhealthyLevel(item),
        },
    }
}

type UnhealthyCollectionMsg struct {
    DatabaseID   int64            `json:"database_id"`
    CollectionID int64            `json:"collection_id"`
    Reason       *UnhealthyReason `json:"reason"`
}

func NewUnhealthyCollectionMsg(collectionID int64, unhealthyMsg string, item Item) *UnhealthyCollectionMsg {
    return &UnhealthyCollectionMsg{
        CollectionID: collectionID,
        Reason: &UnhealthyReason{
            UnhealthyMsg:   unhealthyMsg,
            UnhealthyLevel: getUnhealthyLevel(item),
        },
    }
}

type Checker struct {
    sync.RWMutex
    interval     time.Duration
    done         chan struct{}
    checkFn      func() *Result
    latestResult *Result
    once         sync.Once
}

func NewChecker(interval time.Duration, checkFn func() *Result) *Checker {
    checker := &Checker{
        interval:     interval,
        checkFn:      checkFn,
        latestResult: NewResult(),
        done:         make(chan struct{}, 1),
        once:         sync.Once{},
    }
    return checker
}

func (hc *Checker) Start() {
    go func() {
        ticker := time.NewTicker(hc.interval)
        defer ticker.Stop()
        log.Info("start health checker")
        for {
            select {
            case <-ticker.C:
                hc.Lock()
                hc.latestResult = hc.checkFn()
                hc.Unlock()
            case <-hc.done:
                log.Info("stop health checker")
                return
            }
        }
    }()
}

func (hc *Checker) GetLatestCheckResult() *Result {
    hc.RLock()
    defer hc.RUnlock()
    return hc.latestResult
}

func (hc *Checker) Close() {
    hc.once.Do(func() {
        close(hc.done)
    })
}

func GetHealthCheckResultFromResp(resp *milvuspb.CheckHealthResponse) *Result {
    var r Result
    if len(resp.Reasons) == 0 {
        return &r
    }
    if len(resp.Reasons) > 1 {
        log.Error("invalid check result", zap.Any("reasons", resp.Reasons))
        return &r
    }

    err := json.Unmarshal([]byte(resp.Reasons[0]), &r)
    if err != nil {
        log.Error("unmarshal check result error", zap.String("error", err.Error()))
    }
    return &r
}

func GetCheckHealthResponseFromClusterMsg(msg ...*UnhealthyClusterMsg) *milvuspb.CheckHealthResponse {
    r := &Result{UnhealthyClusterMsgs: msg}
    reasons, err := json.Marshal(r)
    if err != nil {
        log.Error("marshal check result error", zap.String("error", err.Error()))
    }
    return &milvuspb.CheckHealthResponse{
        Status:    merr.Success(),
        IsHealthy: r.IsHealthy(),
        Reasons:   []string{string(reasons)},
    }
}

func GetCheckHealthResponseFromResult(checkResult *Result) *milvuspb.CheckHealthResponse {
    if checkResult.IsEmpty() {
        return OK()
    }

    reason, err := json.Marshal(checkResult)
    if err != nil {
        log.Error("marshal check result error", zap.String("error", err.Error()))
    }

    return &milvuspb.CheckHealthResponse{
        Status:    merr.Success(),
        IsHealthy: checkResult.IsHealthy(),
        Reasons:   []string{string(reason)},
    }
}

func OK() *milvuspb.CheckHealthResponse {
    return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}
}
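A short usage sketch for the checker added above, using only the identifiers visible in this file; the probe function and its values are invented for illustration.

```go
package main

import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/internal/util/healthcheck"
)

// fakeProbe is a made-up check function, not code from the Milvus repository.
func fakeProbe() *healthcheck.Result {
	r := healthcheck.NewResult()
	// Pretend one query node failed its node-level health check.
	r.AppendUnhealthyClusterMsg(
		healthcheck.NewUnhealthyClusterMsg("querynode", 7, "rpc timeout", healthcheck.NodeHealthCheck))
	return r
}

func main() {
	checker := healthcheck.NewChecker(100*time.Millisecond, fakeProbe)
	checker.Start()
	defer checker.Close()

	time.Sleep(150 * time.Millisecond) // wait for at least one tick

	latest := checker.GetLatestCheckResult()
	resp := healthcheck.GetCheckHealthResponseFromResult(latest)
	fmt.Println(resp.IsHealthy) // false: NodeHealthCheck maps to the Fatal level
	fmt.Println(resp.Reasons)   // a single JSON-encoded Result, as the coordinators expect
}
```

The coordinators wire this up with the new `common.healthcheck.interval.seconds` and `common.healthcheck.timeout.seconds` params and translate the cached Result back into a CheckHealthResponse; the Reasons round-trip is what GetHealthCheckResultFromResp exists for.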
internal/util/healthcheck/checker_test.go (new file, 60 lines)
@ -0,0 +1,60 @@
|
||||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
)
|
||||
|
||||
func TestChecker(t *testing.T) {
|
||||
expected1 := NewResult()
|
||||
expected1.AppendUnhealthyClusterMsg(NewUnhealthyClusterMsg("role1", 1, "msg1", ChannelsWatched))
|
||||
expected1.AppendUnhealthyClusterMsg(NewUnhealthyClusterMsg("role1", 1, "msg1", ChannelsWatched))
|
||||
|
||||
expected1.AppendUnhealthyCollectionMsgs(&UnhealthyCollectionMsg{
|
||||
CollectionID: 1,
|
||||
Reason: &UnhealthyReason{
|
||||
UnhealthyMsg: "msg2",
|
||||
UnhealthyLevel: Critical,
|
||||
},
|
||||
})
|
||||
|
||||
checkFn := func() *Result {
|
||||
return expected1
|
||||
}
|
||||
checker := NewChecker(100*time.Millisecond, checkFn)
|
||||
go checker.Start()
|
||||
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
actual1 := checker.GetLatestCheckResult()
|
||||
assert.Equal(t, expected1, actual1)
|
||||
assert.False(t, actual1.IsHealthy())
|
||||
|
||||
chr := GetCheckHealthResponseFromResult(actual1)
|
||||
assert.Equal(t, merr.Success(), chr.Status)
|
||||
assert.Equal(t, actual1.IsHealthy(), chr.IsHealthy)
|
||||
assert.Equal(t, 1, len(chr.Reasons))
|
||||
|
||||
actualResult := GetHealthCheckResultFromResp(chr)
|
||||
assert.Equal(t, actual1, actualResult)
|
||||
checker.Close()
|
||||
}
|
||||
@ -112,3 +112,7 @@ func (m *GrpcDataNodeClient) QuerySlot(ctx context.Context, req *datapb.QuerySlo
|
||||
func (m *GrpcDataNodeClient) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||
return &commonpb.Status{}, m.Err
|
||||
}
|
||||
|
||||
func (m *GrpcDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
|
||||
return &milvuspb.CheckHealthResponse{}, m.Err
|
||||
}
|
||||
|
||||
@ -134,6 +134,10 @@ func (m *GrpcQueryNodeClient) DeleteBatch(ctx context.Context, in *querypb.Delet
|
||||
return &querypb.DeleteBatchResponse{}, m.Err
|
||||
}
|
||||
|
||||
func (m *GrpcQueryNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
|
||||
return &milvuspb.CheckHealthResponse{}, m.Err
|
||||
}
|
||||
|
||||
func (m *GrpcQueryNodeClient) Close() error {
|
||||
return m.Err
|
||||
}
|
||||
|
||||
@ -152,6 +152,10 @@ func (qn *qnServerWrapper) DeleteBatch(ctx context.Context, in *querypb.DeleteBa
|
||||
return qn.QueryNode.DeleteBatch(ctx, in)
|
||||
}
|
||||
|
||||
func (qn *qnServerWrapper) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
|
||||
return qn.QueryNode.CheckHealth(ctx, req)
|
||||
}
|
||||
|
||||
func WrapQueryNodeServerAsClient(qn types.QueryNode) types.QueryNodeClient {
|
||||
return &qnServerWrapper{
|
||||
QueryNode: qn,
|
||||
|
||||
@ -24,7 +24,6 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@ -254,13 +253,18 @@ func ConvertChannelName(chanName string, tokenFrom string, tokenTo string) (stri
}

func GetCollectionIDFromVChannel(vChannelName string) int64 {
	re := regexp.MustCompile(`.*_(\d+)v\d+`)
	matches := re.FindStringSubmatch(vChannelName)
	if len(matches) > 1 {
		number, err := strconv.ParseInt(matches[1], 0, 64)
		if err == nil {
			return number
		}
	end := strings.LastIndexByte(vChannelName, 'v')
	if end <= 0 {
		return -1
	}
	start := strings.LastIndexByte(vChannelName, '_')
	if start <= 0 {
		return -1
	}

	collectionIDStr := vChannelName[start+1 : end]
	if collectionID, err := strconv.ParseInt(collectionIDStr, 0, 64); err == nil {
		return collectionID
	}
	return -1
}

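Editor's note: the replacement parses the collection ID with two LastIndexByte scans instead of compiling a regular expression on every call, which is the CPU saving this change is after. An illustrative call, with a made-up virtual channel name in the usual prefix_collectionID v-suffix shape:

// Illustrative only; the channel name below is invented for the example.
id := GetCollectionIDFromVChannel("by-dev-rootcoord-dml_0_449883083979849857v0")
// id == 449883083979849857; names without a parsable ID fall through to -1.
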
@ -299,6 +299,13 @@ func IsHealthyOrStopping(stateCode commonpb.StateCode) error {
	return CheckHealthy(stateCode)
}

func AnalyzeComponentStateResp(role string, nodeID int64, resp *milvuspb.ComponentStates, err error) error {
	if err != nil {
		return errors.Wrap(err, "service is unhealthy")
	}
	return AnalyzeState(role, nodeID, resp)
}

func AnalyzeState(role string, nodeID int64, state *milvuspb.ComponentStates) error {
	if err := Error(state.GetStatus()); err != nil {
		return errors.Wrapf(err, "%s=%d not healthy", role, nodeID)

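Editor's note: AnalyzeComponentStateResp folds the RPC error and the returned component state into one verdict, so callers can fan out GetComponentStates and collect per-node reasons uniformly. A hedged fragment; the nodeClient variable, role string, and nodeID are placeholders:

// Fragment: one node probe folded into a single error.
states, err := nodeClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if analyzeErr := AnalyzeComponentStateResp("querynode", nodeID, states, err); analyzeErr != nil {
	// record analyzeErr as an unhealthy reason for this node
}
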
@ -266,6 +266,9 @@ type commonConfig struct {
	ReadOnlyPrivileges  ParamItem `refreshable:"false"`
	ReadWritePrivileges ParamItem `refreshable:"false"`
	AdminPrivileges     ParamItem `refreshable:"false"`

	HealthCheckInterval   ParamItem `refreshable:"true"`
	HealthCheckRPCTimeout ParamItem `refreshable:"true"`
}

func (p *commonConfig) init(base *BaseTable) {
@ -915,6 +918,22 @@ This helps Milvus-CDC synchronize incremental data`,
		Doc: `use to override the default value of admin privileges, example: "PrivilegeCreateOwnership,PrivilegeDropOwnership"`,
	}
	p.AdminPrivileges.Init(base.mgr)

	p.HealthCheckInterval = ParamItem{
		Key:          "common.healthcheck.interval.seconds",
		Version:      "2.4.8",
		DefaultValue: "30",
		Doc:          `health check interval in seconds, default 30s`,
	}
	p.HealthCheckInterval.Init(base.mgr)

	p.HealthCheckRPCTimeout = ParamItem{
		Key:          "common.healthcheck.timeout.seconds",
		Version:      "2.4.8",
		DefaultValue: "10",
		Doc:          `RPC timeout for health check request`,
	}
	p.HealthCheckRPCTimeout.Init(base.mgr)
}

type gpuConfig struct {
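Editor's note: the two new items surface under common.healthcheck.* and are marked refreshable, so the check loop can pick up interval and timeout changes without a restart. A hedged fragment showing how they would typically be read; the paramtable.Get().CommonCfg access path and the use of GetAsDuration for the timeout are assumptions, not taken from this diff:

// Fragment: driving a periodic health-check loop from the new knobs.
interval := paramtable.Get().CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
rpcTimeout := paramtable.Get().CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
	// issue CheckHealth RPCs to nodes here, bounded by rpcTimeout
	cancel()
}
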
@ -2169,9 +2188,9 @@ If this parameter is set false, Milvus simply searches the growing segments with
	p.UpdateCollectionLoadStatusInterval = ParamItem{
		Key:          "queryCoord.updateCollectionLoadStatusInterval",
		Version:      "2.4.7",
		DefaultValue: "5",
		DefaultValue: "300",
		PanicIfEmpty: true,
		Doc:          "5m, max interval for updating collection loaded status",
		Doc:          "300s, max interval of updating collection loaded status for check health",
		Export:       true,
	}

@ -126,6 +126,11 @@ func TestComponentParam(t *testing.T) {
		params.Save("common.gchelper.minimumGoGC", "80")
		assert.Equal(t, 80, Params.MinimumGOGCConfig.GetAsInt())

		params.Save("common.healthcheck.interval.seconds", "60")
		assert.Equal(t, time.Second*60, Params.HealthCheckInterval.GetAsDuration(time.Second))
		params.Save("common.healthcheck.timeout.seconds", "5")
		assert.Equal(t, 5, Params.HealthCheckRPCTimeout.GetAsInt())

		assert.Equal(t, 0, len(Params.ReadOnlyPrivileges.GetAsStrings()))
		assert.Equal(t, 0, len(Params.ReadWritePrivileges.GetAsStrings()))
		assert.Equal(t, 0, len(Params.AdminPrivileges.GetAsStrings()))
@ -304,8 +309,8 @@ func TestComponentParam(t *testing.T) {
		checkHealthRPCTimeout := Params.CheckHealthRPCTimeout.GetAsInt()
		assert.Equal(t, 2000, checkHealthRPCTimeout)

		updateInterval := Params.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute)
		assert.Equal(t, updateInterval, time.Minute*5)
		updateInterval := Params.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Second)
		assert.Equal(t, updateInterval, time.Second*300)

		assert.Equal(t, 0.1, Params.GlobalRowCountFactor.GetAsFloat())
		params.Save("queryCoord.globalRowCountFactor", "0.4")

@ -16,7 +16,13 @@

package ratelimitutil

import "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"

	"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

var QuotaErrorString = map[commonpb.ErrorCode]string{
	commonpb.ErrorCode_ForceDeny: "access has been disabled by the administrator",
@ -28,3 +34,14 @@ var QuotaErrorString = map[commonpb.ErrorCode]string{
func GetQuotaErrorString(errCode commonpb.ErrorCode) string {
	return QuotaErrorString[errCode]
}

func CheckTimeTickDelay(channel string, minTT uint64, maxDelay time.Duration) error {
	if channel != "" && maxDelay > 0 {
		minTt, _ := tsoutil.ParseTS(minTT)
		delay := time.Since(minTt)
		if delay.Milliseconds() >= maxDelay.Milliseconds() {
			return fmt.Errorf("max timetick lag exceeded threshold, lag:%s on channel:%s", delay, channel)
		}
	}
	return nil
}

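Editor's note: CheckTimeTickDelay converts a channel's minimum timetick into a lag relative to wall-clock time and flags it once the lag reaches maxDelay; an empty channel or zero maxDelay disables the check. A hedged usage fragment, assuming tsoutil.ComposeTSByTime is available to build a timestamp for illustration:

// Fragment: a timetick one minute old against a 30s threshold should be flagged.
minTT := tsoutil.ComposeTSByTime(time.Now().Add(-time.Minute), 0)
if err := CheckTimeTickDelay("by-dev-rootcoord-dml_0", minTT, 30*time.Second); err != nil {
	// err reports the measured lag and the channel name
}
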