enhance: add metrics for stopping querynode balance progress (#29201) (#29390)

pr: #29201
This PR add three metrics to track the stopping balance progress.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2023-12-26 10:02:46 +08:00 committed by GitHub
parent 477def9368
commit 514da535e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 120 additions and 82 deletions

View File

@ -167,7 +167,7 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
}, nil
}
hardwareInfos := metricsinfo.HardwareMetrics{
IP: node.session.Address,
IP: node.session.GetAddress(),
CPUCoreCount: hardware.GetCPUNum(),
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: totalMem,
@ -185,7 +185,7 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
CreatedTime: paramtable.GetCreateTime().String(),
UpdatedTime: paramtable.GetUpdateTime().String(),
Type: typeutil.QueryNodeRole,
ID: node.session.ServerID,
ID: node.session.GetServerID(),
},
SystemConfigurations: metricsinfo.QueryNodeConfiguration{
SimdType: paramtable.Get().CommonCfg.SimdType.GetValue(),

View File

@ -119,7 +119,7 @@ type QueryNode struct {
dispClient msgdispatcher.Client
factory dependency.Factory
session *sessionutil.Session
session sessionutil.SessionInterface
eventCh <-chan *sessionutil.SessionEvent
cacheChunkManager storage.ChunkManager
@ -155,9 +155,9 @@ func (node *QueryNode) initSession() error {
return fmt.Errorf("session is nil, the etcd client connection may have failed")
}
node.session.Init(typeutil.QueryNodeRole, node.address, false, true)
sessionutil.SaveServerInfo(typeutil.QueryNodeRole, node.session.ServerID)
paramtable.SetNodeID(node.session.ServerID)
log.Info("QueryNode init session", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("node address", node.session.Address))
sessionutil.SaveServerInfo(typeutil.QueryNodeRole, node.session.GetServerID())
paramtable.SetNodeID(node.session.GetServerID())
log.Info("QueryNode init session", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("node address", node.session.GetAddress()))
return nil
}
@ -173,7 +173,7 @@ func (node *QueryNode) Register() error {
}
metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryNodeRole).Dec()
// manually send signal to starter goroutine
if node.session.TriggerKill {
if node.session.IsTriggerKill() {
if p, err := os.FindProcess(os.Getpid()); err == nil {
p.Signal(syscall.SIGINT)
}
@ -338,7 +338,7 @@ func (node *QueryNode) Init() error {
addr := ""
for _, session := range sessions {
if session.ServerID == nodeID {
if session.GetServerID() == nodeID {
addr = session.Address
break
}
@ -417,31 +417,32 @@ func (node *QueryNode) Stop() error {
if err != nil {
log.Warn("session fail to go stopping state", zap.Error(err))
} else {
metrics.StoppingBalanceNodeNum.WithLabelValues().Set(1)
timeoutCh := time.After(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second))
outer:
for (node.manager != nil && !node.manager.Segment.Empty()) ||
(node.pipelineManager != nil && node.pipelineManager.Num() != 0) {
var (
sealedSegments = []segments.Segment{}
growingSegments = []segments.Segment{}
channelNum = 0
)
if node.manager != nil {
sealedSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed))
growingSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing))
}
if node.pipelineManager != nil {
channelNum = node.pipelineManager.Num()
}
select {
case <-timeoutCh:
var (
sealedSegments = []segments.Segment{}
growingSegments = []segments.Segment{}
channelNum = 0
)
if node.manager != nil {
sealedSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed))
growingSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing))
}
if node.pipelineManager != nil {
channelNum = node.pipelineManager.Num()
}
log.Warn("migrate data timed out", zap.Int64("ServerID", paramtable.GetNodeID()),
zap.Int64s("sealedSegments", lo.Map[segments.Segment, int64](sealedSegments, func(s segments.Segment, i int) int64 {
zap.Int64s("sealedSegments", lo.Map(sealedSegments, func(s segments.Segment, i int) int64 {
return s.ID()
})),
zap.Int64s("growingSegments", lo.Map[segments.Segment, int64](growingSegments, func(t segments.Segment, i int) int64 {
zap.Int64s("growingSegments", lo.Map(growingSegments, func(t segments.Segment, i int) int64 {
return t.ID()
})),
zap.Int("channelNum", channelNum),
@ -449,8 +450,14 @@ func (node *QueryNode) Stop() error {
break outer
case <-time.After(time.Second):
metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(len(sealedSegments)))
metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(channelNum))
}
}
metrics.StoppingBalanceNodeNum.WithLabelValues().Set(0)
metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(0)
metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(0)
}
node.UpdateStateCode(commonpb.StateCode_Abnormal)

View File

@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -108,7 +109,6 @@ func (suite *QueryNodeSuite) TestBasic() {
suite.True(suite.node.lifetime.GetState() == commonpb.StateCode_Healthy)
// register node to etcd
suite.node.session.TriggerKill = false
err = suite.node.Register()
suite.NoError(err)
@ -217,6 +217,10 @@ func (suite *QueryNodeSuite) TestStop() {
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "2")
suite.node.manager = segments.NewManager()
mockSession := sessionutil.NewMockSession(suite.T())
mockSession.EXPECT().GoingStop().Return(nil)
mockSession.EXPECT().Stop()
suite.node.session = mockSession
schema := segments.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64)
collection := segments.NewCollection(1, schema, nil, querypb.LoadType_LoadCollection)

View File

@ -149,12 +149,12 @@ func (suite *ServiceSuite) TearDownTest() {
resp, err := suite.node.ReleaseSegments(ctx, &querypb.ReleaseSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseSegments,
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
PartitionIDs: suite.partitionIDs,
SegmentIDs: suite.validSegmentIDs,
NodeID: suite.node.session.ServerID,
NodeID: suite.node.session.GetServerID(),
Scope: querypb.DataScope_All,
Shard: suite.vchannel,
})
@ -206,7 +206,7 @@ func (suite *ServiceSuite) TestGetStatistics_Normal() {
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchDmChannels,
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
PartitionIDs: []int64{},
@ -231,7 +231,7 @@ func (suite *ServiceSuite) TestGetStatistics_Failed() {
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchDmChannels,
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
PartitionIDs: []int64{},
@ -261,9 +261,9 @@ func (suite *ServiceSuite) TestWatchDmChannelsInt64() {
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchDmChannels,
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
NodeID: suite.node.session.ServerID,
NodeID: suite.node.session.GetServerID(),
CollectionID: suite.collectionID,
PartitionIDs: suite.partitionIDs,
Infos: []*datapb.VchannelInfo{
@ -313,9 +313,9 @@ func (suite *ServiceSuite) TestWatchDmChannelsVarchar() {
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchDmChannels,
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
NodeID: suite.node.session.ServerID,
NodeID: suite.node.session.GetServerID(),
CollectionID: suite.collectionID,
PartitionIDs: suite.partitionIDs,
Infos: []*datapb.VchannelInfo{
@ -365,9 +365,9 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() {
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchDmChannels,
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
NodeID: suite.node.session.ServerID,
NodeID: suite.node.session.GetServerID(),
CollectionID: suite.collectionID,
PartitionIDs: suite.partitionIDs,
Infos: []*datapb.VchannelInfo{
@ -443,9 +443,9 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() {
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_UnsubDmChannel,
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
NodeID: suite.node.session.ServerID,
NodeID: suite.node.session.GetServerID(),
CollectionID: suite.collectionID,
ChannelName: suite.vchannel,
}
@ -465,9 +465,9 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Failed() {
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_UnsubDmChannel,
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
NodeID: suite.node.session.ServerID,
NodeID: suite.node.session.GetServerID(),
CollectionID: suite.collectionID,
ChannelName: suite.vchannel,
}
@ -557,10 +557,10 @@ func (suite *ServiceSuite) TestLoadSegments_Int64() {
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
DstNodeID: suite.node.session.GetServerID(),
Infos: []*querypb.SegmentLoadInfo{info},
Schema: schema,
DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}},
@ -595,10 +595,10 @@ func (suite *ServiceSuite) TestLoadSegments_VarChar() {
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
DstNodeID: suite.node.session.GetServerID(),
Infos: []*querypb.SegmentLoadInfo{info},
Schema: schema,
DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}},
@ -622,10 +622,10 @@ func (suite *ServiceSuite) TestLoadDeltaInt64() {
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
DstNodeID: suite.node.session.GetServerID(),
Infos: suite.genSegmentLoadInfos(schema),
Schema: schema,
NeedTransfer: true,
@ -647,10 +647,10 @@ func (suite *ServiceSuite) TestLoadDeltaVarchar() {
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
DstNodeID: suite.node.session.GetServerID(),
Infos: suite.genSegmentLoadInfos(schema),
Schema: schema,
NeedTransfer: true,
@ -685,10 +685,10 @@ func (suite *ServiceSuite) TestLoadIndex_Success() {
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
DstNodeID: suite.node.session.GetServerID(),
Infos: rawInfo,
Schema: schema,
NeedTransfer: false,
@ -710,10 +710,10 @@ func (suite *ServiceSuite) TestLoadIndex_Success() {
req = &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
DstNodeID: suite.node.session.GetServerID(),
Infos: infos,
Schema: schema,
NeedTransfer: false,
@ -755,10 +755,10 @@ func (suite *ServiceSuite) TestLoadIndex_Failed() {
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
DstNodeID: suite.node.session.GetServerID(),
Infos: rawInfo,
Schema: schema,
NeedTransfer: false,
@ -788,10 +788,10 @@ func (suite *ServiceSuite) TestLoadIndex_Failed() {
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
DstNodeID: suite.node.session.GetServerID(),
Infos: infos,
Schema: schema,
NeedTransfer: false,
@ -813,10 +813,10 @@ func (suite *ServiceSuite) TestLoadSegments_Failed() {
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
DstNodeID: suite.node.session.GetServerID(),
Infos: suite.genSegmentLoadInfos(schema),
Schema: schema,
NeedTransfer: true,
@ -865,10 +865,10 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() {
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
DstNodeID: suite.node.session.GetServerID(),
Infos: suite.genSegmentLoadInfos(schema),
Schema: schema,
NeedTransfer: true,
@ -887,10 +887,10 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() {
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
DstNodeID: suite.node.session.GetServerID(),
Infos: suite.genSegmentLoadInfos(schema),
Schema: schema,
NeedTransfer: true,
@ -914,10 +914,10 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() {
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
DstNodeID: suite.node.session.GetServerID(),
Infos: suite.genSegmentLoadInfos(schema),
Schema: schema,
NeedTransfer: true,
@ -985,7 +985,7 @@ func (suite *ServiceSuite) TestReleaseSegments_Normal() {
req := &querypb.ReleaseSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
SegmentIDs: suite.validSegmentIDs,
@ -1003,7 +1003,7 @@ func (suite *ServiceSuite) TestReleaseSegments_Failed() {
req := &querypb.ReleaseSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
SegmentIDs: suite.validSegmentIDs,
@ -1029,7 +1029,7 @@ func (suite *ServiceSuite) TestReleaseSegments_Transfer() {
req := &querypb.ReleaseSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
Shard: suite.vchannel,
CollectionID: suite.collectionID,
@ -1051,7 +1051,7 @@ func (suite *ServiceSuite) TestReleaseSegments_Transfer() {
req := &querypb.ReleaseSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
Shard: suite.vchannel,
CollectionID: suite.collectionID,
@ -1078,7 +1078,7 @@ func (suite *ServiceSuite) TestReleaseSegments_Transfer() {
req := &querypb.ReleaseSegmentsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
Shard: suite.vchannel,
CollectionID: suite.collectionID,
@ -1141,7 +1141,7 @@ func (suite *ServiceSuite) genCSearchRequest(nq int64, indexType string, schema
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Search,
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
PartitionIDs: suite.partitionIDs,
@ -1343,7 +1343,7 @@ func (suite *ServiceSuite) genCQueryRequest(nq int64, indexType string, schema *
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Retrieve,
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
PartitionIDs: suite.partitionIDs,
@ -1652,7 +1652,7 @@ func (suite *ServiceSuite) TestShowConfigurations_Normal() {
req := &internalpb.ShowConfigurationsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
Pattern: "Cache.enabled",
}
@ -1668,7 +1668,7 @@ func (suite *ServiceSuite) TestShowConfigurations_Failed() {
req := &internalpb.ShowConfigurationsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
Pattern: "Cache.enabled",
}
@ -1690,7 +1690,7 @@ func (suite *ServiceSuite) TestGetMetric_Normal() {
req := &milvuspb.GetMetricsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
Request: string(mReq),
}
@ -1711,7 +1711,7 @@ func (suite *ServiceSuite) TestGetMetric_Failed() {
req := &milvuspb.GetMetricsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
Request: string(mReq),
}
@ -1742,7 +1742,7 @@ func (suite *ServiceSuite) TestGetDataDistribution_Normal() {
req := &querypb.GetDataDistributionRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
}
@ -1756,7 +1756,7 @@ func (suite *ServiceSuite) TestGetDataDistribution_Failed() {
req := &querypb.GetDataDistributionRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
}
@ -1784,7 +1784,7 @@ func (suite *ServiceSuite) TestSyncDistribution_Normal() {
req := &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
Channel: suite.vchannel,
@ -1869,7 +1869,7 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() {
req := &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
Channel: suite.vchannel,
@ -1915,7 +1915,7 @@ func (suite *ServiceSuite) TestSyncDistribution_Failed() {
req := &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
Channel: suite.vchannel,
@ -1943,7 +1943,7 @@ func (suite *ServiceSuite) TestDelete_Int64() {
req := &querypb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionId: suite.collectionID,
PartitionId: suite.partitionIDs[0],
@ -1974,7 +1974,7 @@ func (suite *ServiceSuite) TestDelete_VarChar() {
req := &querypb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionId: suite.collectionID,
PartitionId: suite.partitionIDs[0],
@ -2005,7 +2005,7 @@ func (suite *ServiceSuite) TestDelete_Failed() {
req := &querypb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionId: suite.collectionID,
PartitionId: suite.partitionIDs[0],
@ -2041,7 +2041,7 @@ func (suite *ServiceSuite) TestLoadPartition() {
req := &querypb.LoadPartitionsRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
TargetID: suite.node.session.GetServerID(),
},
CollectionID: suite.collectionID,
PartitionIDs: suite.partitionIDs,

View File

@ -442,6 +442,30 @@ var (
}, []string{
nodeIDLabelName,
})
StoppingBalanceNodeNum = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "stopping_balance_node_num",
Help: "the number of node which executing stopping balance",
}, []string{})
StoppingBalanceChannelNum = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "stopping_balance_channel_num",
Help: "the number of channel which executing stopping balance",
}, []string{nodeIDLabelName})
StoppingBalanceSegmentNum = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "stopping_balance_segment_num",
Help: "the number of segment which executing stopping balance",
}, []string{nodeIDLabelName})
)
// RegisterQueryNode registers QueryNode metrics
@ -483,6 +507,9 @@ func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeDiskUsedSize)
registry.MustRegister(QueryNodeProcessCost)
registry.MustRegister(QueryNodeWaitProcessingMsgCount)
registry.MustRegister(StoppingBalanceNodeNum)
registry.MustRegister(StoppingBalanceChannelNum)
registry.MustRegister(StoppingBalanceSegmentNum)
}
func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {