diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 1ce0bcfeb8..a48ac08cb0 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -84,6 +84,7 @@ type collectionInfo struct { StartPositions []*commonpb.KeyDataPair Properties map[string]string CreatedAt Timestamp + DatabaseName string } // NewMeta creates meta from provided `kv.TxnKV` @@ -198,6 +199,7 @@ func (m *meta) GetClonedCollectionInfo(collectionID UniqueID) *collectionInfo { Partitions: coll.Partitions, StartPositions: common.CloneKeyDataPairs(coll.StartPositions), Properties: clonedProperties, + DatabaseName: coll.DatabaseName, } return cloneColl @@ -267,17 +269,29 @@ func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64) { if isSegmentHealthy(segment) && !segment.GetIsImporting() { total += segmentSize collectionBinlogSize[segment.GetCollectionID()] += segmentSize - metrics.DataCoordStoredBinlogSize.WithLabelValues( - fmt.Sprint(segment.GetCollectionID()), fmt.Sprint(segment.GetID())).Set(float64(segmentSize)) + + coll, ok := m.collections[segment.GetCollectionID()] + if ok { + metrics.DataCoordStoredBinlogSize.WithLabelValues(coll.DatabaseName, + fmt.Sprint(segment.GetCollectionID()), fmt.Sprint(segment.GetID())).Set(float64(segmentSize)) + } else { + log.Warn("not found database name", zap.Int64("collectionID", segment.GetCollectionID())) + } + if _, ok := collectionRowsNum[segment.GetCollectionID()]; !ok { collectionRowsNum[segment.GetCollectionID()] = make(map[commonpb.SegmentState]int64) } collectionRowsNum[segment.GetCollectionID()][segment.GetState()] += segment.GetNumOfRows() } } - for collection, statesRows := range collectionRowsNum { + for collectionID, statesRows := range collectionRowsNum { for state, rows := range statesRows { - metrics.DataCoordNumStoredRows.WithLabelValues(fmt.Sprint(collection), state.String()).Set(float64(rows)) + coll, ok := m.collections[collectionID] + if ok { + 
metrics.DataCoordNumStoredRows.WithLabelValues(coll.DatabaseName, fmt.Sprint(collectionID), state.String()).Set(float64(rows)) + } else { + log.Warn("not found database name", zap.Int64("collectionID", collectionID)) + } } } return total, collectionBinlogSize diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index d0e134aa97..3fad49ec36 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/testutils" @@ -400,6 +401,7 @@ func TestMeta_Basic(t *testing.T) { Schema: testSchema, Partitions: []UniqueID{partID0, partID1}, StartPositions: []*commonpb.KeyDataPair{}, + DatabaseName: util.DefaultDBName, } collInfoWoPartition := &collectionInfo{ ID: collID, @@ -605,6 +607,12 @@ func TestMeta_Basic(t *testing.T) { assert.Len(t, collectionBinlogSize, 1) assert.Equal(t, int64(size0+size1), collectionBinlogSize[collID]) assert.Equal(t, int64(size0+size1), total) + + meta.collections[collID] = collInfo + total, collectionBinlogSize = meta.GetCollectionBinlogSize() + assert.Len(t, collectionBinlogSize, 1) + assert.Equal(t, int64(size0+size1), collectionBinlogSize[collID]) + assert.Equal(t, int64(size0+size1), total) }) t.Run("Test AddAllocation", func(t *testing.T) { diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 38b896aafa..c26c3c3c9e 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -1201,6 +1201,7 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i StartPositions: resp.GetStartPositions(), Properties: properties, CreatedAt: resp.GetCreatedTimestamp(), + DatabaseName: resp.GetDbName(), } 
s.meta.AddCollection(collInfo) return nil diff --git a/internal/proxy/hook_interceptor.go b/internal/proxy/hook_interceptor.go index 008c6c1ea9..6a9aa885b8 100644 --- a/internal/proxy/hook_interceptor.go +++ b/internal/proxy/hook_interceptor.go @@ -133,8 +133,8 @@ func updateProxyFunctionCallMetric(fullMethod string) { if method == "" { return } - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, "", "").Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, "", "").Inc() } func getCurrentUser(ctx context.Context) string { diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index a5e0c13749..8c894b0092 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -154,9 +154,9 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p // no need to handle error, since this Proxy may not create dml stream for the collection. 
node.chMgr.removeDMLStream(request.GetCollectionID()) // clean up collection level metrics - metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), collectionName) + metrics.CleanupProxyCollectionMetrics(paramtable.GetNodeID(), collectionName) for _, alias := range aliasName { - metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), alias) + metrics.CleanupProxyCollectionMetrics(paramtable.GetNodeID(), alias) } } log.Info("complete to invalidate collection meta cache") @@ -179,6 +179,8 @@ func (node *Proxy) CreateDatabase(ctx context.Context, request *milvuspb.CreateD strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + "", ).Inc() cct := &createDatabaseTask{ @@ -199,7 +201,9 @@ func (node *Proxy) CreateDatabase(ctx context.Context, request *milvuspb.CreateD if err := node.sched.ddQueue.Enqueue(cct); err != nil { log.Warn(rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), ""). + Inc() return merr.Status(err), nil } @@ -207,7 +211,9 @@ func (node *Proxy) CreateDatabase(ctx context.Context, request *milvuspb.CreateD if err := cct.WaitToFinish(); err != nil { log.Warn(rpcFailedToWaitToFinish(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), ""). 
+ Inc() return merr.Status(err), nil } @@ -216,6 +222,8 @@ func (node *Proxy) CreateDatabase(ctx context.Context, request *milvuspb.CreateD strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + "", ).Inc() metrics.ProxyReqLatency.WithLabelValues( @@ -240,6 +248,8 @@ func (node *Proxy) DropDatabase(ctx context.Context, request *milvuspb.DropDatab strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + "", ).Inc() dct := &dropDatabaseTask{ @@ -259,14 +269,18 @@ func (node *Proxy) DropDatabase(ctx context.Context, request *milvuspb.DropDatab log.Info(rpcReceived(method)) if err := node.sched.ddQueue.Enqueue(dct); err != nil { log.Warn(rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), ""). + Inc() return merr.Status(err), nil } log.Info(rpcEnqueued(method)) if err := dct.WaitToFinish(); err != nil { log.Warn(rpcFailedToWaitToFinish(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), ""). 
+ Inc() return merr.Status(err), nil } @@ -275,6 +289,8 @@ func (node *Proxy) DropDatabase(ctx context.Context, request *milvuspb.DropDatab strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + "", ).Inc() metrics.ProxyReqLatency.WithLabelValues( @@ -282,6 +298,7 @@ func (node *Proxy) DropDatabase(ctx context.Context, request *milvuspb.DropDatab method, ).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.CleanupProxyDBMetrics(paramtable.GetNodeID(), request.GetDbName()) return dct.result, nil } @@ -301,6 +318,8 @@ func (node *Proxy) ListDatabases(ctx context.Context, request *milvuspb.ListData strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + "", + "", ).Inc() dct := &listDatabaseTask{ @@ -319,7 +338,9 @@ func (node *Proxy) ListDatabases(ctx context.Context, request *milvuspb.ListData if err := node.sched.ddQueue.Enqueue(dct); err != nil { log.Warn(rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, "", ""). + Inc() resp.Status = merr.Status(err) return resp, nil } @@ -327,7 +348,9 @@ func (node *Proxy) ListDatabases(ctx context.Context, request *milvuspb.ListData log.Info(rpcEnqueued(method)) if err := dct.WaitToFinish(); err != nil { log.Warn(rpcFailedToWaitToFinish(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, "", ""). 
+ Inc() resp.Status = merr.Status(err) return resp, nil } @@ -337,6 +360,8 @@ func (node *Proxy) ListDatabases(ctx context.Context, request *milvuspb.ListData strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + "", + "", ).Inc() metrics.ProxyReqLatency.WithLabelValues( @@ -363,6 +388,8 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() cct := &createCollectionTask{ @@ -391,7 +418,7 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -409,7 +436,7 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat zap.Uint64("BeginTs", cct.BeginTs()), zap.Uint64("EndTs", cct.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -423,6 +450,8 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() metrics.ProxyReqLatency.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), @@ -446,6 +475,8 @@ func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCol 
strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() dct := &dropCollectionTask{ @@ -469,7 +500,7 @@ func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCol log.Warn("DropCollection failed to enqueue", zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -485,7 +516,7 @@ func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCol zap.Uint64("BeginTs", dct.BeginTs()), zap.Uint64("EndTs", dct.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -499,6 +530,8 @@ func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCol strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() metrics.ProxyReqLatency.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), @@ -524,6 +557,8 @@ func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasColle strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() log := log.Ctx(ctx).With( @@ -546,7 +581,7 @@ func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasColle zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() 
+ metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.BoolResponse{ Status: merr.Status(err), }, nil @@ -565,7 +600,7 @@ func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasColle zap.Uint64("EndTS", hct.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.BoolResponse{ Status: merr.Status(err), }, nil @@ -581,6 +616,8 @@ func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasColle strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() metrics.ProxyReqLatency.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), @@ -604,6 +641,8 @@ func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCol strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() lct := &loadCollectionTask{ @@ -629,7 +668,7 @@ func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCol zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -645,7 +684,7 @@ func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCol zap.Uint64("BeginTS", lct.BeginTs()), zap.Uint64("EndTS", lct.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -659,6 +698,8 @@ func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCol 
strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() metrics.ProxyReqLatency.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), @@ -679,7 +720,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele method := "ReleaseCollection" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() rct := &releaseCollectionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -701,7 +742,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -718,7 +759,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele zap.Uint64("EndTS", rct.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -728,7 +769,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele zap.Uint64("EndTS", rct.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return rct.result, nil } @@ -746,7 +787,7 @@ func (node *Proxy) DescribeCollection(ctx context.Context, request 
*milvuspb.Des method := "DescribeCollection" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() dct := &describeCollectionTask{ ctx: ctx, @@ -767,7 +808,7 @@ func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.Des zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.DescribeCollectionResponse{ Status: merr.Status(err), }, nil @@ -784,7 +825,7 @@ func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.Des zap.Uint64("EndTS", dct.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.DescribeCollectionResponse{ Status: merr.Status(err), @@ -799,7 +840,7 @@ func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.Des ) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dct.result, nil } @@ -818,7 +859,7 @@ func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStati method := "GetStatistics" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() g := 
&getStatisticsTask{ request: request, Condition: NewTaskCondition(ctx), @@ -845,7 +886,7 @@ func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStati zap.Strings("partitions", request.PartitionNames)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetStatisticsResponse{ Status: merr.Status(err), @@ -867,7 +908,7 @@ func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStati zap.Strings("partitions", request.PartitionNames)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetStatisticsResponse{ Status: merr.Status(err), @@ -880,7 +921,7 @@ func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStati zap.Uint64("EndTS", g.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return g.result, nil } @@ -898,7 +939,7 @@ func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvusp method := "GetCollectionStatistics" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() g := &getCollectionStatisticsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -919,7 +960,7 @@ func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvusp zap.Error(err)) 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetCollectionStatisticsResponse{ Status: merr.Status(err), @@ -939,7 +980,7 @@ func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvusp zap.Uint64("EndTS", g.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetCollectionStatisticsResponse{ Status: merr.Status(err), @@ -952,7 +993,7 @@ func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvusp zap.Uint64("EndTS", g.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return g.result, nil } @@ -968,7 +1009,9 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo defer sp.End() method := "ShowCollections" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), ""). 
+ Inc() sct := &showCollectionsTask{ ctx: ctx, @@ -993,7 +1036,7 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo zap.Error(err), zap.Any("CollectionNames", request.CollectionNames)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), "").Inc() return &milvuspb.ShowCollectionsResponse{ Status: merr.Status(err), }, nil @@ -1008,7 +1051,7 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo zap.Error(err), zap.Any("CollectionNames", request.CollectionNames)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), "").Inc() return &milvuspb.ShowCollectionsResponse{ Status: merr.Status(err), @@ -1019,7 +1062,7 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo zap.Int("len(CollectionNames)", len(request.CollectionNames)), zap.Int("num_collections", len(sct.result.CollectionNames))) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return sct.result, nil } @@ -1034,7 +1077,7 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC method := "AlterCollection" tr := timerecord.NewTimeRecorder(method) - 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() act := &alterCollectionTask{ ctx: ctx, @@ -1057,7 +1100,7 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1074,7 +1117,7 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC zap.Uint64("BeginTs", act.BeginTs()), zap.Uint64("EndTs", act.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1083,7 +1126,7 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC zap.Uint64("BeginTs", act.BeginTs()), zap.Uint64("EndTs", act.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return act.result, nil } @@ -1098,7 +1141,7 @@ func 
(node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create defer sp.End() method := "CreatePartition" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() cpt := &createPartitionTask{ ctx: ctx, @@ -1121,7 +1164,7 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1138,7 +1181,7 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create zap.Uint64("BeginTS", cpt.BeginTs()), zap.Uint64("EndTS", cpt.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1148,7 +1191,7 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create zap.Uint64("BeginTS", cpt.BeginTs()), zap.Uint64("EndTS", cpt.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() 
metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return cpt.result, nil } @@ -1163,7 +1206,7 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart defer sp.End() method := "DropPartition" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() dpt := &dropPartitionTask{ ctx: ctx, @@ -1187,7 +1230,7 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1204,7 +1247,7 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart zap.Uint64("BeginTS", dpt.BeginTs()), zap.Uint64("EndTS", dpt.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1214,7 +1257,7 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart zap.Uint64("BeginTS", dpt.BeginTs()), zap.Uint64("EndTS", dpt.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dpt.result, nil } @@ -1233,7 +1276,7 @@ func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartit tr := timerecord.NewTimeRecorder(method) // TODO: use collectionID instead of collectionName metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() hpt := &hasPartitionTask{ ctx: ctx, @@ -1257,7 +1300,7 @@ func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartit zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.BoolResponse{ Status: merr.Status(err), @@ -1278,7 +1321,7 @@ func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartit zap.Uint64("EndTS", hpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.BoolResponse{ Status: merr.Status(err), @@ -1292,7 +1335,7 @@ func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartit zap.Uint64("EndTS", hpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), 
method).Observe(float64(tr.ElapseSpan().Milliseconds())) return hpt.result, nil } @@ -1308,7 +1351,7 @@ func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPar method := "LoadPartitions" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() lpt := &loadPartitionsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -1333,7 +1376,7 @@ func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPar zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1351,7 +1394,7 @@ func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPar zap.Uint64("EndTS", lpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1362,7 +1405,7 @@ func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPar zap.Uint64("EndTS", lpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return lpt.result, nil } @@ -1387,7 +1430,7 @@ func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.Rele method := "ReleasePartitions" tr := timerecord.NewTimeRecorder(method) 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -1403,7 +1446,7 @@ func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.Rele zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1421,7 +1464,7 @@ func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.Rele zap.Uint64("EndTS", rpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1432,7 +1475,7 @@ func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.Rele zap.Uint64("EndTS", rpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return rpt.result, nil } @@ -1450,7 +1493,7 @@ func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb method := "GetPartitionStatistics" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() g := &getPartitionStatisticsTask{ ctx: ctx, @@ -1473,7 +1516,7 @@ func (node *Proxy) 
GetPartitionStatistics(ctx context.Context, request *milvuspb zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetPartitionStatisticsResponse{ Status: merr.Status(err), @@ -1493,7 +1536,7 @@ func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb zap.Uint64("EndTS", g.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetPartitionStatisticsResponse{ Status: merr.Status(err), @@ -1506,7 +1549,7 @@ func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb zap.Uint64("EndTS", g.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return g.result, nil } @@ -1535,7 +1578,7 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar tr := timerecord.NewTimeRecorder(method) // TODO: use collectionID instead of collectionName metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With(zap.String("role", typeutil.ProxyRole)) @@ -1550,7 +1593,7 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar zap.Any("request", request)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - 
metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.ShowPartitionsResponse{ Status: merr.Status(err), @@ -1576,7 +1619,7 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.ShowPartitionsResponse{ Status: merr.Status(err), @@ -1592,7 +1635,7 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return spt.result, nil } @@ -1605,7 +1648,7 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get tr := timerecord.NewTimeRecorder(method) ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetLoadingProgress") defer sp.End() - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx) log.Debug( @@ -1617,7 +1660,7 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get zap.String("collectionName", request.CollectionName), zap.Strings("partitionName", request.PartitionNames), zap.Error(err)) - 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() if errors.Is(err, merr.ErrServiceMemoryLimitExceeded) { return &milvuspb.GetLoadingProgressResponse{ Status: merr.Status(err), @@ -1665,7 +1708,7 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get log.Debug( rpcDone(method), zap.Any("request", request)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &milvuspb.GetLoadingProgressResponse{ Status: merr.Success(), @@ -1682,7 +1725,7 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt tr := timerecord.NewTimeRecorder(method) ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetLoadState") defer sp.End() - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx) log.Debug( @@ -1694,7 +1737,7 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt zap.String("collection_name", request.CollectionName), zap.Strings("partition_name", request.PartitionNames), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, 
metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetLoadStateResponse{ Status: merr.Status(err), } @@ -1711,7 +1754,7 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt log.Debug( rpcDone(method), zap.Any("request", request)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) }() @@ -1791,7 +1834,7 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde method := "CreateIndex" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -1808,7 +1851,7 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1826,7 +1869,7 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde zap.Uint64("EndTs", cit.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), 
request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1837,7 +1880,7 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde zap.Uint64("EndTs", cit.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return cit.result, nil } @@ -1862,7 +1905,7 @@ func (node *Proxy) AlterIndex(ctx context.Context, request *milvuspb.AlterIndexR method := "AlterIndex" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -1879,7 +1922,7 @@ func (node *Proxy) AlterIndex(ctx context.Context, request *milvuspb.AlterIndexR zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1897,7 +1940,7 @@ func (node *Proxy) AlterIndex(ctx context.Context, request *milvuspb.AlterIndexR zap.Uint64("EndTs", task.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1908,7 +1951,7 @@ func (node *Proxy) AlterIndex(ctx context.Context, request *milvuspb.AlterIndexR zap.Uint64("EndTs", task.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - 
metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return task.result, nil } @@ -1935,7 +1978,7 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe // avoid data race tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -1952,7 +1995,7 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.DescribeIndexResponse{ Status: merr.Status(err), @@ -1972,7 +2015,7 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe zap.Uint64("EndTs", dit.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.DescribeIndexResponse{ Status: merr.Status(err), @@ -1985,7 +2028,7 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe zap.Uint64("EndTs", dit.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) 
return dit.result, nil } @@ -2012,7 +2055,7 @@ func (node *Proxy) GetIndexStatistics(ctx context.Context, request *milvuspb.Get // avoid data race tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -2028,7 +2071,7 @@ func (node *Proxy) GetIndexStatistics(ctx context.Context, request *milvuspb.Get zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetIndexStatisticsResponse{ Status: merr.Status(err), @@ -2042,7 +2085,7 @@ func (node *Proxy) GetIndexStatistics(ctx context.Context, request *milvuspb.Get if err := dit.WaitToFinish(); err != nil { log.Warn(rpcFailedToWaitToFinish(method), zap.Error(err), zap.Uint64("BeginTs", dit.BeginTs()), zap.Uint64("EndTs", dit.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetIndexStatisticsResponse{ Status: merr.Status(err), }, nil @@ -2054,7 +2097,7 @@ func (node *Proxy) GetIndexStatistics(ctx context.Context, request *milvuspb.Get zap.Uint64("EndTs", dit.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return 
dit.result, nil @@ -2081,7 +2124,7 @@ func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexReq method := "DropIndex" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -2097,7 +2140,7 @@ func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexReq rpcFailedToEnqueue(method), zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -2115,7 +2158,7 @@ func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexReq zap.Uint64("EndTs", dit.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -2126,7 +2169,7 @@ func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexReq zap.Uint64("EndTs", dit.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dit.result, nil } @@ -2155,7 +2198,7 @@ func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb. 
method := "GetIndexBuildProgress" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -2171,7 +2214,7 @@ func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb. rpcFailedToEnqueue(method), zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetIndexBuildProgressResponse{ Status: merr.Status(err), @@ -2190,7 +2233,7 @@ func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb. zap.Uint64("BeginTs", gibpt.BeginTs()), zap.Uint64("EndTs", gibpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetIndexBuildProgressResponse{ Status: merr.Status(err), @@ -2203,7 +2246,7 @@ func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb. 
zap.Uint64("EndTs", gibpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return gibpt.result, nil } @@ -2231,7 +2274,7 @@ func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndex method := "GetIndexState" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -2248,7 +2291,7 @@ func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndex zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetIndexStateResponse{ Status: merr.Status(err), @@ -2267,7 +2310,7 @@ func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndex zap.Uint64("BeginTs", dipt.BeginTs()), zap.Uint64("EndTs", dipt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetIndexStateResponse{ Status: merr.Status(err), @@ -2280,7 +2323,7 @@ func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndex zap.Uint64("EndTs", dipt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), 
request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dipt.result, nil } @@ -2309,7 +2352,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) metrics.ProxyReceiveBytes.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.GetCollectionName()).Add(float64(proto.Size(request))) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() it := &insertTask{ ctx: ctx, @@ -2355,7 +2398,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) if err := node.sched.dmQueue.Enqueue(it); err != nil { log.Warn("Failed to enqueue insert task: " + err.Error()) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return constructFailedResponse(err), nil } @@ -2364,7 +2407,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) if err := it.WaitToFinish(); err != nil { log.Warn("Failed to execute insert task in task scheduler: " + err.Error()) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return constructFailedResponse(err), nil } @@ -2388,11 +2431,17 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(it.insertMsg.Size())) 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() successCnt := it.result.InsertCnt - int64(len(it.result.ErrIndex)) - metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt)) - metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) - metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyInsertVectors. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()). + Add(float64(successCnt)) + metrics.ProxyMutationLatency. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.GetDbName(), request.GetCollectionName()). + Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyCollectionMutationLatency. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.CollectionName). 
+ Observe(float64(tr.ElapseSpan().Milliseconds())) return it.result, nil } @@ -2424,7 +2473,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() dr := &deleteRunner{ req: request, @@ -2440,7 +2489,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) if err := dr.Init(ctx); err != nil { log.Error("Failed to enqueue delete task: " + err.Error()) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.MutationResult{ Status: merr.Status(err), @@ -2452,7 +2501,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) if err := dr.Run(ctx); err != nil { log.Error("Failed to enqueue delete task: " + err.Error()) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.MutationResult{ Status: merr.Status(err), @@ -2466,8 +2515,10 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) metrics.ProxyDeleteVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() - metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() + metrics.ProxyMutationLatency. 
+ WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.GetDbName(), request.GetCollectionName()). + Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds())) return dr.result, nil } @@ -2497,7 +2548,7 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) metrics.ProxyReceiveBytes.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.GetCollectionName()).Add(float64(proto.Size(request))) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() request.Base = commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_Upsert), @@ -2532,7 +2583,7 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) log.Info("Failed to enqueue upsert task", zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.MutationResult{ Status: merr.Status(err), }, nil @@ -2546,7 +2597,7 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) log.Info("Failed to execute insert task in task scheduler", zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() // Not every error case changes the status internally // change status there to handle it if 
it.result.GetStatus().GetErrorCode() == commonpb.ErrorCode_Success { @@ -2583,10 +2634,14 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) rateCol.Add(internalpb.RateType_DMLUpsert.String(), float64(it.upsertMsg.DeleteMsg.Size()+it.upsertMsg.DeleteMsg.Size())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() successCnt := it.result.UpsertCnt - int64(len(it.result.ErrIndex)) - metrics.ProxyUpsertVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt)) - metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyUpsertVectors. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()). + Add(float64(successCnt)) + metrics.ProxyMutationLatency. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.GetDbName(), request.GetCollectionName()). 
+ Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds())) log.Debug("Finish processing upsert request in Proxy") @@ -2640,6 +2695,8 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Search") @@ -2710,6 +2767,8 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() return &milvuspb.SearchResults{ @@ -2734,6 +2793,8 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() return &milvuspb.SearchResults{ @@ -2754,14 +2815,20 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() - metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(qt.result.GetResults().GetNumQueries())) + metrics.ProxySearchVectors. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()). 
+ Add(float64(qt.result.GetResults().GetNumQueries())) searchDur := tr.ElapseSpan().Milliseconds() metrics.ProxySQLatency.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel, + request.GetDbName(), + request.GetCollectionName(), ).Observe(float64(searchDur)) metrics.ProxyCollectionSQLatency.WithLabelValues( @@ -2816,6 +2883,8 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-HybridSearch") @@ -2873,6 +2942,8 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() return &milvuspb.SearchResults{ @@ -2896,6 +2967,8 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() return &milvuspb.SearchResults{ @@ -2916,14 +2989,20 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() - metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(len(request.GetRequests()))) + metrics.ProxySearchVectors. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()). 
+ Add(float64(len(request.GetRequests()))) searchDur := tr.ElapseSpan().Milliseconds() metrics.ProxySQLatency.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.HybridSearchLabel, + request.GetDbName(), + request.GetCollectionName(), ).Observe(float64(searchDur)) metrics.ProxyCollectionSQLatency.WithLabelValues( @@ -3018,7 +3097,7 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (* method := "Flush" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), "").Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -3032,7 +3111,7 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (* rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), "").Inc() resp.Status = merr.Status(err) return resp, nil @@ -3050,7 +3129,7 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (* zap.Uint64("BeginTs", ft.BeginTs()), zap.Uint64("EndTs", ft.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), "").Inc() resp.Status = merr.Status(err) return resp, nil @@ -3061,7 +3140,7 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (* zap.Uint64("BeginTs", ft.BeginTs()), zap.Uint64("EndTs", ft.EndTs())) - 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return ft.result, nil } @@ -3100,6 +3179,8 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() log := log.Ctx(ctx).With( @@ -3144,6 +3225,8 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() return &milvuspb.QueryResults{ @@ -3160,7 +3243,7 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.QueryResults{ Status: merr.Status(err), @@ -3178,11 +3261,15 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() metrics.ProxySQLatency.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.QueryLabel, + request.GetDbName(), + request.GetCollectionName(), ).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.ProxyCollectionSQLatency.WithLabelValues( @@ -3235,7 +3322,7 @@ func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAlia method := "CreateAlias" tr := 
timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -3250,7 +3337,7 @@ func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAlia rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -3266,7 +3353,7 @@ func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAlia zap.Error(err), zap.Uint64("BeginTs", cat.BeginTs()), zap.Uint64("EndTs", cat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -3276,7 +3363,7 @@ func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAlia zap.Uint64("BeginTs", cat.BeginTs()), zap.Uint64("EndTs", cat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), 
method).Observe(float64(tr.ElapseSpan().Milliseconds())) return cat.result, nil } @@ -3302,7 +3389,7 @@ func (node *Proxy) DescribeAlias(ctx context.Context, request *milvuspb.Describe method := "DescribeAlias" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.TotalLabel, request.GetDbName(), "").Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -3315,7 +3402,7 @@ func (node *Proxy) DescribeAlias(ctx context.Context, request *milvuspb.Describe log.Warn( rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.AbandonLabel, request.GetDbName(), "").Inc() return &milvuspb.DescribeAliasResponse{ Status: merr.Status(err), @@ -3329,7 +3416,7 @@ func (node *Proxy) DescribeAlias(ctx context.Context, request *milvuspb.Describe if err := dat.WaitToFinish(); err != nil { log.Warn(rpcFailedToWaitToFinish(method), zap.Uint64("BeginTs", dat.BeginTs()), zap.Uint64("EndTs", dat.EndTs()), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.FailLabel, request.GetDbName(), "").Inc() return &milvuspb.DescribeAliasResponse{ Status: merr.Status(err), }, nil @@ -3340,7 +3427,7 @@ func (node *Proxy) DescribeAlias(ctx context.Context, request *milvuspb.Describe zap.Uint64("BeginTs", dat.BeginTs()), zap.Uint64("EndTs", dat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, 
metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.SuccessLabel, request.GetDbName(), "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dat.result, nil } @@ -3366,7 +3453,7 @@ func (node *Proxy) ListAliases(ctx context.Context, request *milvuspb.ListAliase method := "ListAliases" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -3378,7 +3465,7 @@ func (node *Proxy) ListAliases(ctx context.Context, request *milvuspb.ListAliase log.Warn( rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.ListAliasesResponse{ Status: merr.Status(err), @@ -3392,7 +3479,7 @@ func (node *Proxy) ListAliases(ctx context.Context, request *milvuspb.ListAliase if err := lat.WaitToFinish(); err != nil { log.Warn(rpcFailedToWaitToFinish(method), zap.Uint64("BeginTs", lat.BeginTs()), zap.Uint64("EndTs", lat.EndTs()), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return 
&milvuspb.ListAliasesResponse{ Status: merr.Status(err), }, nil @@ -3403,7 +3490,7 @@ func (node *Proxy) ListAliases(ctx context.Context, request *milvuspb.ListAliase zap.Uint64("BeginTs", lat.BeginTs()), zap.Uint64("EndTs", lat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return lat.result, nil } @@ -3426,7 +3513,7 @@ func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasReq method := "DropAlias" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), "").Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -3439,7 +3526,7 @@ func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasReq log.Warn( rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), "").Inc() return merr.Status(err), nil } @@ -3456,7 +3543,7 @@ func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasReq zap.Uint64("BeginTs", dat.BeginTs()), zap.Uint64("EndTs", dat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), "").Inc() return merr.Status(err), nil } @@ -3466,7 +3553,7 @@ func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasReq zap.Uint64("BeginTs", dat.BeginTs()), zap.Uint64("EndTs", dat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dat.result, nil } @@ -3489,7 +3576,7 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR method := "AlterAlias" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -3503,7 +3590,7 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR log.Warn( rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -3520,7 +3607,7 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR zap.Uint64("BeginTs", aat.BeginTs()), zap.Uint64("EndTs", 
aat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -3530,7 +3617,7 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR zap.Uint64("BeginTs", aat.BeginTs()), zap.Uint64("EndTs", aat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return aat.result, nil } @@ -3666,12 +3753,12 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G method := "GetPersistentSegmentInfo" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, req.GetDbName(), req.GetCollectionName()).Inc() // list segments collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName()) if err != nil { - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() resp.Status = merr.Status(err) return resp, nil } @@ -3683,7 +3770,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G States: 
[]commonpb.SegmentState{commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed, commonpb.SegmentState_Sealed}, }) if err != nil { - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() resp.Status = merr.Status(err) return resp, nil } @@ -3698,7 +3785,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G }) if err != nil { metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() log.Warn("GetPersistentSegmentInfo fail", zap.Error(err)) resp.Status = merr.Status(err) @@ -3707,7 +3794,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G err = merr.Error(infoResp.GetStatus()) if err != nil { metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() resp.Status = merr.Status(err) return resp, nil } @@ -3725,7 +3812,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G } } metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, req.GetDbName(), req.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) resp.Infos = persistentInfos return resp, nil @@ -3754,11 +3841,11 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue method := "GetQuerySegmentInfo" tr := timerecord.NewTimeRecorder(method) 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, req.GetDbName(), req.GetCollectionName()).Inc() collID, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.CollectionName) if err != nil { - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() resp.Status = merr.Status(err) return resp, nil } @@ -3773,7 +3860,7 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue err = merr.Error(infoResp.GetStatus()) } if err != nil { - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() log.Error("Failed to get segment info from QueryCoord", zap.Error(err)) resp.Status = merr.Status(err) @@ -3797,7 +3884,7 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue } } - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, req.GetDbName(), req.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) resp.Infos = queryInfos return resp, nil @@ -5028,14 +5115,14 @@ func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.Cr log.Warn("CreateResourceGroup failed", zap.Error(err), ) - return getErrResponse(err, method), nil + return 
getErrResponse(err, method, "", ""), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateResourceGroup") defer sp.End() tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, "", "").Inc() t := &CreateResourceGroupTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -5052,7 +5139,7 @@ func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.Cr if err := node.sched.ddQueue.Enqueue(t); err != nil { log.Warn("CreateResourceGroup failed to enqueue", zap.Error(err)) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } log.Debug("CreateResourceGroup enqueued", @@ -5064,7 +5151,7 @@ func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.Cr zap.Error(err), zap.Uint64("BeginTS", t.BeginTs()), zap.Uint64("EndTS", t.EndTs())) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } log.Info("CreateResourceGroup done", @@ -5072,14 +5159,14 @@ func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.Cr zap.Uint64("EndTS", t.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, "", "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return t.result, nil } -func getErrResponse(err error, method string) *commonpb.Status { - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() - +func getErrResponse(err error, method string, dbName string, collectionName string) *commonpb.Status { + metrics.ProxyFunctionCall. 
+ WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, dbName, collectionName).Inc() return merr.Status(err) } @@ -5093,7 +5180,7 @@ func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.Drop defer sp.End() tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, "", "").Inc() t := &DropResourceGroupTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -5111,7 +5198,7 @@ func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.Drop log.Warn("DropResourceGroup failed to enqueue", zap.Error(err)) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } log.Debug("DropResourceGroup enqueued", @@ -5123,7 +5210,7 @@ func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.Drop zap.Error(err), zap.Uint64("BeginTS", t.BeginTs()), zap.Uint64("EndTS", t.EndTs())) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } log.Info("DropResourceGroup done", @@ -5131,7 +5218,7 @@ func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.Drop zap.Uint64("EndTS", t.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, "", "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return t.result, nil } @@ -5146,21 +5233,21 @@ func (node *Proxy) TransferNode(ctx context.Context, request *milvuspb.TransferN log.Warn("TransferNode failed", zap.Error(err), ) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } if err := ValidateResourceGroupName(request.GetTargetResourceGroup()); err != nil { log.Warn("TransferNode failed", 
zap.Error(err), ) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-TransferNode") defer sp.End() tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, "", "").Inc() t := &TransferNodeTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -5178,7 +5265,7 @@ func (node *Proxy) TransferNode(ctx context.Context, request *milvuspb.TransferN log.Warn("TransferNode failed to enqueue", zap.Error(err)) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } log.Debug("TransferNode enqueued", @@ -5190,7 +5277,7 @@ func (node *Proxy) TransferNode(ctx context.Context, request *milvuspb.TransferN zap.Error(err), zap.Uint64("BeginTS", t.BeginTs()), zap.Uint64("EndTS", t.EndTs())) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } log.Info("TransferNode done", @@ -5198,7 +5285,7 @@ func (node *Proxy) TransferNode(ctx context.Context, request *milvuspb.TransferN zap.Uint64("EndTS", t.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, "", "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return t.result, nil } @@ -5213,21 +5300,21 @@ func (node *Proxy) TransferReplica(ctx context.Context, request *milvuspb.Transf log.Warn("TransferReplica failed", zap.Error(err), ) - return getErrResponse(err, method), nil + return getErrResponse(err, method, request.GetDbName(), request.GetCollectionName()), nil } if err := ValidateResourceGroupName(request.GetTargetResourceGroup()); err != nil { log.Warn("TransferReplica failed", zap.Error(err), ) - return 
getErrResponse(err, method), nil + return getErrResponse(err, method, request.GetDbName(), request.GetCollectionName()), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-TransferReplica") defer sp.End() tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() t := &TransferReplicaTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -5245,7 +5332,7 @@ func (node *Proxy) TransferReplica(ctx context.Context, request *milvuspb.Transf log.Warn("TransferReplica failed to enqueue", zap.Error(err)) - return getErrResponse(err, method), nil + return getErrResponse(err, method, request.GetDbName(), request.GetCollectionName()), nil } log.Debug("TransferReplica enqueued", @@ -5257,7 +5344,7 @@ func (node *Proxy) TransferReplica(ctx context.Context, request *milvuspb.Transf zap.Error(err), zap.Uint64("BeginTS", t.BeginTs()), zap.Uint64("EndTS", t.EndTs())) - return getErrResponse(err, method), nil + return getErrResponse(err, method, request.GetDbName(), request.GetCollectionName()), nil } log.Info("TransferReplica done", @@ -5265,7 +5352,7 @@ func (node *Proxy) TransferReplica(ctx context.Context, request *milvuspb.Transf zap.Uint64("EndTS", t.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return t.result, nil } @@ -5282,7 +5369,7 @@ func (node *Proxy) ListResourceGroups(ctx context.Context, request *milvuspb.Lis method := "ListResourceGroups" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 
10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, "", "").Inc() t := &ListResourceGroupsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -5301,7 +5388,7 @@ func (node *Proxy) ListResourceGroups(ctx context.Context, request *milvuspb.Lis zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, "", "").Inc() return &milvuspb.ListResourceGroupsResponse{ Status: merr.Status(err), }, nil @@ -5317,7 +5404,7 @@ func (node *Proxy) ListResourceGroups(ctx context.Context, request *milvuspb.Lis zap.Uint64("BeginTS", t.BeginTs()), zap.Uint64("EndTS", t.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, "", "").Inc() return &milvuspb.ListResourceGroupsResponse{ Status: merr.Status(err), }, nil @@ -5328,7 +5415,7 @@ func (node *Proxy) ListResourceGroups(ctx context.Context, request *milvuspb.Lis zap.Uint64("EndTS", t.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, "", "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return t.result, nil } @@ -5342,7 +5429,7 @@ func (node *Proxy) DescribeResourceGroup(ctx context.Context, request *milvuspb. 
method := "DescribeResourceGroup" GetErrResponse := func(err error) *milvuspb.DescribeResourceGroupResponse { - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, "", "").Inc() return &milvuspb.DescribeResourceGroupResponse{ Status: merr.Status(err), @@ -5353,7 +5440,7 @@ func (node *Proxy) DescribeResourceGroup(ctx context.Context, request *milvuspb. defer sp.End() tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, "", "").Inc() t := &DescribeResourceGroupTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -5391,7 +5478,7 @@ func (node *Proxy) DescribeResourceGroup(ctx context.Context, request *milvuspb. zap.Uint64("EndTS", t.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, "", "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return t.result, nil } @@ -5628,12 +5715,12 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest) nodeID := fmt.Sprint(paramtable.GetNodeID()) defer func() { - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.TotalLabel, req.GetDbName(), req.GetCollectionName()).Inc() if resp.GetStatus().GetCode() != 0 { log.Warn("import failed", zap.String("err", resp.GetStatus().GetReason())) - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel, req.GetDbName(), 
req.GetCollectionName()).Inc() } else { - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel, req.GetDbName(), req.GetCollectionName()).Inc() } }() @@ -5714,7 +5801,7 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest) resp, err = node.dataCoord.ImportV2(ctx, importRequest) if err != nil { log.Warn("import failed", zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() } metrics.ProxyReqLatency.WithLabelValues(nodeID, method).Observe(float64(tr.ElapseSpan().Milliseconds())) return resp, err @@ -5737,11 +5824,11 @@ func (node *Proxy) GetImportProgress(ctx context.Context, req *internalpb.GetImp resp, err := node.dataCoord.GetImportProgress(ctx, req) if resp.GetStatus().GetCode() != 0 || err != nil { log.Warn("get import progress failed", zap.String("reason", resp.GetStatus().GetReason()), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel, req.GetDbName(), "").Inc() } else { - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel, req.GetDbName(), "").Inc() } - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.TotalLabel, req.GetDbName(), "").Inc() metrics.ProxyReqLatency.WithLabelValues(nodeID, method).Observe(float64(tr.ElapseSpan().Milliseconds())) return resp, err } @@ -5765,7 +5852,7 @@ func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsR log.Info(rpcReceived(method)) nodeID := 
fmt.Sprint(paramtable.GetNodeID()) - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.TotalLabel, req.GetDbName(), req.GetCollectionName()).Inc() var ( err error @@ -5775,7 +5862,7 @@ func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsR collectionID, err = globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName()) if err != nil { resp.Status = merr.Status(err) - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() return resp, nil } } @@ -5784,9 +5871,9 @@ func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsR }) if resp.GetStatus().GetCode() != 0 || err != nil { log.Warn("list imports", zap.String("reason", resp.GetStatus().GetReason()), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() } else { - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel, req.GetDbName(), req.GetCollectionName()).Inc() } metrics.ProxyReqLatency.WithLabelValues(nodeID, method).Observe(float64(tr.ElapseSpan().Milliseconds())) return resp, nil diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 6acb2f357f..4315f50d46 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -128,6 +128,7 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { panic(err) } metrics.QueryNodeNumEntities.WithLabelValues( + 
growing.DatabaseName(), fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(growing.Collection()), fmt.Sprint(growing.Partition()), diff --git a/internal/querynodev2/segments/mock_segment.go b/internal/querynodev2/segments/mock_segment.go index b9685fd499..7e20a93817 100644 --- a/internal/querynodev2/segments/mock_segment.go +++ b/internal/querynodev2/segments/mock_segment.go @@ -117,6 +117,47 @@ func (_c *MockSegment_Collection_Call) RunAndReturn(run func() int64) *MockSegme return _c } +// DatabaseName provides a mock function with given fields: +func (_m *MockSegment) DatabaseName() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockSegment_DatabaseName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DatabaseName' +type MockSegment_DatabaseName_Call struct { + *mock.Call +} + +// DatabaseName is a helper method to define mock.On call +func (_e *MockSegment_Expecter) DatabaseName() *MockSegment_DatabaseName_Call { + return &MockSegment_DatabaseName_Call{Call: _e.mock.On("DatabaseName")} +} + +func (_c *MockSegment_DatabaseName_Call) Run(run func()) *MockSegment_DatabaseName_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSegment_DatabaseName_Call) Return(_a0 string) *MockSegment_DatabaseName_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSegment_DatabaseName_Call) RunAndReturn(run func() string) *MockSegment_DatabaseName_Call { + _c.Call.Return(run) + return _c +} + // Delete provides a mock function with given fields: ctx, primaryKeys, timestamps func (_m *MockSegment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []uint64) error { ret := _m.Called(ctx, primaryKeys, timestamps) @@ -911,6 +952,47 @@ func (_c *MockSegment_Release_Call) RunAndReturn(run func(...releaseOption)) *Mo return _c } +// ResourceGroup 
provides a mock function with given fields: +func (_m *MockSegment) ResourceGroup() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockSegment_ResourceGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ResourceGroup' +type MockSegment_ResourceGroup_Call struct { + *mock.Call +} + +// ResourceGroup is a helper method to define mock.On call +func (_e *MockSegment_Expecter) ResourceGroup() *MockSegment_ResourceGroup_Call { + return &MockSegment_ResourceGroup_Call{Call: _e.mock.On("ResourceGroup")} +} + +func (_c *MockSegment_ResourceGroup_Call) Run(run func()) *MockSegment_ResourceGroup_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSegment_ResourceGroup_Call) Return(_a0 string) *MockSegment_ResourceGroup_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSegment_ResourceGroup_Call) RunAndReturn(run func() string) *MockSegment_ResourceGroup_Call { + _c.Call.Return(run) + return _c +} + // ResourceUsageEstimate provides a mock function with given fields: func (_m *MockSegment) ResourceUsageEstimate() ResourceUsage { ret := _m.Called() diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 177cee15bf..ed812cc78d 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -118,6 +118,14 @@ func (s *baseSegment) Partition() int64 { return s.loadInfo.GetPartitionID() } +func (s *baseSegment) DatabaseName() string { + return s.collection.GetDBName() +} + +func (s *baseSegment) ResourceGroup() string { + return s.collection.GetResourceGroup() +} + func (s *baseSegment) Shard() string { return s.loadInfo.GetInsertChannel() } @@ -1361,6 +1369,7 @@ func (s *LocalSegment) Release(opts ...releaseOption) { C.DeleteSegment(ptr) 
metrics.QueryNodeNumEntities.WithLabelValues( + s.DatabaseName(), fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.Collection()), fmt.Sprint(s.Partition()), diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go index 017b3b4d93..0652d1d68f 100644 --- a/internal/querynodev2/segments/segment_interface.go +++ b/internal/querynodev2/segments/segment_interface.go @@ -23,7 +23,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/segcorepb" - storage "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -50,6 +50,8 @@ type Segment interface { // Properties ID() int64 + DatabaseName() string + ResourceGroup() string Collection() int64 Partition() int64 Shard() string diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 5171e0049e..5a87ce3822 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -1027,6 +1027,7 @@ func (loader *segmentLoader) loadSegment(ctx context.Context, } metrics.QueryNodeNumEntities.WithLabelValues( + segment.DatabaseName(), fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(segment.Collection()), fmt.Sprint(segment.Partition()), @@ -1263,6 +1264,14 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, return err } + metrics.QueryNodeNumEntities.WithLabelValues( + segment.DatabaseName(), + fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(segment.Collection()), + fmt.Sprint(segment.Partition()), + segment.Type().String(), + strconv.FormatInt(int64(len(segment.Indexes())), 10), + ).Sub(float64(deltaData.RowCount)) log.Info("load delta logs done", zap.Int64("deleteCount", deltaData.RowCount)) return nil } diff --git 
a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 82b442827d..c8b9646e74 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -133,11 +133,10 @@ func (mt *MetaTable) reload() error { mt.names = newNameDb() mt.aliases = newNameDb() - collectionNum := int64(0) partitionNum := int64(0) - metrics.RootCoordNumOfCollections.Set(float64(0)) - metrics.RootCoordNumOfPartitions.WithLabelValues().Set(float64(0)) + metrics.RootCoordNumOfCollections.Reset() + metrics.RootCoordNumOfPartitions.Reset() // recover databases. dbs, err := mt.catalog.ListDatabases(mt.ctx, typeutil.MaxTimestamp) @@ -173,6 +172,7 @@ func (mt *MetaTable) reload() error { if err != nil { return err } + collectionNum := int64(0) for _, collection := range collections { mt.collID2Meta[collection.CollectionID] = collection if collection.Available() { @@ -181,9 +181,12 @@ func (mt *MetaTable) reload() error { partitionNum += int64(collection.GetPartitionNum(true)) } } - } - log.Info("recover collections from db", zap.Int64("collection_num", collectionNum), zap.Int64("partition_num", partitionNum)) + metrics.RootCoordNumOfCollections.WithLabelValues(dbName).Add(float64(collectionNum)) + log.Info("collections recovered from db", zap.String("db_name", dbName), + zap.Int64("collection_num", collectionNum), + zap.Int64("partition_num", partitionNum)) + } // recover aliases from db namespace for dbName, db := range mt.dbName2Meta { @@ -197,7 +200,6 @@ func (mt *MetaTable) reload() error { } } - metrics.RootCoordNumOfCollections.Add(float64(collectionNum)) metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(partitionNum)) log.Info("RootCoord meta table reload done", zap.Duration("duration", record.ElapseSpan())) return nil @@ -233,7 +235,7 @@ func (mt *MetaTable) reloadWithNonDatabase() error { mt.aliases.insert(util.DefaultDBName, alias.Name, alias.CollectionID) } - metrics.RootCoordNumOfCollections.Add(float64(collectionNum)) + 
metrics.RootCoordNumOfCollections.WithLabelValues(util.DefaultDBName).Add(float64(collectionNum)) metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(partitionNum)) return nil } @@ -398,12 +400,17 @@ func (mt *MetaTable) ChangeCollectionState(ctx context.Context, collectionID Uni } mt.collID2Meta[collectionID] = clone + db, err := mt.getDatabaseByIDInternal(ctx, coll.DBID, typeutil.MaxTimestamp) + if err != nil { + return fmt.Errorf("dbID not found for collection:%d", collectionID) + } + switch state { case pb.CollectionState_CollectionCreated: - metrics.RootCoordNumOfCollections.Inc() + metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Inc() metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(coll.GetPartitionNum(true))) default: - metrics.RootCoordNumOfCollections.Dec() + metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Dec() metrics.RootCoordNumOfPartitions.WithLabelValues().Sub(float64(coll.GetPartitionNum(true))) } diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index 3280f8cd9b..5bd3b21628 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -1333,6 +1333,26 @@ func TestMetaTable_ChangeCollectionState(t *testing.T) { assert.Error(t, err) }) + t.Run("not found dbID", func(t *testing.T) { + catalog := mocks.NewRootCoordCatalog(t) + catalog.On("AlterCollection", + mock.Anything, // context.Context + mock.Anything, // *model.Collection + mock.Anything, // *model.Collection + mock.Anything, // metastore.AlterType + mock.AnythingOfType("uint64"), + ).Return(nil) + meta := &MetaTable{ + catalog: catalog, + dbName2Meta: map[string]*model.Database{}, + collID2Meta: map[typeutil.UniqueID]*model.Collection{ + 100: {Name: "test", CollectionID: 100, DBID: util.DefaultDBID}, + }, + } + err := meta.ChangeCollectionState(context.TODO(), 100, pb.CollectionState_CollectionCreated, 1000) + assert.Error(t, err) + }) + t.Run("normal case", func(t 
*testing.T) { catalog := mocks.NewRootCoordCatalog(t) catalog.On("AlterCollection", @@ -1344,8 +1364,11 @@ func TestMetaTable_ChangeCollectionState(t *testing.T) { ).Return(nil) meta := &MetaTable{ catalog: catalog, + dbName2Meta: map[string]*model.Database{ + util.DefaultDBName: {Name: util.DefaultDBName, ID: util.DefaultDBID}, + }, collID2Meta: map[typeutil.UniqueID]*model.Collection{ - 100: {Name: "test", CollectionID: 100}, + 100: {Name: "test", CollectionID: 100, DBID: util.DefaultDBID}, }, } err := meta.ChangeCollectionState(context.TODO(), 100, pb.CollectionState_CollectionCreated, 1000) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 5cf2ae5a00..056322d7f6 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -894,6 +894,7 @@ func (c *Core) DropDatabase(ctx context.Context, in *milvuspb.DropDatabaseReques metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordNumOfDatabases.Dec() + metrics.CleanupRootCoordDBMetrics(in.GetDbName()) log.Ctx(ctx).Info("done to drop database", zap.String("role", typeutil.RootCoordRole), zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs())) diff --git a/pkg/metrics/datacoord_metrics.go b/pkg/metrics/datacoord_metrics.go index 7ced1000d9..9a31cd3af3 100644 --- a/pkg/metrics/datacoord_metrics.go +++ b/pkg/metrics/datacoord_metrics.go @@ -82,6 +82,8 @@ var ( Help: "stored l0 segment rate", }, []string{}) + // DataCoordNumStoredRows all metrics will be cleaned up after removing matched collectionID and + // segment state labels in CleanupDataCoordNumStoredRows method. 
DataCoordNumStoredRows = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: milvusNamespace, @@ -89,6 +91,7 @@ var ( Name: "stored_rows_num", Help: "number of stored rows of healthy segment", }, []string{ + databaseLabelName, collectionIDLabelName, segmentStateLabelName, }) @@ -132,6 +135,7 @@ var ( Name: "stored_binlog_size", Help: "binlog size of healthy segments", }, []string{ + databaseLabelName, collectionIDLabelName, segmentIDLabelName, }) @@ -318,7 +322,7 @@ func CleanupDataCoordSegmentMetrics(collectionID int64, segmentID int64) { func CleanupDataCoordNumStoredRows(collectionID int64) { for _, state := range commonpb.SegmentState_name { - DataCoordNumStoredRows.Delete(prometheus.Labels{ + DataCoordNumStoredRows.DeletePartialMatch(prometheus.Labels{ collectionIDLabelName: fmt.Sprint(collectionID), segmentStateLabelName: fmt.Sprint(state), }) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index dec48b77aa..8494695f7c 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -81,6 +81,8 @@ const ( functionLabelName = "function_name" queryTypeLabelName = "query_type" collectionName = "collection_name" + databaseLabelName = "db_name" + resourceGroupLabelName = "rg" indexName = "index_name" isVectorIndex = "is_vector_index" segmentStateLabelName = "segment_state" diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index 085dbdbe0b..99c2d8d414 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -69,3 +69,69 @@ func TestRegisterRuntimeInfo(t *testing.T) { assert.Equal(t, "etcd", metaType) assert.Equal(t, "pulsar", mqType) } + +// TestDeletePartialMatch test deletes all metrics where the variable labels contain all of those +// passed in as labels based on DeletePartialMatch API +func TestDeletePartialMatch(t *testing.T) { + baseVec := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "test", + Help: "helpless", + }, + []string{"l1", "l2", "l3"}, + ) + + baseVec.WithLabelValues("l1-1", 
"l2-1", "l3-1").Inc() + baseVec.WithLabelValues("l1-2", "l2-2", "l3-2").Inc() + baseVec.WithLabelValues("l1-2", "l2-3", "l3-3").Inc() + + baseVec.WithLabelValues("l1-3", "l2-3", "l3-3").Inc() + baseVec.WithLabelValues("l1-3", "l2-3", "").Inc() + baseVec.WithLabelValues("l1-3", "l2-4", "l3-4").Inc() + + baseVec.WithLabelValues("l1-4", "l2-5", "l3-5").Inc() + baseVec.WithLabelValues("l1-4", "l2-5", "l3-6").Inc() + baseVec.WithLabelValues("l1-5", "l2-6", "l3-6").Inc() + + getMetricsCount := func() int { + chs := make(chan prometheus.Metric, 10) + baseVec.Collect(chs) + return len(chs) + } + + // the prefix is matched which has one labels + if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l1": "l1-2"}), 2; got != want { + t.Errorf("got %v, want %v", got, want) + } + assert.Equal(t, 7, getMetricsCount()) + + // the prefix is matched which has two labels + if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l1": "l1-3", "l2": "l2-3"}), 2; got != want { + t.Errorf("got %v, want %v", got, want) + } + assert.Equal(t, 5, getMetricsCount()) + + // the first and latest labels are matched + if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l1": "l1-1", "l3": "l3-1"}), 1; got != want { + t.Errorf("got %v, want %v", got, want) + } + assert.Equal(t, 4, getMetricsCount()) + + // the middle labels are matched + if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l2": "l2-5"}), 2; got != want { + t.Errorf("got %v, want %v", got, want) + } + assert.Equal(t, 2, getMetricsCount()) + + // the middle labels and suffix labels are matched + if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l2": "l2-6", "l3": "l3-6"}), 1; got != want { + t.Errorf("got %v, want %v", got, want) + } + assert.Equal(t, 1, getMetricsCount()) + + // all labels are matched + if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l1": "l1-3", "l2": "l2-4", "l3": "l3-4"}), 1; got != want { + t.Errorf("got %v, want %v", got, want) + } + assert.Equal(t, 0, 
getMetricsCount()) +} diff --git a/pkg/metrics/proxy_metrics.go b/pkg/metrics/proxy_metrics.go index dd72209b66..de1f25f551 100644 --- a/pkg/metrics/proxy_metrics.go +++ b/pkg/metrics/proxy_metrics.go @@ -40,7 +40,7 @@ var ( Subsystem: typeutil.ProxyRole, Name: "search_vectors_count", Help: "counter of vectors successfully searched", - }, []string{nodeIDLabelName}) + }, []string{nodeIDLabelName, databaseLabelName, collectionName}) // ProxyInsertVectors record the number of vectors insert successfully. ProxyInsertVectors = prometheus.NewCounterVec( @@ -49,7 +49,7 @@ var ( Subsystem: typeutil.ProxyRole, Name: "insert_vectors_count", Help: "counter of vectors successfully inserted", - }, []string{nodeIDLabelName}) + }, []string{nodeIDLabelName, databaseLabelName, collectionName}) // ProxyUpsertVectors record the number of vectors upsert successfully. ProxyUpsertVectors = prometheus.NewCounterVec( @@ -58,7 +58,7 @@ var ( Subsystem: typeutil.ProxyRole, Name: "upsert_vectors_count", Help: "counter of vectors successfully upserted", - }, []string{nodeIDLabelName}) + }, []string{nodeIDLabelName, databaseLabelName, collectionName}) ProxyDeleteVectors = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -76,9 +76,10 @@ var ( Name: "sq_latency", Help: "latency of search or query successfully", Buckets: buckets, - }, []string{nodeIDLabelName, queryTypeLabelName}) + }, []string{nodeIDLabelName, queryTypeLabelName, databaseLabelName, collectionName}) // ProxyCollectionSQLatency record the latency of search successfully, per collection + // Deprecated: use ProxySQLatency instead. ProxyCollectionSQLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: milvusNamespace, @@ -96,9 +97,10 @@ var ( Name: "mutation_latency", Help: "latency of insert or delete successfully", Buckets: buckets, // unit: ms - }, []string{nodeIDLabelName, msgTypeLabelName}) + }, []string{nodeIDLabelName, msgTypeLabelName, databaseLabelName, collectionName}) - // ProxyMutationLatency 
record the latency that mutate successfully, per collection + // ProxyCollectionMutationLatency record the latency that mutate successfully, per collection + // Deprecated: use ProxyMutationLatency instead. ProxyCollectionMutationLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: milvusNamespace, @@ -107,6 +109,7 @@ var ( Help: "latency of insert or delete successfully, per collection", Buckets: buckets, }, []string{nodeIDLabelName, msgTypeLabelName, collectionName}) + // ProxyWaitForSearchResultLatency record the time that the proxy waits for the search result. ProxyWaitForSearchResultLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -230,7 +233,7 @@ var ( Subsystem: typeutil.ProxyRole, Name: "req_count", Help: "count of operation executed", - }, []string{nodeIDLabelName, functionLabelName, statusLabelName}) + }, []string{nodeIDLabelName, functionLabelName, statusLabelName, databaseLabelName, collectionName}) // ProxyReqLatency records the latency that for all requests, like "CreateCollection". 
ProxyReqLatency = prometheus.NewHistogramVec( @@ -372,7 +375,59 @@ func RegisterProxy(registry *prometheus.Registry) { registry.MustRegister(ProxySlowQueryCount) } -func CleanupCollectionMetrics(nodeID int64, collection string) { +func CleanupProxyDBMetrics(nodeID int64, dbName string) { + ProxySearchVectors.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + databaseLabelName: dbName, + }) + ProxyInsertVectors.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + databaseLabelName: dbName, + }) + ProxyUpsertVectors.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + databaseLabelName: dbName, + }) + ProxySQLatency.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + databaseLabelName: dbName, + }) + ProxyMutationLatency.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + databaseLabelName: dbName, + }) + ProxyFunctionCall.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + databaseLabelName: dbName, + }) +} + +func CleanupProxyCollectionMetrics(nodeID int64, collection string) { + ProxySearchVectors.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + collectionName: collection, + }) + ProxyInsertVectors.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + collectionName: collection, + }) + ProxyUpsertVectors.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + collectionName: collection, + }) + ProxySQLatency.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + collectionName: collection, + }) + ProxyMutationLatency.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + collectionName: collection, + }) + ProxyFunctionCall.DeletePartialMatch(prometheus.Labels{ + 
nodeIDLabelName: strconv.FormatInt(nodeID, 10), + collectionName: collection, + }) + ProxyCollectionSQLatency.Delete(prometheus.Labels{ nodeIDLabelName: strconv.FormatInt(nodeID, 10), queryTypeLabelName: SearchLabel, collectionName: collection, diff --git a/pkg/metrics/querynode_metrics.go b/pkg/metrics/querynode_metrics.go index d9a981e261..0de2b462a2 100644 --- a/pkg/metrics/querynode_metrics.go +++ b/pkg/metrics/querynode_metrics.go @@ -362,6 +362,7 @@ var ( Name: "entity_num", Help: "number of entities which can be searched/queried, clustered by collection, partition and state", }, []string{ + databaseLabelName, nodeIDLabelName, collectionIDLabelName, partitionIDLabelName, @@ -567,5 +568,11 @@ func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) { msgTypeLabelName: label, collectionIDLabelName: fmt.Sprint(collectionID), }) + QueryNodeNumEntities. + DeletePartialMatch( + prometheus.Labels{ + nodeIDLabelName: fmt.Sprint(nodeID), + collectionIDLabelName: fmt.Sprint(collectionID), + }) } } diff --git a/pkg/metrics/rootcoord_metrics.go b/pkg/metrics/rootcoord_metrics.go index 3b7fdd0868..ddd3044078 100644 --- a/pkg/metrics/rootcoord_metrics.go +++ b/pkg/metrics/rootcoord_metrics.go @@ -93,13 +93,13 @@ var ( }) // RootCoordNumOfCollections counts the number of collections. - RootCoordNumOfCollections = prometheus.NewGauge( + RootCoordNumOfCollections = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: milvusNamespace, Subsystem: typeutil.RootCoordRole, Name: "collection_num", Help: "number of collections", - }) + }, []string{databaseLabelName}) // RootCoordNumOfPartitions counts the number of partitions per collection. 
RootCoordNumOfPartitions = prometheus.NewGaugeVec( @@ -247,3 +247,9 @@ func RegisterRootCoord(registry *prometheus.Registry) { registry.MustRegister(RootCoordNumEntities) registry.MustRegister(RootCoordIndexedNumEntities) } + +func CleanupRootCoordDBMetrics(dbName string) { + RootCoordNumOfCollections.Delete(prometheus.Labels{ + databaseLabelName: dbName, + }) +}