From 833c74aa66edf99636ddee29f3106bebcad02c61 Mon Sep 17 00:00:00 2001
From: Zhen Ye
Date: Fri, 13 Dec 2024 14:14:50 +0800
Subject: [PATCH] enhance: add detail, replica count for resource group
 (#38314)

issue: #30647

---------

Signed-off-by: chyezh
---
 internal/querycoordv2/meta/replica_manager.go | 17 +++++++-
 .../querycoordv2/meta/resource_manager.go     | 40 ++++++++++++++++---
 pkg/metrics/metrics.go                        |  3 +-
 pkg/metrics/querycoord_metrics.go             | 27 +++++++++++++
 pkg/metrics/querynode_metrics.go              | 22 +++++-----
 5 files changed, 90 insertions(+), 19 deletions(-)

diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go
index 7fcc7ea5df..2c04c52c7c 100644
--- a/internal/querycoordv2/meta/replica_manager.go
+++ b/internal/querycoordv2/meta/replica_manager.go
@@ -29,6 +29,7 @@ import (
 	"github.com/milvus-io/milvus/internal/metastore"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -187,8 +188,14 @@ func (m *ReplicaManager) put(ctx context.Context, replicas ...*Replica) error {
 // putReplicaInMemory puts replicas into in-memory map and collIDToReplicaIDs.
 func (m *ReplicaManager) putReplicaInMemory(replicas ...*Replica) {
 	for _, replica := range replicas {
+		if oldReplica, ok := m.replicas[replica.GetID()]; ok {
+			metrics.QueryCoordResourceGroupReplicaTotal.WithLabelValues(oldReplica.GetResourceGroup()).Dec()
+			metrics.QueryCoordReplicaRONodeTotal.Add(-float64(oldReplica.RONodesCount()))
+		}
 		// update in-memory replicas.
 		m.replicas[replica.GetID()] = replica
+		metrics.QueryCoordResourceGroupReplicaTotal.WithLabelValues(replica.GetResourceGroup()).Inc()
+		metrics.QueryCoordReplicaRONodeTotal.Add(float64(replica.RONodesCount()))

 		// update collIDToReplicaIDs.
 		if m.coll2Replicas[replica.GetCollectionID()] == nil {
@@ -280,6 +287,8 @@ func (m *ReplicaManager) RemoveCollection(ctx context.Context, collectionID type
 	if collReplicas, ok := m.coll2Replicas[collectionID]; ok {
 		// Remove all replica of collection and remove collection from collIDToReplicaIDs.
 		for _, replica := range collReplicas.replicas {
+			metrics.QueryCoordResourceGroupReplicaTotal.WithLabelValues(replica.GetResourceGroup()).Dec()
+			metrics.QueryCoordReplicaRONodeTotal.Add(-float64(replica.RONodesCount()))
 			delete(m.replicas, replica.GetID())
 		}
 		delete(m.coll2Replicas, collectionID)
@@ -302,8 +311,12 @@ func (m *ReplicaManager) removeReplicas(ctx context.Context, collectionID typeut
 		return err
 	}

-	for _, replica := range replicas {
-		delete(m.replicas, replica)
+	for _, replicaID := range replicas {
+		if replica, ok := m.replicas[replicaID]; ok {
+			metrics.QueryCoordResourceGroupReplicaTotal.WithLabelValues(replica.GetResourceGroup()).Dec()
+			metrics.QueryCoordReplicaRONodeTotal.Add(-float64(replica.RONodesCount()))
+			delete(m.replicas, replicaID)
+		}
 	}

 	if m.coll2Replicas[collectionID].removeReplicas(replicas...) {
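The replica-manager changes above follow a delta-bookkeeping rule: before an entry in m.replicas is overwritten or deleted, its previous contribution is subtracted from the gauges, so repeated puts of the same replica ID never double count. A minimal, self-contained sketch of that pattern — the replica struct and label names are stand-ins for illustration, not the real meta.Replica API:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// replica is a stand-in for meta.Replica carrying only what the
// bookkeeping needs (hypothetical, for illustration).
type replica struct {
	id            int64
	resourceGroup string
	roNodes       int
}

var (
	replicaTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "resource_group_replica_total",
		Help: "total number of replicas in each resource group",
	}, []string{"rg"})

	roNodeTotal = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "replica_ro_node_total",
		Help: "total number of read-only nodes across all replicas",
	})
)

// put mirrors putReplicaInMemory's discipline: undo the old entry's
// contribution before recording the new one, so a re-put of the same
// replica ID never double counts.
func put(replicas map[int64]*replica, r *replica) {
	if old, ok := replicas[r.id]; ok {
		replicaTotal.WithLabelValues(old.resourceGroup).Dec()
		roNodeTotal.Add(-float64(old.roNodes))
	}
	replicas[r.id] = r
	replicaTotal.WithLabelValues(r.resourceGroup).Inc()
	roNodeTotal.Add(float64(r.roNodes))
}

func main() {
	m := map[int64]*replica{}
	put(m, &replica{id: 1, resourceGroup: "rg1", roNodes: 2})
	// Moving the replica to rg2 leaves rg1's count at 0, not -1 or 2.
	put(m, &replica{id: 1, resourceGroup: "rg2", roNodes: 1})
	fmt.Println("series stay consistent across re-puts")
}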
diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go
index 1f7836b750..f0696dceca 100644
--- a/internal/querycoordv2/meta/resource_manager.go
+++ b/internal/querycoordv2/meta/resource_manager.go
@@ -19,9 +19,11 @@ package meta
 import (
 	"context"
 	"fmt"
+	"strconv"
 	"sync"

 	"github.com/cockroachdb/errors"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/samber/lo"
 	"go.uber.org/zap"
 	"google.golang.org/protobuf/proto"
@@ -32,6 +34,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -93,7 +96,7 @@ func (rm *ResourceManager) Recover(ctx context.Context) error {
 		needUpgrade := meta.Config == nil

 		rg := NewResourceGroupFromMeta(meta, rm.nodeMgr)
-		rm.groups[rg.GetName()] = rg
+		rm.setupInMemResourceGroup(rg)
 		for _, node := range rg.GetNodes() {
 			if _, ok := rm.nodeIDMap[node]; ok {
 				// unreachable code, should never happen.
@@ -158,7 +161,7 @@ func (rm *ResourceManager) AddResourceGroup(ctx context.Context, rgName string,
 		return merr.WrapErrResourceGroupServiceAvailable()
 	}

-	rm.groups[rgName] = rg
+	rm.setupInMemResourceGroup(rg)
 	log.Info("add resource group",
 		zap.String("rgName", rgName),
 		zap.Any("config", cfg),
@@ -218,7 +221,7 @@ func (rm *ResourceManager) updateResourceGroups(ctx context.Context, rgs map[str
 			zap.String("rgName", rg.GetName()),
 			zap.Any("config", rg.GetConfig()),
 		)
-		rm.groups[rg.GetName()] = rg
+		rm.setupInMemResourceGroup(rg)
 	}

 	// notify that resource group config has been changed.
@@ -318,6 +321,12 @@ func (rm *ResourceManager) RemoveResourceGroup(ctx context.Context, rgName strin
 	// After recovering, all node assigned to these rg has been removed.
 	// no secondary index need to be removed.
 	delete(rm.groups, rgName)
+	metrics.QueryCoordResourceGroupInfo.DeletePartialMatch(prometheus.Labels{
+		metrics.ResourceGroupLabelName: rgName,
+	})
+	metrics.QueryCoordResourceGroupReplicaTotal.DeletePartialMatch(prometheus.Labels{
+		metrics.ResourceGroupLabelName: rgName,
+	})

 	log.Info("remove resource group",
 		zap.String("rgName", rgName),
@@ -840,7 +849,7 @@ func (rm *ResourceManager) transferNode(ctx context.Context, rgName string, node

 	// Commit updates to memory.
 	for _, rg := range modifiedRG {
-		rm.groups[rg.GetName()] = rg
+		rm.setupInMemResourceGroup(rg)
 	}
 	rm.nodeIDMap[node] = rgName
 	log.Info("transfer node to resource group",
@@ -870,7 +879,7 @@ func (rm *ResourceManager) unassignNode(ctx context.Context, node int64) (string
 	}

 	// Commit updates to memory.
-	rm.groups[rg.GetName()] = rg
+	rm.setupInMemResourceGroup(rg)
 	delete(rm.nodeIDMap, node)
 	log.Info("unassign node to resource group",
 		zap.String("rgName", rg.GetName()),
@@ -945,6 +954,27 @@ func (rm *ResourceManager) validateResourceGroupIsDeletable(rgName string) erro
 	return nil
 }

+// setupInMemResourceGroup sets up a resource group in memory and refreshes its metric series.
+func (rm *ResourceManager) setupInMemResourceGroup(r *ResourceGroup) {
+	// clear old metrics.
+	if oldR, ok := rm.groups[r.GetName()]; ok {
+		for _, nodeID := range oldR.GetNodes() {
+			metrics.QueryCoordResourceGroupInfo.DeletePartialMatch(prometheus.Labels{
+				metrics.ResourceGroupLabelName: r.GetName(),
+				metrics.NodeIDLabelName:        strconv.FormatInt(nodeID, 10),
+			})
+		}
+	}
+	// add new metrics.
+	for _, nodeID := range r.GetNodes() {
+		metrics.QueryCoordResourceGroupInfo.WithLabelValues(
+			r.GetName(),
+			strconv.FormatInt(nodeID, 10),
+		).Set(1)
+	}
+	rm.groups[r.GetName()] = r
+}
+
 func (rm *ResourceManager) GetResourceGroupsJSON(ctx context.Context) string {
 	rm.rwmutex.RLock()
 	defer rm.rwmutex.RUnlock()
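setupInMemResourceGroup above keeps one resource_group_info series per resource group and node, and RemoveResourceGroup relies on DeletePartialMatch to drop every series carrying the group's label. A small runnable sketch of that client_golang behavior, with shortened metric and label names, assuming a client_golang release that provides DeletePartialMatch (v1.12+):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var rgInfo = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Name: "resource_group_info",
	Help: "one series per (resource group, node) membership",
}, []string{"rg", "node_id"})

func main() {
	// One gauge series per node membership, as setupInMemResourceGroup does.
	rgInfo.WithLabelValues("rg1", "1").Set(1)
	rgInfo.WithLabelValues("rg1", "2").Set(1)
	rgInfo.WithLabelValues("rg2", "3").Set(1)

	// Deleting by partial match removes every series whose labels contain
	// rg="rg1", regardless of node_id; it returns the number removed.
	deleted := rgInfo.DeletePartialMatch(prometheus.Labels{"rg": "rg1"})
	fmt.Println(deleted) // 2
}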
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index 0c1a1e6da4..7c7296c881 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -88,6 +88,7 @@ const (
 	segmentPruneLabelName    = "segment_prune_label"
 	stageLabelName           = "compaction_stage"
 	nodeIDLabelName          = "node_id"
+	nodeHostLabelName        = "node_host"
 	statusLabelName          = "status"
 	indexTaskStatusLabelName = "index_task_status"
 	msgTypeLabelName         = "msg_type"
@@ -98,7 +99,7 @@ const (
 	queryTypeLabelName       = "query_type"
 	collectionName           = "collection_name"
 	databaseLabelName        = "db_name"
-	resourceGroupLabelName   = "rg"
+	ResourceGroupLabelName   = "rg"
 	indexName                = "index_name"
 	isVectorIndex            = "is_vector_index"
 	segmentStateLabelName    = "segment_state"
diff --git a/pkg/metrics/querycoord_metrics.go b/pkg/metrics/querycoord_metrics.go
index b8a1301a09..67aa181421 100644
--- a/pkg/metrics/querycoord_metrics.go
+++ b/pkg/metrics/querycoord_metrics.go
@@ -132,6 +132,30 @@ var (
 			Help:      "latency of all kind of task in query coord scheduler scheduler",
 			Buckets:   longTaskBuckets,
 		}, []string{collectionIDLabelName, taskTypeLabel, channelNameLabelName})
+
+	QueryCoordResourceGroupInfo = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "resource_group_info",
+			Help:      "detail info of resource groups in query coord, one series per resource group and node",
+		}, []string{ResourceGroupLabelName, NodeIDLabelName})
+
+	QueryCoordResourceGroupReplicaTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "resource_group_replica_total",
+			Help:      "total number of replicas in each resource group",
+		}, []string{ResourceGroupLabelName})
+
+	QueryCoordReplicaRONodeTotal = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "replica_ro_node_total",
+			Help:      "total number of read-only nodes across all replicas",
+		})
 )

 // RegisterQueryCoord registers QueryCoord metrics
@@ -146,6 +170,9 @@ func RegisterQueryCoord(registry *prometheus.Registry) {
 	registry.MustRegister(QueryCoordNumQueryNodes)
 	registry.MustRegister(QueryCoordCurrentTargetCheckpointUnixSeconds)
 	registry.MustRegister(QueryCoordTaskLatency)
+	registry.MustRegister(QueryCoordResourceGroupInfo)
+	registry.MustRegister(QueryCoordResourceGroupReplicaTotal)
+	registry.MustRegister(QueryCoordReplicaRONodeTotal)
 }

 func CleanQueryCoordMetricsWithCollectionID(collectionID int64) {
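After RegisterQueryCoord runs, the three new collectors are scraped alongside the existing QueryCoord metrics. A hedged sketch of how one of them would surface on an exporter endpoint — assuming milvusNamespace resolves to "milvus" and typeutil.QueryCoordRole to "querycoord", and substituting a throwaway registry and HTTP server for the ones Milvus actually wires up:

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Stand-in for the registry RegisterQueryCoord receives.
	reg := prometheus.NewRegistry()

	rgReplicaTotal := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "milvus",     // assumed value of milvusNamespace
		Subsystem: "querycoord", // assumed value of typeutil.QueryCoordRole
		Name:      "resource_group_replica_total",
		Help:      "total number of replicas in each resource group",
	}, []string{"rg"})
	reg.MustRegister(rgReplicaTotal)

	rgReplicaTotal.WithLabelValues("__default_resource_group").Set(2)

	// A scrape of :9091/metrics would then include:
	// milvus_querycoord_resource_group_replica_total{rg="__default_resource_group"} 2
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":9091", nil))
}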
diff --git a/pkg/metrics/querynode_metrics.go b/pkg/metrics/querynode_metrics.go
index a80a4bbc21..13f723b576 100644
--- a/pkg/metrics/querynode_metrics.go
+++ b/pkg/metrics/querynode_metrics.go
@@ -201,7 +201,7 @@ var (
 			nodeIDLabelName,
 			queryTypeLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 		})

 	QueryNodeSQPerUserLatencyInQueue = prometheus.NewHistogramVec(
@@ -602,7 +602,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 			queryTypeLabelName,
 		},
 	)
@@ -617,7 +617,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 			queryTypeLabelName,
 		},
 	)
@@ -646,7 +646,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 			queryTypeLabelName,
 		})

@@ -660,7 +660,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 			queryTypeLabelName,
 		})

@@ -687,7 +687,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 		})

 	// QueryNodeDiskCacheLoadBytes records the number of bytes loaded from disk cache.
@@ -700,7 +700,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 		})

 	// QueryNodeDiskCacheLoadDuration records the total time cost of loading segments from disk cache.
@@ -714,7 +714,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 		})

 	// QueryNodeDiskCacheLoadGlobalDuration records the global time cost of loading segments from disk cache.
@@ -739,7 +739,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 		})

 	// QueryNodeDiskCacheEvictBytes records the number of bytes evicted from disk cache.
@@ -752,7 +752,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 		})

 	// QueryNodeDiskCacheEvictDuration records the total time cost of evicting segments from disk cache.
@@ -765,7 +765,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 		})

 	// QueryNodeDiskCacheEvictGlobalDuration records the global time cost of evicting segments from disk cache.
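The querynode_metrics.go churn above is a mechanical rename: exporting ResourceGroupLabelName is what lets code outside pkg/metrics build label matchers against these series, exactly as RemoveResourceGroup does earlier in this patch. A sketch of such a caller — the package and function here are hypothetical; the metrics identifiers are the ones the patch adds:

package rgcleanup

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/milvus-io/milvus/pkg/metrics"
)

// DropResourceGroupSeries mirrors the cleanup in RemoveResourceGroup:
// it deletes every QueryCoord series labeled with the given resource
// group, whatever the other label values are.
func DropResourceGroupSeries(rgName string) {
	metrics.QueryCoordResourceGroupInfo.DeletePartialMatch(prometheus.Labels{
		metrics.ResourceGroupLabelName: rgName,
	})
	metrics.QueryCoordResourceGroupReplicaTotal.DeletePartialMatch(prometheus.Labels{
		metrics.ResourceGroupLabelName: rgName,
	})
}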