diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index d3bbc0527b..cb9310dbd1 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -167,7 +167,7 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, }, nil } hardwareInfos := metricsinfo.HardwareMetrics{ - IP: node.session.Address, + IP: node.session.GetAddress(), CPUCoreCount: hardware.GetCPUNum(), CPUCoreUsage: hardware.GetCPUUsage(), Memory: totalMem, @@ -185,7 +185,7 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, CreatedTime: paramtable.GetCreateTime().String(), UpdatedTime: paramtable.GetUpdateTime().String(), Type: typeutil.QueryNodeRole, - ID: node.session.ServerID, + ID: node.session.GetServerID(), }, SystemConfigurations: metricsinfo.QueryNodeConfiguration{ SimdType: paramtable.Get().CommonCfg.SimdType.GetValue(), diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 1fcd2d0282..1d8da0b2be 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -119,7 +119,7 @@ type QueryNode struct { dispClient msgdispatcher.Client factory dependency.Factory - session *sessionutil.Session + session sessionutil.SessionInterface eventCh <-chan *sessionutil.SessionEvent cacheChunkManager storage.ChunkManager @@ -155,9 +155,9 @@ func (node *QueryNode) initSession() error { return fmt.Errorf("session is nil, the etcd client connection may have failed") } node.session.Init(typeutil.QueryNodeRole, node.address, false, true) - sessionutil.SaveServerInfo(typeutil.QueryNodeRole, node.session.ServerID) - paramtable.SetNodeID(node.session.ServerID) - log.Info("QueryNode init session", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("node address", node.session.Address)) + sessionutil.SaveServerInfo(typeutil.QueryNodeRole, node.session.GetServerID()) + paramtable.SetNodeID(node.session.GetServerID()) + log.Info("QueryNode init session", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("node address", node.session.GetAddress())) return nil } @@ -173,7 +173,7 @@ func (node *QueryNode) Register() error { } metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryNodeRole).Dec() // manually send signal to starter goroutine - if node.session.TriggerKill { + if node.session.IsTriggerKill() { if p, err := os.FindProcess(os.Getpid()); err == nil { p.Signal(syscall.SIGINT) } @@ -338,7 +338,7 @@ func (node *QueryNode) Init() error { addr := "" for _, session := range sessions { - if session.ServerID == nodeID { + if session.GetServerID() == nodeID { addr = session.Address break } @@ -417,31 +417,32 @@ func (node *QueryNode) Stop() error { if err != nil { log.Warn("session fail to go stopping state", zap.Error(err)) } else { + metrics.StoppingBalanceNodeNum.WithLabelValues().Set(1) timeoutCh := time.After(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second)) outer: for (node.manager != nil && !node.manager.Segment.Empty()) || (node.pipelineManager != nil && node.pipelineManager.Num() != 0) { + var ( + sealedSegments = []segments.Segment{} + growingSegments = []segments.Segment{} + channelNum = 0 + ) + if node.manager != nil { + sealedSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed)) + growingSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing)) + } + if node.pipelineManager != nil { + channelNum = node.pipelineManager.Num() + } + select { case <-timeoutCh: - var ( - sealedSegments = []segments.Segment{} - growingSegments = []segments.Segment{} - channelNum = 0 - ) - if node.manager != nil { - sealedSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed)) - growingSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing)) - } - if node.pipelineManager != nil { - channelNum = node.pipelineManager.Num() - } - log.Warn("migrate data timed out", zap.Int64("ServerID", paramtable.GetNodeID()), - zap.Int64s("sealedSegments", lo.Map[segments.Segment, int64](sealedSegments, func(s segments.Segment, i int) int64 { + zap.Int64s("sealedSegments", lo.Map(sealedSegments, func(s segments.Segment, i int) int64 { return s.ID() })), - zap.Int64s("growingSegments", lo.Map[segments.Segment, int64](growingSegments, func(t segments.Segment, i int) int64 { + zap.Int64s("growingSegments", lo.Map(growingSegments, func(t segments.Segment, i int) int64 { return t.ID() })), zap.Int("channelNum", channelNum), @@ -449,8 +450,14 @@ func (node *QueryNode) Stop() error { break outer case <-time.After(time.Second): + metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(len(sealedSegments))) + metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(channelNum)) } } + + metrics.StoppingBalanceNodeNum.WithLabelValues().Set(0) + metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(0) + metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(0) } node.UpdateStateCode(commonpb.StateCode_Abnormal) diff --git a/internal/querynodev2/server_test.go b/internal/querynodev2/server_test.go index 1e59d3274e..ec98b0159c 100644 --- a/internal/querynodev2/server_test.go +++ b/internal/querynodev2/server_test.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -108,7 +109,6 @@ func (suite *QueryNodeSuite) TestBasic() { suite.True(suite.node.lifetime.GetState() == commonpb.StateCode_Healthy) // register node to etcd - suite.node.session.TriggerKill = false err = suite.node.Register() suite.NoError(err) @@ -217,6 +217,10 @@ func (suite *QueryNodeSuite) TestStop() { paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "2") suite.node.manager = segments.NewManager() + mockSession := sessionutil.NewMockSession(suite.T()) + mockSession.EXPECT().GoingStop().Return(nil) + mockSession.EXPECT().Stop() + suite.node.session = mockSession schema := segments.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64) collection := segments.NewCollection(1, schema, nil, querypb.LoadType_LoadCollection) diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index ec645400bd..891e57f4fc 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -149,12 +149,12 @@ func (suite *ServiceSuite) TearDownTest() { resp, err := suite.node.ReleaseSegments(ctx, &querypb.ReleaseSegmentsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ReleaseSegments, - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, PartitionIDs: suite.partitionIDs, SegmentIDs: suite.validSegmentIDs, - NodeID: suite.node.session.ServerID, + NodeID: suite.node.session.GetServerID(), Scope: querypb.DataScope_All, Shard: suite.vchannel, }) @@ -206,7 +206,7 @@ func (suite *ServiceSuite) TestGetStatistics_Normal() { Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_WatchDmChannels, MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, PartitionIDs: []int64{}, @@ -231,7 +231,7 @@ func (suite *ServiceSuite) TestGetStatistics_Failed() { Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_WatchDmChannels, MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, PartitionIDs: []int64{}, @@ -261,9 +261,9 @@ func (suite *ServiceSuite) TestWatchDmChannelsInt64() { Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_WatchDmChannels, MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, - NodeID: suite.node.session.ServerID, + NodeID: suite.node.session.GetServerID(), CollectionID: suite.collectionID, PartitionIDs: suite.partitionIDs, Infos: []*datapb.VchannelInfo{ @@ -313,9 +313,9 @@ func (suite *ServiceSuite) TestWatchDmChannelsVarchar() { Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_WatchDmChannels, MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, - NodeID: suite.node.session.ServerID, + NodeID: suite.node.session.GetServerID(), CollectionID: suite.collectionID, PartitionIDs: suite.partitionIDs, Infos: []*datapb.VchannelInfo{ @@ -365,9 +365,9 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() { Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_WatchDmChannels, MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, - NodeID: suite.node.session.ServerID, + NodeID: suite.node.session.GetServerID(), CollectionID: suite.collectionID, PartitionIDs: suite.partitionIDs, Infos: []*datapb.VchannelInfo{ @@ -443,9 +443,9 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() { Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_UnsubDmChannel, MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, - NodeID: suite.node.session.ServerID, + NodeID: suite.node.session.GetServerID(), CollectionID: suite.collectionID, ChannelName: suite.vchannel, } @@ -465,9 +465,9 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Failed() { Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_UnsubDmChannel, MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, - NodeID: suite.node.session.ServerID, + NodeID: suite.node.session.GetServerID(), CollectionID: suite.collectionID, ChannelName: suite.vchannel, } @@ -557,10 +557,10 @@ func (suite *ServiceSuite) TestLoadSegments_Int64() { req := &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, - DstNodeID: suite.node.session.ServerID, + DstNodeID: suite.node.session.GetServerID(), Infos: []*querypb.SegmentLoadInfo{info}, Schema: schema, DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}}, @@ -595,10 +595,10 @@ func (suite *ServiceSuite) TestLoadSegments_VarChar() { req := &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, - DstNodeID: suite.node.session.ServerID, + DstNodeID: suite.node.session.GetServerID(), Infos: []*querypb.SegmentLoadInfo{info}, Schema: schema, DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}}, @@ -622,10 +622,10 @@ func (suite *ServiceSuite) TestLoadDeltaInt64() { req := &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, - DstNodeID: suite.node.session.ServerID, + DstNodeID: suite.node.session.GetServerID(), Infos: suite.genSegmentLoadInfos(schema), Schema: schema, NeedTransfer: true, @@ -647,10 +647,10 @@ func (suite *ServiceSuite) TestLoadDeltaVarchar() { req := &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, - DstNodeID: suite.node.session.ServerID, + DstNodeID: suite.node.session.GetServerID(), Infos: suite.genSegmentLoadInfos(schema), Schema: schema, NeedTransfer: true, @@ -685,10 +685,10 @@ func (suite *ServiceSuite) TestLoadIndex_Success() { req := &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, - DstNodeID: suite.node.session.ServerID, + DstNodeID: suite.node.session.GetServerID(), Infos: rawInfo, Schema: schema, NeedTransfer: false, @@ -710,10 +710,10 @@ func (suite *ServiceSuite) TestLoadIndex_Success() { req = &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, - DstNodeID: suite.node.session.ServerID, + DstNodeID: suite.node.session.GetServerID(), Infos: infos, Schema: schema, NeedTransfer: false, @@ -755,10 +755,10 @@ func (suite *ServiceSuite) TestLoadIndex_Failed() { req := &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, - DstNodeID: suite.node.session.ServerID, + DstNodeID: suite.node.session.GetServerID(), Infos: rawInfo, Schema: schema, NeedTransfer: false, @@ -788,10 +788,10 @@ func (suite *ServiceSuite) TestLoadIndex_Failed() { req := &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, - DstNodeID: suite.node.session.ServerID, + DstNodeID: suite.node.session.GetServerID(), Infos: infos, Schema: schema, NeedTransfer: false, @@ -813,10 +813,10 @@ func (suite *ServiceSuite) TestLoadSegments_Failed() { req := &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, - DstNodeID: suite.node.session.ServerID, + DstNodeID: suite.node.session.GetServerID(), Infos: suite.genSegmentLoadInfos(schema), Schema: schema, NeedTransfer: true, @@ -865,10 +865,10 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() { req := &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, - DstNodeID: suite.node.session.ServerID, + DstNodeID: suite.node.session.GetServerID(), Infos: suite.genSegmentLoadInfos(schema), Schema: schema, NeedTransfer: true, @@ -887,10 +887,10 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() { req := &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, - DstNodeID: suite.node.session.ServerID, + DstNodeID: suite.node.session.GetServerID(), Infos: suite.genSegmentLoadInfos(schema), Schema: schema, NeedTransfer: true, @@ -914,10 +914,10 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() { req := &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, - DstNodeID: suite.node.session.ServerID, + DstNodeID: suite.node.session.GetServerID(), Infos: suite.genSegmentLoadInfos(schema), Schema: schema, NeedTransfer: true, @@ -985,7 +985,7 @@ func (suite *ServiceSuite) TestReleaseSegments_Normal() { req := &querypb.ReleaseSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, SegmentIDs: suite.validSegmentIDs, @@ -1003,7 +1003,7 @@ func (suite *ServiceSuite) TestReleaseSegments_Failed() { req := &querypb.ReleaseSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, SegmentIDs: suite.validSegmentIDs, @@ -1029,7 +1029,7 @@ func (suite *ServiceSuite) TestReleaseSegments_Transfer() { req := &querypb.ReleaseSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, Shard: suite.vchannel, CollectionID: suite.collectionID, @@ -1051,7 +1051,7 @@ func (suite *ServiceSuite) TestReleaseSegments_Transfer() { req := &querypb.ReleaseSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, Shard: suite.vchannel, CollectionID: suite.collectionID, @@ -1078,7 +1078,7 @@ func (suite *ServiceSuite) TestReleaseSegments_Transfer() { req := &querypb.ReleaseSegmentsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, Shard: suite.vchannel, CollectionID: suite.collectionID, @@ -1141,7 +1141,7 @@ func (suite *ServiceSuite) genCSearchRequest(nq int64, indexType string, schema Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_Search, MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, PartitionIDs: suite.partitionIDs, @@ -1343,7 +1343,7 @@ func (suite *ServiceSuite) genCQueryRequest(nq int64, indexType string, schema * Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_Retrieve, MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, PartitionIDs: suite.partitionIDs, @@ -1652,7 +1652,7 @@ func (suite *ServiceSuite) TestShowConfigurations_Normal() { req := &internalpb.ShowConfigurationsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, Pattern: "Cache.enabled", } @@ -1668,7 +1668,7 @@ func (suite *ServiceSuite) TestShowConfigurations_Failed() { req := &internalpb.ShowConfigurationsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, Pattern: "Cache.enabled", } @@ -1690,7 +1690,7 @@ func (suite *ServiceSuite) TestGetMetric_Normal() { req := &milvuspb.GetMetricsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, Request: string(mReq), } @@ -1711,7 +1711,7 @@ func (suite *ServiceSuite) TestGetMetric_Failed() { req := &milvuspb.GetMetricsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, Request: string(mReq), } @@ -1742,7 +1742,7 @@ func (suite *ServiceSuite) TestGetDataDistribution_Normal() { req := &querypb.GetDataDistributionRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, } @@ -1756,7 +1756,7 @@ func (suite *ServiceSuite) TestGetDataDistribution_Failed() { req := &querypb.GetDataDistributionRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, } @@ -1784,7 +1784,7 @@ func (suite *ServiceSuite) TestSyncDistribution_Normal() { req := &querypb.SyncDistributionRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, Channel: suite.vchannel, @@ -1869,7 +1869,7 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() { req := &querypb.SyncDistributionRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, Channel: suite.vchannel, @@ -1915,7 +1915,7 @@ func (suite *ServiceSuite) TestSyncDistribution_Failed() { req := &querypb.SyncDistributionRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, Channel: suite.vchannel, @@ -1943,7 +1943,7 @@ func (suite *ServiceSuite) TestDelete_Int64() { req := &querypb.DeleteRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionId: suite.collectionID, PartitionId: suite.partitionIDs[0], @@ -1974,7 +1974,7 @@ func (suite *ServiceSuite) TestDelete_VarChar() { req := &querypb.DeleteRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionId: suite.collectionID, PartitionId: suite.partitionIDs[0], @@ -2005,7 +2005,7 @@ func (suite *ServiceSuite) TestDelete_Failed() { req := &querypb.DeleteRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionId: suite.collectionID, PartitionId: suite.partitionIDs[0], @@ -2041,7 +2041,7 @@ func (suite *ServiceSuite) TestLoadPartition() { req := &querypb.LoadPartitionsRequest{ Base: &commonpb.MsgBase{ MsgID: rand.Int63(), - TargetID: suite.node.session.ServerID, + TargetID: suite.node.session.GetServerID(), }, CollectionID: suite.collectionID, PartitionIDs: suite.partitionIDs, diff --git a/pkg/metrics/querynode_metrics.go b/pkg/metrics/querynode_metrics.go index 86bd2f7953..f49e8c6a15 100644 --- a/pkg/metrics/querynode_metrics.go +++ b/pkg/metrics/querynode_metrics.go @@ -442,6 +442,30 @@ var ( }, []string{ nodeIDLabelName, }) + + StoppingBalanceNodeNum = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.QueryNodeRole, + Name: "stopping_balance_node_num", + Help: "the number of node which executing stopping balance", + }, []string{}) + + StoppingBalanceChannelNum = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.QueryNodeRole, + Name: "stopping_balance_channel_num", + Help: "the number of channel which executing stopping balance", + }, []string{nodeIDLabelName}) + + StoppingBalanceSegmentNum = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.QueryNodeRole, + Name: "stopping_balance_segment_num", + Help: "the number of segment which executing stopping balance", + }, []string{nodeIDLabelName}) ) // RegisterQueryNode registers QueryNode metrics @@ -483,6 +507,9 @@ func RegisterQueryNode(registry *prometheus.Registry) { registry.MustRegister(QueryNodeDiskUsedSize) registry.MustRegister(QueryNodeProcessCost) registry.MustRegister(QueryNodeWaitProcessingMsgCount) + registry.MustRegister(StoppingBalanceNodeNum) + registry.MustRegister(StoppingBalanceChannelNum) + registry.MustRegister(StoppingBalanceSegmentNum) } func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {