diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index e73de2afb4..bf86161a42 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -84,6 +84,7 @@ type collectionInfo struct { StartPositions []*commonpb.KeyDataPair Properties map[string]string CreatedAt Timestamp + DatabaseName string } // NewMeta creates meta from provided `kv.TxnKV` @@ -199,6 +200,7 @@ func (m *meta) GetClonedCollectionInfo(collectionID UniqueID) *collectionInfo { Partitions: coll.Partitions, StartPositions: common.CloneKeyDataPairs(coll.StartPositions), Properties: clonedProperties, + DatabaseName: coll.DatabaseName, } return cloneColl @@ -276,9 +278,14 @@ func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64) { collectionRowsNum[segment.GetCollectionID()][segment.GetState()] += segment.GetNumOfRows() } } - for collection, statesRows := range collectionRowsNum { + for collectionID, statesRows := range collectionRowsNum { for state, rows := range statesRows { - metrics.DataCoordNumStoredRows.WithLabelValues(fmt.Sprint(collection), state.String()).Set(float64(rows)) + coll, ok := m.collections[collectionID] + if ok { + metrics.DataCoordNumStoredRows.WithLabelValues(coll.DatabaseName, fmt.Sprint(collectionID), state.String()).Set(float64(rows)) + } else { + log.Warn("not found database name", zap.Int64("collectionID", collectionID)) + } } } return total, collectionBinlogSize diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 85dadc4520..fffe6898b7 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -1206,6 +1206,7 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i StartPositions: resp.GetStartPositions(), Properties: properties, CreatedAt: resp.GetCreatedTimestamp(), + DatabaseName: resp.GetDbName(), } s.meta.AddCollection(collInfo) return nil diff --git a/internal/proxy/hook_interceptor.go b/internal/proxy/hook_interceptor.go index 79e833b583..448e6d217c 100644 --- a/internal/proxy/hook_interceptor.go +++ b/internal/proxy/hook_interceptor.go @@ -64,8 +64,8 @@ func updateProxyFunctionCallMetric(fullMethod string) { if method == "" { return } - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, "", "").Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, "", "").Inc() } func getCurrentUser(ctx context.Context) string { diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 5ce848f3ac..b22cba1a6f 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -155,9 +155,9 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p // no need to handle error, since this Proxy may not create dml stream for the collection. node.chMgr.removeDMLStream(request.GetCollectionID()) // clean up collection level metrics - metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), collectionName) + metrics.CleanupProxyCollectionMetrics(paramtable.GetNodeID(), collectionName) for _, alias := range aliasName { - metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), alias) + metrics.CleanupProxyCollectionMetrics(paramtable.GetNodeID(), alias) } } log.Info("complete to invalidate collection meta cache") @@ -180,6 +180,8 @@ func (node *Proxy) CreateDatabase(ctx context.Context, request *milvuspb.CreateD strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + "", ).Inc() cct := &createDatabaseTask{ @@ -200,7 +202,9 @@ func (node *Proxy) CreateDatabase(ctx context.Context, request *milvuspb.CreateD if err := node.sched.ddQueue.Enqueue(cct); err != nil { log.Warn(rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), ""). + Inc() return merr.Status(err), nil } @@ -208,7 +212,9 @@ func (node *Proxy) CreateDatabase(ctx context.Context, request *milvuspb.CreateD if err := cct.WaitToFinish(); err != nil { log.Warn(rpcFailedToWaitToFinish(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), ""). + Inc() return merr.Status(err), nil } @@ -217,6 +223,8 @@ func (node *Proxy) CreateDatabase(ctx context.Context, request *milvuspb.CreateD strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + "", ).Inc() metrics.ProxyReqLatency.WithLabelValues( @@ -241,6 +249,8 @@ func (node *Proxy) DropDatabase(ctx context.Context, request *milvuspb.DropDatab strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + "", ).Inc() dct := &dropDatabaseTask{ @@ -260,14 +270,18 @@ func (node *Proxy) DropDatabase(ctx context.Context, request *milvuspb.DropDatab log.Info(rpcReceived(method)) if err := node.sched.ddQueue.Enqueue(dct); err != nil { log.Warn(rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), ""). + Inc() return merr.Status(err), nil } log.Info(rpcEnqueued(method)) if err := dct.WaitToFinish(); err != nil { log.Warn(rpcFailedToWaitToFinish(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), ""). + Inc() return merr.Status(err), nil } @@ -276,6 +290,8 @@ func (node *Proxy) DropDatabase(ctx context.Context, request *milvuspb.DropDatab strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + "", ).Inc() metrics.ProxyReqLatency.WithLabelValues( @@ -283,6 +299,7 @@ func (node *Proxy) DropDatabase(ctx context.Context, request *milvuspb.DropDatab method, ).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.CleanupProxyDBMetrics(paramtable.GetNodeID(), request.GetDbName()) return dct.result, nil } @@ -302,6 +319,8 @@ func (node *Proxy) ListDatabases(ctx context.Context, request *milvuspb.ListData strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + "", + "", ).Inc() dct := &listDatabaseTask{ @@ -320,7 +339,9 @@ func (node *Proxy) ListDatabases(ctx context.Context, request *milvuspb.ListData if err := node.sched.ddQueue.Enqueue(dct); err != nil { log.Warn(rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, "", ""). + Inc() resp.Status = merr.Status(err) return resp, nil } @@ -328,7 +349,9 @@ func (node *Proxy) ListDatabases(ctx context.Context, request *milvuspb.ListData log.Info(rpcEnqueued(method)) if err := dct.WaitToFinish(); err != nil { log.Warn(rpcFailedToWaitToFinish(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, "", ""). + Inc() resp.Status = merr.Status(err) return resp, nil } @@ -338,6 +361,8 @@ func (node *Proxy) ListDatabases(ctx context.Context, request *milvuspb.ListData strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + "", + "", ).Inc() metrics.ProxyReqLatency.WithLabelValues( @@ -364,6 +389,8 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() cct := &createCollectionTask{ @@ -392,7 +419,7 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -410,7 +437,7 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat zap.Uint64("BeginTs", cct.BeginTs()), zap.Uint64("EndTs", cct.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -424,6 +451,8 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() metrics.ProxyReqLatency.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), @@ -447,6 +476,8 @@ func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCol strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() dct := &dropCollectionTask{ @@ -470,7 +501,7 @@ func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCol log.Warn("DropCollection failed to enqueue", zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -486,7 +517,7 @@ func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCol zap.Uint64("BeginTs", dct.BeginTs()), zap.Uint64("EndTs", dct.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -500,6 +531,8 @@ func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCol strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() metrics.ProxyReqLatency.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), @@ -525,6 +558,8 @@ func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasColle strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() log := log.Ctx(ctx).With( @@ -547,7 +582,7 @@ func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasColle zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.BoolResponse{ Status: merr.Status(err), }, nil @@ -566,7 +601,7 @@ func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasColle zap.Uint64("EndTS", hct.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.BoolResponse{ Status: merr.Status(err), }, nil @@ -582,6 +617,8 @@ func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasColle strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() metrics.ProxyReqLatency.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), @@ -605,6 +642,8 @@ func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCol strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() lct := &loadCollectionTask{ @@ -630,7 +669,7 @@ func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCol zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -646,7 +685,7 @@ func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCol zap.Uint64("BeginTS", lct.BeginTs()), zap.Uint64("EndTS", lct.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -660,6 +699,8 @@ func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCol strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() metrics.ProxyReqLatency.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), @@ -680,7 +721,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele method := "ReleaseCollection" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() rct := &releaseCollectionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -702,7 +743,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -719,7 +760,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele zap.Uint64("EndTS", rct.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -729,7 +770,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele zap.Uint64("EndTS", rct.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return rct.result, nil } @@ -747,7 +788,7 @@ func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.Des method := "DescribeCollection" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() dct := &describeCollectionTask{ ctx: ctx, @@ -768,7 +809,7 @@ func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.Des zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.DescribeCollectionResponse{ Status: merr.Status(err), }, nil @@ -785,7 +826,7 @@ func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.Des zap.Uint64("EndTS", dct.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.DescribeCollectionResponse{ Status: merr.Status(err), @@ -800,7 +841,7 @@ func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.Des ) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dct.result, nil } @@ -819,7 +860,7 @@ func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStati method := "GetStatistics" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() g := &getStatisticsTask{ request: request, Condition: NewTaskCondition(ctx), @@ -846,7 +887,7 @@ func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStati zap.Strings("partitions", request.PartitionNames)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetStatisticsResponse{ Status: merr.Status(err), @@ -868,7 +909,7 @@ func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStati zap.Strings("partitions", request.PartitionNames)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetStatisticsResponse{ Status: merr.Status(err), @@ -881,7 +922,7 @@ func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStati zap.Uint64("EndTS", g.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return g.result, nil } @@ -899,7 +940,7 @@ func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvusp method := "GetCollectionStatistics" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() g := &getCollectionStatisticsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -920,7 +961,7 @@ func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvusp zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetCollectionStatisticsResponse{ Status: merr.Status(err), @@ -940,7 +981,7 @@ func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvusp zap.Uint64("EndTS", g.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetCollectionStatisticsResponse{ Status: merr.Status(err), @@ -953,7 +994,7 @@ func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvusp zap.Uint64("EndTS", g.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return g.result, nil } @@ -969,7 +1010,9 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo defer sp.End() method := "ShowCollections" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), ""). + Inc() sct := &showCollectionsTask{ ctx: ctx, @@ -994,7 +1037,7 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo zap.Error(err), zap.Any("CollectionNames", request.CollectionNames)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), "").Inc() return &milvuspb.ShowCollectionsResponse{ Status: merr.Status(err), }, nil @@ -1009,7 +1052,7 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo zap.Error(err), zap.Any("CollectionNames", request.CollectionNames)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), "").Inc() return &milvuspb.ShowCollectionsResponse{ Status: merr.Status(err), @@ -1020,7 +1063,7 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo zap.Int("len(CollectionNames)", len(request.CollectionNames)), zap.Int("num_collections", len(sct.result.CollectionNames))) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return sct.result, nil } @@ -1035,7 +1078,7 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC method := "AlterCollection" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() act := &alterCollectionTask{ ctx: ctx, @@ -1059,7 +1102,7 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1076,7 +1119,7 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC zap.Uint64("BeginTs", act.BeginTs()), zap.Uint64("EndTs", act.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1085,7 +1128,7 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC zap.Uint64("BeginTs", act.BeginTs()), zap.Uint64("EndTs", act.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return act.result, nil } @@ -1100,7 +1143,7 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create defer sp.End() method := "CreatePartition" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() cpt := &createPartitionTask{ ctx: ctx, @@ -1123,7 +1166,7 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1140,7 +1183,7 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create zap.Uint64("BeginTS", cpt.BeginTs()), zap.Uint64("EndTS", cpt.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1150,7 +1193,7 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create zap.Uint64("BeginTS", cpt.BeginTs()), zap.Uint64("EndTS", cpt.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return cpt.result, nil } @@ -1165,7 +1208,7 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart defer sp.End() method := "DropPartition" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() dpt := &dropPartitionTask{ ctx: ctx, @@ -1189,7 +1232,7 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1206,7 +1249,7 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart zap.Uint64("BeginTS", dpt.BeginTs()), zap.Uint64("EndTS", dpt.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1216,7 +1259,7 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart zap.Uint64("BeginTS", dpt.BeginTs()), zap.Uint64("EndTS", dpt.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dpt.result, nil } @@ -1235,7 +1278,7 @@ func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartit tr := timerecord.NewTimeRecorder(method) // TODO: use collectionID instead of collectionName metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() hpt := &hasPartitionTask{ ctx: ctx, @@ -1259,7 +1302,7 @@ func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartit zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.BoolResponse{ Status: merr.Status(err), @@ -1280,7 +1323,7 @@ func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartit zap.Uint64("EndTS", hpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.BoolResponse{ Status: merr.Status(err), @@ -1294,7 +1337,7 @@ func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartit zap.Uint64("EndTS", hpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return hpt.result, nil } @@ -1310,7 +1353,7 @@ func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPar method := "LoadPartitions" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() lpt := &loadPartitionsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -1335,7 +1378,7 @@ func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPar zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1353,7 +1396,7 @@ func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPar zap.Uint64("EndTS", lpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1364,7 +1407,7 @@ func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPar zap.Uint64("EndTS", lpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return lpt.result, nil } @@ -1389,7 +1432,7 @@ func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.Rele method := "ReleasePartitions" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -1405,7 +1448,7 @@ func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.Rele zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1423,7 +1466,7 @@ func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.Rele zap.Uint64("EndTS", rpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1434,7 +1477,7 @@ func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.Rele zap.Uint64("EndTS", rpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return rpt.result, nil } @@ -1452,7 +1495,7 @@ func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb method := "GetPartitionStatistics" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() g := &getPartitionStatisticsTask{ ctx: ctx, @@ -1475,7 +1518,7 @@ func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetPartitionStatisticsResponse{ Status: merr.Status(err), @@ -1495,7 +1538,7 @@ func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb zap.Uint64("EndTS", g.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetPartitionStatisticsResponse{ Status: merr.Status(err), @@ -1508,7 +1551,7 @@ func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb zap.Uint64("EndTS", g.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return g.result, nil } @@ -1537,7 +1580,7 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar tr := timerecord.NewTimeRecorder(method) // TODO: use collectionID instead of collectionName metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With(zap.String("role", typeutil.ProxyRole)) @@ -1552,7 +1595,7 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar zap.Any("request", request)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.ShowPartitionsResponse{ Status: merr.Status(err), @@ -1578,7 +1621,7 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.ShowPartitionsResponse{ Status: merr.Status(err), @@ -1594,7 +1637,7 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return spt.result, nil } @@ -1607,7 +1650,7 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get tr := timerecord.NewTimeRecorder(method) ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetLoadingProgress") defer sp.End() - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx) log.Debug( @@ -1619,7 +1662,7 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get zap.String("collectionName", request.CollectionName), zap.Strings("partitionName", request.PartitionNames), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() if errors.Is(err, merr.ErrServiceMemoryLimitExceeded) { return &milvuspb.GetLoadingProgressResponse{ Status: merr.Status(err), @@ -1667,7 +1710,7 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get log.Debug( rpcDone(method), zap.Any("request", request)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &milvuspb.GetLoadingProgressResponse{ Status: merr.Success(), @@ -1684,7 +1727,7 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt tr := timerecord.NewTimeRecorder(method) ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetLoadState") defer sp.End() - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx) log.Debug( @@ -1696,7 +1739,7 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt zap.String("collection_name", request.CollectionName), zap.Strings("partition_name", request.PartitionNames), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetLoadStateResponse{ Status: merr.Status(err), } @@ -1713,7 +1756,7 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt log.Debug( rpcDone(method), zap.Any("request", request)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) }() @@ -1797,7 +1840,7 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde method := "CreateIndex" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -1814,7 +1857,7 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1832,7 +1875,7 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde zap.Uint64("EndTs", cit.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1843,7 +1886,7 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde zap.Uint64("EndTs", cit.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return cit.result, nil } @@ -1868,7 +1911,7 @@ func (node *Proxy) AlterIndex(ctx context.Context, request *milvuspb.AlterIndexR method := "AlterIndex" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -1885,7 +1928,7 @@ func (node *Proxy) AlterIndex(ctx context.Context, request *milvuspb.AlterIndexR zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1903,7 +1946,7 @@ func (node *Proxy) AlterIndex(ctx context.Context, request *milvuspb.AlterIndexR zap.Uint64("EndTs", task.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -1914,7 +1957,7 @@ func (node *Proxy) AlterIndex(ctx context.Context, request *milvuspb.AlterIndexR zap.Uint64("EndTs", task.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return task.result, nil } @@ -1941,7 +1984,7 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe // avoid data race tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -1958,7 +2001,7 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.DescribeIndexResponse{ Status: merr.Status(err), @@ -1978,7 +2021,7 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe zap.Uint64("EndTs", dit.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.DescribeIndexResponse{ Status: merr.Status(err), @@ -1991,7 +2034,7 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe zap.Uint64("EndTs", dit.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dit.result, nil } @@ -2018,7 +2061,7 @@ func (node *Proxy) GetIndexStatistics(ctx context.Context, request *milvuspb.Get // avoid data race tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -2034,7 +2077,7 @@ func (node *Proxy) GetIndexStatistics(ctx context.Context, request *milvuspb.Get zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetIndexStatisticsResponse{ Status: merr.Status(err), @@ -2048,7 +2091,7 @@ func (node *Proxy) GetIndexStatistics(ctx context.Context, request *milvuspb.Get if err := dit.WaitToFinish(); err != nil { log.Warn(rpcFailedToWaitToFinish(method), zap.Error(err), zap.Uint64("BeginTs", dit.BeginTs()), zap.Uint64("EndTs", dit.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetIndexStatisticsResponse{ Status: merr.Status(err), }, nil @@ -2060,7 +2103,7 @@ func (node *Proxy) GetIndexStatistics(ctx context.Context, request *milvuspb.Get zap.Uint64("EndTs", dit.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dit.result, nil @@ -2087,7 +2130,7 @@ func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexReq method := "DropIndex" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -2103,7 +2146,7 @@ func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexReq rpcFailedToEnqueue(method), zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -2121,7 +2164,7 @@ func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexReq zap.Uint64("EndTs", dit.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -2132,7 +2175,7 @@ func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexReq zap.Uint64("EndTs", dit.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dit.result, nil } @@ -2161,7 +2204,7 @@ func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb. method := "GetIndexBuildProgress" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -2177,7 +2220,7 @@ func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb. rpcFailedToEnqueue(method), zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetIndexBuildProgressResponse{ Status: merr.Status(err), @@ -2196,7 +2239,7 @@ func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb. zap.Uint64("BeginTs", gibpt.BeginTs()), zap.Uint64("EndTs", gibpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetIndexBuildProgressResponse{ Status: merr.Status(err), @@ -2209,7 +2252,7 @@ func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb. zap.Uint64("EndTs", gibpt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return gibpt.result, nil } @@ -2237,7 +2280,7 @@ func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndex method := "GetIndexState" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -2254,7 +2297,7 @@ func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndex zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetIndexStateResponse{ Status: merr.Status(err), @@ -2273,7 +2316,7 @@ func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndex zap.Uint64("BeginTs", dipt.BeginTs()), zap.Uint64("EndTs", dipt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.GetIndexStateResponse{ Status: merr.Status(err), @@ -2286,7 +2329,7 @@ func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndex zap.Uint64("EndTs", dipt.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dipt.result, nil } @@ -2315,7 +2358,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) metrics.ProxyReceiveBytes.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.GetCollectionName()).Add(float64(proto.Size(request))) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() it := &insertTask{ ctx: ctx, @@ -2361,7 +2404,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) if err := node.sched.dmQueue.Enqueue(it); err != nil { log.Warn("Failed to enqueue insert task: " + err.Error()) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return constructFailedResponse(err), nil } @@ -2370,7 +2413,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) if err := it.WaitToFinish(); err != nil { log.Warn("Failed to execute insert task in task scheduler: " + err.Error()) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return constructFailedResponse(err), nil } @@ -2394,7 +2437,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(it.insertMsg.Size())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() successCnt := it.result.InsertCnt - int64(len(it.result.ErrIndex)) v := Extension.Report(map[string]any{ hookutil.OpTypeKey: hookutil.OpTypeInsert, @@ -2405,9 +2448,15 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) hookutil.FailCntKey: len(it.result.ErrIndex), }) SetReportValue(it.result.GetStatus(), v) - metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt)) - metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) - metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyInsertVectors. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()). + Add(float64(successCnt)) + metrics.ProxyMutationLatency. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.GetDbName(), request.GetCollectionName()). + Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyCollectionMutationLatency. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.CollectionName). + Observe(float64(tr.ElapseSpan().Milliseconds())) return it.result, nil } @@ -2439,7 +2488,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() dr := &deleteRunner{ req: request, @@ -2455,7 +2504,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) if err := dr.Init(ctx); err != nil { log.Error("Failed to enqueue delete task: " + err.Error()) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.MutationResult{ Status: merr.Status(err), @@ -2467,7 +2516,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) if err := dr.Run(ctx); err != nil { log.Error("Failed to enqueue delete task: " + err.Error()) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.MutationResult{ Status: merr.Status(err), @@ -2490,8 +2539,10 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) SetReportValue(dr.result.GetStatus(), v) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() - metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() + metrics.ProxyMutationLatency. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.GetDbName(), request.GetCollectionName()). + Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds())) return dr.result, nil } @@ -2521,7 +2572,7 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) metrics.ProxyReceiveBytes.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.GetCollectionName()).Add(float64(proto.Size(request))) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() request.Base = commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_Upsert), @@ -2556,7 +2607,7 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) log.Info("Failed to enqueue upsert task", zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.MutationResult{ Status: merr.Status(err), }, nil @@ -2570,7 +2621,7 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) log.Info("Failed to execute insert task in task scheduler", zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() // Not every error case changes the status internally // change status there to handle it if it.result.GetStatus().GetErrorCode() == commonpb.ErrorCode_Success { @@ -2617,10 +2668,14 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) rateCol.Add(internalpb.RateType_DMLUpsert.String(), float64(it.upsertMsg.DeleteMsg.Size()+it.upsertMsg.DeleteMsg.Size())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() successCnt := it.result.UpsertCnt - int64(len(it.result.ErrIndex)) - metrics.ProxyUpsertVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt)) - metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyUpsertVectors. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()). + Add(float64(successCnt)) + metrics.ProxyMutationLatency. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.GetDbName(), request.GetCollectionName()). + Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds())) log.Debug("Finish processing upsert request in Proxy") @@ -2674,6 +2729,8 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Search") @@ -2745,6 +2802,8 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() return &milvuspb.SearchResults{ @@ -2769,6 +2828,8 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() return &milvuspb.SearchResults{ @@ -2789,14 +2850,20 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() - metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(qt.result.GetResults().GetNumQueries())) + metrics.ProxySearchVectors. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()). + Add(float64(qt.result.GetResults().GetNumQueries())) searchDur := tr.ElapseSpan().Milliseconds() metrics.ProxySQLatency.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel, + request.GetDbName(), + request.GetCollectionName(), ).Observe(float64(searchDur)) metrics.ProxyCollectionSQLatency.WithLabelValues( @@ -2860,6 +2927,8 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-HybridSearch") @@ -2916,6 +2985,8 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() return &milvuspb.SearchResults{ @@ -2939,6 +3010,8 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() return &milvuspb.SearchResults{ @@ -2959,14 +3032,20 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() - metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(len(qt.request.GetRequests()))) + metrics.ProxySearchVectors. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()). + Add(float64(len(qt.request.GetRequests()))) searchDur := tr.ElapseSpan().Milliseconds() metrics.ProxySQLatency.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.HybridSearchLabel, + request.GetDbName(), + request.GetCollectionName(), ).Observe(float64(searchDur)) metrics.ProxyCollectionSQLatency.WithLabelValues( @@ -3070,7 +3149,7 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (* method := "Flush" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), "").Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -3084,7 +3163,7 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (* rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), "").Inc() resp.Status = merr.Status(err) return resp, nil @@ -3102,7 +3181,7 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (* zap.Uint64("BeginTs", ft.BeginTs()), zap.Uint64("EndTs", ft.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), "").Inc() resp.Status = merr.Status(err) return resp, nil @@ -3113,7 +3192,7 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (* zap.Uint64("BeginTs", ft.BeginTs()), zap.Uint64("EndTs", ft.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return ft.result, nil } @@ -3152,6 +3231,8 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() log := log.Ctx(ctx).With( @@ -3196,6 +3277,8 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() return &milvuspb.QueryResults{ @@ -3212,7 +3295,7 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.QueryResults{ Status: merr.Status(err), @@ -3230,11 +3313,15 @@ func (node *Proxy) query(ctx context.Context, qt *queryTask) (*milvuspb.QueryRes strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, + request.GetDbName(), + request.GetCollectionName(), ).Inc() metrics.ProxySQLatency.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.QueryLabel, + request.GetDbName(), + request.GetCollectionName(), ).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.ProxyCollectionSQLatency.WithLabelValues( @@ -3299,7 +3386,7 @@ func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAlia method := "CreateAlias" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -3314,7 +3401,7 @@ func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAlia rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -3330,7 +3417,7 @@ func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAlia zap.Error(err), zap.Uint64("BeginTs", cat.BeginTs()), zap.Uint64("EndTs", cat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -3340,7 +3427,7 @@ func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAlia zap.Uint64("BeginTs", cat.BeginTs()), zap.Uint64("EndTs", cat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return cat.result, nil } @@ -3366,7 +3453,7 @@ func (node *Proxy) DescribeAlias(ctx context.Context, request *milvuspb.Describe method := "DescribeAlias" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.TotalLabel, request.GetDbName(), "").Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -3379,7 +3466,7 @@ func (node *Proxy) DescribeAlias(ctx context.Context, request *milvuspb.Describe log.Warn( rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.AbandonLabel, request.GetDbName(), "").Inc() return &milvuspb.DescribeAliasResponse{ Status: merr.Status(err), @@ -3393,7 +3480,7 @@ func (node *Proxy) DescribeAlias(ctx context.Context, request *milvuspb.Describe if err := dat.WaitToFinish(); err != nil { log.Warn(rpcFailedToWaitToFinish(method), zap.Uint64("BeginTs", dat.BeginTs()), zap.Uint64("EndTs", dat.EndTs()), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.FailLabel, request.GetDbName(), "").Inc() return &milvuspb.DescribeAliasResponse{ Status: merr.Status(err), }, nil @@ -3404,7 +3491,7 @@ func (node *Proxy) DescribeAlias(ctx context.Context, request *milvuspb.Describe zap.Uint64("BeginTs", dat.BeginTs()), zap.Uint64("EndTs", dat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.SuccessLabel, request.GetDbName(), "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dat.result, nil } @@ -3430,7 +3517,7 @@ func (node *Proxy) ListAliases(ctx context.Context, request *milvuspb.ListAliase method := "ListAliases" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -3442,7 +3529,7 @@ func (node *Proxy) ListAliases(ctx context.Context, request *milvuspb.ListAliase log.Warn( rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.ListAliasesResponse{ Status: merr.Status(err), @@ -3456,7 +3543,7 @@ func (node *Proxy) ListAliases(ctx context.Context, request *milvuspb.ListAliase if err := lat.WaitToFinish(); err != nil { log.Warn(rpcFailedToWaitToFinish(method), zap.Uint64("BeginTs", lat.BeginTs()), zap.Uint64("EndTs", lat.EndTs()), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return &milvuspb.ListAliasesResponse{ Status: merr.Status(err), }, nil @@ -3467,7 +3554,7 @@ func (node *Proxy) ListAliases(ctx context.Context, request *milvuspb.ListAliase zap.Uint64("BeginTs", lat.BeginTs()), zap.Uint64("EndTs", lat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return lat.result, nil } @@ -3490,7 +3577,7 @@ func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasReq method := "DropAlias" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), "").Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -3503,7 +3590,7 @@ func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasReq log.Warn( rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), "").Inc() return merr.Status(err), nil } @@ -3520,7 +3607,7 @@ func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasReq zap.Uint64("BeginTs", dat.BeginTs()), zap.Uint64("EndTs", dat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), "").Inc() return merr.Status(err), nil } @@ -3530,7 +3617,7 @@ func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasReq zap.Uint64("BeginTs", dat.BeginTs()), zap.Uint64("EndTs", dat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dat.result, nil } @@ -3553,7 +3640,7 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR method := "AlterAlias" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), @@ -3567,7 +3654,7 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR log.Warn( rpcFailedToEnqueue(method), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -3584,7 +3671,7 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR zap.Uint64("BeginTs", aat.BeginTs()), zap.Uint64("EndTs", aat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() return merr.Status(err), nil } @@ -3594,7 +3681,7 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR zap.Uint64("BeginTs", aat.BeginTs()), zap.Uint64("EndTs", aat.EndTs())) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return aat.result, nil } @@ -3730,12 +3817,12 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G method := "GetPersistentSegmentInfo" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, req.GetDbName(), req.GetCollectionName()).Inc() // list segments collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName()) if err != nil { - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() resp.Status = merr.Status(err) return resp, nil } @@ -3747,7 +3834,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G States: []commonpb.SegmentState{commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed, commonpb.SegmentState_Sealed}, }) if err != nil { - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() resp.Status = merr.Status(err) return resp, nil } @@ -3762,7 +3849,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G }) if err != nil { metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() log.Warn("GetPersistentSegmentInfo fail", zap.Error(err)) resp.Status = merr.Status(err) @@ -3771,7 +3858,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G err = merr.Error(infoResp.GetStatus()) if err != nil { metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() resp.Status = merr.Status(err) return resp, nil } @@ -3789,7 +3876,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G } } metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, req.GetDbName(), req.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) resp.Infos = persistentInfos return resp, nil @@ -3818,11 +3905,11 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue method := "GetQuerySegmentInfo" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, req.GetDbName(), req.GetCollectionName()).Inc() collID, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.CollectionName) if err != nil { - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() resp.Status = merr.Status(err) return resp, nil } @@ -3837,7 +3924,7 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue err = merr.Error(infoResp.GetStatus()) } if err != nil { - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() log.Error("Failed to get segment info from QueryCoord", zap.Error(err)) resp.Status = merr.Status(err) @@ -3861,7 +3948,7 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue } } - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, req.GetDbName(), req.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) resp.Infos = queryInfos return resp, nil @@ -5092,14 +5179,14 @@ func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.Cr log.Warn("CreateResourceGroup failed", zap.Error(err), ) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateResourceGroup") defer sp.End() tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, "", "").Inc() t := &CreateResourceGroupTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -5116,7 +5203,7 @@ func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.Cr if err := node.sched.ddQueue.Enqueue(t); err != nil { log.Warn("CreateResourceGroup failed to enqueue", zap.Error(err)) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } log.Debug("CreateResourceGroup enqueued", @@ -5128,7 +5215,7 @@ func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.Cr zap.Error(err), zap.Uint64("BeginTS", t.BeginTs()), zap.Uint64("EndTS", t.EndTs())) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } log.Info("CreateResourceGroup done", @@ -5136,14 +5223,14 @@ func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.Cr zap.Uint64("EndTS", t.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, "", "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return t.result, nil } -func getErrResponse(err error, method string) *commonpb.Status { - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() - +func getErrResponse(err error, method string, dbName string, collectionName string) *commonpb.Status { + metrics.ProxyFunctionCall. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, dbName, collectionName).Inc() return merr.Status(err) } @@ -5157,7 +5244,7 @@ func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.Drop defer sp.End() tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, "", "").Inc() t := &DropResourceGroupTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -5175,7 +5262,7 @@ func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.Drop log.Warn("DropResourceGroup failed to enqueue", zap.Error(err)) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } log.Debug("DropResourceGroup enqueued", @@ -5187,7 +5274,7 @@ func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.Drop zap.Error(err), zap.Uint64("BeginTS", t.BeginTs()), zap.Uint64("EndTS", t.EndTs())) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } log.Info("DropResourceGroup done", @@ -5195,7 +5282,7 @@ func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.Drop zap.Uint64("EndTS", t.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, "", "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return t.result, nil } @@ -5210,21 +5297,21 @@ func (node *Proxy) TransferNode(ctx context.Context, request *milvuspb.TransferN log.Warn("TransferNode failed", zap.Error(err), ) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } if err := ValidateResourceGroupName(request.GetTargetResourceGroup()); err != nil { log.Warn("TransferNode failed", zap.Error(err), ) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-TransferNode") defer sp.End() tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, "", "").Inc() t := &TransferNodeTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -5242,7 +5329,7 @@ func (node *Proxy) TransferNode(ctx context.Context, request *milvuspb.TransferN log.Warn("TransferNode failed to enqueue", zap.Error(err)) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } log.Debug("TransferNode enqueued", @@ -5254,7 +5341,7 @@ func (node *Proxy) TransferNode(ctx context.Context, request *milvuspb.TransferN zap.Error(err), zap.Uint64("BeginTS", t.BeginTs()), zap.Uint64("EndTS", t.EndTs())) - return getErrResponse(err, method), nil + return getErrResponse(err, method, "", ""), nil } log.Info("TransferNode done", @@ -5262,7 +5349,7 @@ func (node *Proxy) TransferNode(ctx context.Context, request *milvuspb.TransferN zap.Uint64("EndTS", t.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, "", "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return t.result, nil } @@ -5277,21 +5364,21 @@ func (node *Proxy) TransferReplica(ctx context.Context, request *milvuspb.Transf log.Warn("TransferReplica failed", zap.Error(err), ) - return getErrResponse(err, method), nil + return getErrResponse(err, method, request.GetDbName(), request.GetCollectionName()), nil } if err := ValidateResourceGroupName(request.GetTargetResourceGroup()); err != nil { log.Warn("TransferReplica failed", zap.Error(err), ) - return getErrResponse(err, method), nil + return getErrResponse(err, method, request.GetDbName(), request.GetCollectionName()), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-TransferReplica") defer sp.End() tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() t := &TransferReplicaTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -5309,7 +5396,7 @@ func (node *Proxy) TransferReplica(ctx context.Context, request *milvuspb.Transf log.Warn("TransferReplica failed to enqueue", zap.Error(err)) - return getErrResponse(err, method), nil + return getErrResponse(err, method, request.GetDbName(), request.GetCollectionName()), nil } log.Debug("TransferReplica enqueued", @@ -5321,7 +5408,7 @@ func (node *Proxy) TransferReplica(ctx context.Context, request *milvuspb.Transf zap.Error(err), zap.Uint64("BeginTS", t.BeginTs()), zap.Uint64("EndTS", t.EndTs())) - return getErrResponse(err, method), nil + return getErrResponse(err, method, request.GetDbName(), request.GetCollectionName()), nil } log.Info("TransferReplica done", @@ -5329,7 +5416,7 @@ func (node *Proxy) TransferReplica(ctx context.Context, request *milvuspb.Transf zap.Uint64("EndTS", t.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return t.result, nil } @@ -5346,7 +5433,7 @@ func (node *Proxy) ListResourceGroups(ctx context.Context, request *milvuspb.Lis method := "ListResourceGroups" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, "", "").Inc() t := &ListResourceGroupsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -5365,7 +5452,7 @@ func (node *Proxy) ListResourceGroups(ctx context.Context, request *milvuspb.Lis zap.Error(err)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.AbandonLabel).Inc() + metrics.AbandonLabel, "", "").Inc() return &milvuspb.ListResourceGroupsResponse{ Status: merr.Status(err), }, nil @@ -5381,7 +5468,7 @@ func (node *Proxy) ListResourceGroups(ctx context.Context, request *milvuspb.Lis zap.Uint64("BeginTS", t.BeginTs()), zap.Uint64("EndTS", t.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.FailLabel, "", "").Inc() return &milvuspb.ListResourceGroupsResponse{ Status: merr.Status(err), }, nil @@ -5392,7 +5479,7 @@ func (node *Proxy) ListResourceGroups(ctx context.Context, request *milvuspb.Lis zap.Uint64("EndTS", t.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, "", "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return t.result, nil } @@ -5406,7 +5493,7 @@ func (node *Proxy) DescribeResourceGroup(ctx context.Context, request *milvuspb. method := "DescribeResourceGroup" GetErrResponse := func(err error) *milvuspb.DescribeResourceGroupResponse { - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, "", "").Inc() return &milvuspb.DescribeResourceGroupResponse{ Status: merr.Status(err), @@ -5417,7 +5504,7 @@ func (node *Proxy) DescribeResourceGroup(ctx context.Context, request *milvuspb. defer sp.End() tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() + metrics.TotalLabel, "", "").Inc() t := &DescribeResourceGroupTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -5455,7 +5542,7 @@ func (node *Proxy) DescribeResourceGroup(ctx context.Context, request *milvuspb. zap.Uint64("EndTS", t.EndTs())) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel).Inc() + metrics.SuccessLabel, "", "").Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return t.result, nil } @@ -5692,12 +5779,12 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest) nodeID := fmt.Sprint(paramtable.GetNodeID()) defer func() { - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.TotalLabel, req.GetDbName(), req.GetCollectionName()).Inc() if resp.GetStatus().GetCode() != 0 { log.Warn("import failed", zap.String("err", resp.GetStatus().GetReason())) - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() } else { - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel, req.GetDbName(), req.GetCollectionName()).Inc() } }() @@ -5778,7 +5865,7 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest) resp, err = node.dataCoord.ImportV2(ctx, importRequest) if err != nil { log.Warn("import failed", zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() } metrics.ProxyReqLatency.WithLabelValues(nodeID, method).Observe(float64(tr.ElapseSpan().Milliseconds())) return resp, err @@ -5801,11 +5888,11 @@ func (node *Proxy) GetImportProgress(ctx context.Context, req *internalpb.GetImp resp, err := node.dataCoord.GetImportProgress(ctx, req) if resp.GetStatus().GetCode() != 0 || err != nil { log.Warn("get import progress failed", zap.String("reason", resp.GetStatus().GetReason()), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel, req.GetDbName(), "").Inc() } else { - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel, req.GetDbName(), "").Inc() } - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.TotalLabel, req.GetDbName(), "").Inc() metrics.ProxyReqLatency.WithLabelValues(nodeID, method).Observe(float64(tr.ElapseSpan().Milliseconds())) return resp, err } @@ -5829,7 +5916,7 @@ func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsR log.Info(rpcReceived(method)) nodeID := fmt.Sprint(paramtable.GetNodeID()) - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.TotalLabel, req.GetDbName(), req.GetCollectionName()).Inc() var ( err error @@ -5839,7 +5926,7 @@ func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsR collectionID, err = globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName()) if err != nil { resp.Status = merr.Status(err) - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() return resp, nil } } @@ -5848,9 +5935,9 @@ func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsR }) if resp.GetStatus().GetCode() != 0 || err != nil { log.Warn("list imports", zap.String("reason", resp.GetStatus().GetReason()), zap.Error(err)) - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel, req.GetDbName(), req.GetCollectionName()).Inc() } else { - metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.SuccessLabel, req.GetDbName(), req.GetCollectionName()).Inc() } metrics.ProxyReqLatency.WithLabelValues(nodeID, method).Observe(float64(tr.ElapseSpan().Milliseconds())) return resp, nil diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 902bddd0ba..fea0e8cd8a 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -127,6 +127,7 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { panic(err) } metrics.QueryNodeNumEntities.WithLabelValues( + growing.DatabaseName(), fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(growing.Collection()), fmt.Sprint(growing.Partition()), diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index a602fa0d6e..aabbfad421 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -1384,6 +1384,7 @@ func (s *LocalSegment) Release(opts ...releaseOption) { C.DeleteSegment(ptr) metrics.QueryNodeNumEntities.WithLabelValues( + s.DatabaseName(), fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.Collection()), fmt.Sprint(s.Partition()), diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go index 0f9f1e37c8..f8a340dce0 100644 --- a/internal/querynodev2/segments/segment_interface.go +++ b/internal/querynodev2/segments/segment_interface.go @@ -23,7 +23,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/segcorepb" - storage "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/util/typeutil" ) diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index bdcee1d097..928ca70d13 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -1056,6 +1056,7 @@ func (loader *segmentLoader) LoadSegment(ctx context.Context, } metrics.QueryNodeNumEntities.WithLabelValues( + segment.DatabaseName(), fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(segment.Collection()), fmt.Sprint(segment.Partition()), @@ -1293,6 +1294,14 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, return err } + metrics.QueryNodeNumEntities.WithLabelValues( + segment.DatabaseName(), + fmt.Sprint(paramtable.GetNodeID()), + fmt.Sprint(segment.Collection()), + fmt.Sprint(segment.Partition()), + segment.Type().String(), + strconv.FormatInt(int64(len(segment.Indexes())), 10), + ).Sub(float64(deltaData.RowCount)) log.Info("load delta logs done", zap.Int64("deleteCount", deltaData.RowCount)) return nil } diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 75a56a4c40..b416c2c93b 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -133,11 +133,10 @@ func (mt *MetaTable) reload() error { mt.names = newNameDb() mt.aliases = newNameDb() - collectionNum := int64(0) partitionNum := int64(0) - metrics.RootCoordNumOfCollections.Set(float64(0)) - metrics.RootCoordNumOfPartitions.WithLabelValues().Set(float64(0)) + metrics.RootCoordNumOfCollections.Reset() + metrics.RootCoordNumOfPartitions.Reset() // recover databases. dbs, err := mt.catalog.ListDatabases(mt.ctx, typeutil.MaxTimestamp) @@ -173,6 +172,7 @@ func (mt *MetaTable) reload() error { if err != nil { return err } + collectionNum := int64(0) for _, collection := range collections { mt.collID2Meta[collection.CollectionID] = collection if collection.Available() { @@ -181,9 +181,12 @@ func (mt *MetaTable) reload() error { partitionNum += int64(collection.GetPartitionNum(true)) } } - } - log.Info("recover collections from db", zap.Int64("collection_num", collectionNum), zap.Int64("partition_num", partitionNum)) + metrics.RootCoordNumOfCollections.WithLabelValues(dbName).Add(float64(collectionNum)) + log.Info("collections recovered from db", zap.String("db_name", dbName), + zap.Int64("collection_num", collectionNum), + zap.Int64("partition_num", partitionNum)) + } // recover aliases from db namespace for dbName, db := range mt.dbName2Meta { @@ -197,7 +200,6 @@ func (mt *MetaTable) reload() error { } } - metrics.RootCoordNumOfCollections.Add(float64(collectionNum)) metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(partitionNum)) log.Info("RootCoord meta table reload done", zap.Duration("duration", record.ElapseSpan())) return nil @@ -233,7 +235,7 @@ func (mt *MetaTable) reloadWithNonDatabase() error { mt.aliases.insert(util.DefaultDBName, alias.Name, alias.CollectionID) } - metrics.RootCoordNumOfCollections.Add(float64(collectionNum)) + metrics.RootCoordNumOfCollections.WithLabelValues(util.DefaultDBName).Add(float64(collectionNum)) metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(partitionNum)) return nil } @@ -415,12 +417,17 @@ func (mt *MetaTable) ChangeCollectionState(ctx context.Context, collectionID Uni } mt.collID2Meta[collectionID] = clone + db, err := mt.getDatabaseByIDInternal(ctx, coll.DBID, typeutil.MaxTimestamp) + if err != nil { + return fmt.Errorf("dbID not found for collection:%d", collectionID) + } + switch state { case pb.CollectionState_CollectionCreated: - metrics.RootCoordNumOfCollections.Inc() + metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Inc() metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(coll.GetPartitionNum(true))) default: - metrics.RootCoordNumOfCollections.Dec() + metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Dec() metrics.RootCoordNumOfPartitions.WithLabelValues().Sub(float64(coll.GetPartitionNum(true))) } diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index 69801f3f50..bcade0aafd 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -1334,6 +1334,26 @@ func TestMetaTable_ChangeCollectionState(t *testing.T) { assert.Error(t, err) }) + t.Run("not found dbID", func(t *testing.T) { + catalog := mocks.NewRootCoordCatalog(t) + catalog.On("AlterCollection", + mock.Anything, // context.Context + mock.Anything, // *model.Collection + mock.Anything, // *model.Collection + mock.Anything, // metastore.AlterType + mock.AnythingOfType("uint64"), + ).Return(nil) + meta := &MetaTable{ + catalog: catalog, + dbName2Meta: map[string]*model.Database{}, + collID2Meta: map[typeutil.UniqueID]*model.Collection{ + 100: {Name: "test", CollectionID: 100, DBID: util.DefaultDBID}, + }, + } + err := meta.ChangeCollectionState(context.TODO(), 100, pb.CollectionState_CollectionCreated, 1000) + assert.Error(t, err) + }) + t.Run("normal case", func(t *testing.T) { catalog := mocks.NewRootCoordCatalog(t) catalog.On("AlterCollection", @@ -1345,8 +1365,11 @@ func TestMetaTable_ChangeCollectionState(t *testing.T) { ).Return(nil) meta := &MetaTable{ catalog: catalog, + dbName2Meta: map[string]*model.Database{ + util.DefaultDBName: {Name: util.DefaultDBName, ID: util.DefaultDBID}, + }, collID2Meta: map[typeutil.UniqueID]*model.Collection{ - 100: {Name: "test", CollectionID: 100}, + 100: {Name: "test", CollectionID: 100, DBID: util.DefaultDBID}, }, } err := meta.ChangeCollectionState(context.TODO(), 100, pb.CollectionState_CollectionCreated, 1000) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index a6d2c5f973..9064654a3f 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -894,6 +894,7 @@ func (c *Core) DropDatabase(ctx context.Context, in *milvuspb.DropDatabaseReques metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordNumOfDatabases.Dec() + metrics.CleanupRootCoordDBMetrics(in.GetDbName()) log.Ctx(ctx).Info("done to drop database", zap.String("role", typeutil.RootCoordRole), zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs())) diff --git a/pkg/metrics/datacoord_metrics.go b/pkg/metrics/datacoord_metrics.go index 937ae8c780..380f8e31b6 100644 --- a/pkg/metrics/datacoord_metrics.go +++ b/pkg/metrics/datacoord_metrics.go @@ -82,6 +82,8 @@ var ( Help: "stored l0 segment rate", }, []string{}) + // DataCoordNumStoredRows all metrics will be cleaned up after removing matched collectionID and + // segment state labels in CleanupDataCoordNumStoredRows method. DataCoordNumStoredRows = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: milvusNamespace, @@ -89,6 +91,7 @@ var ( Name: "stored_rows_num", Help: "number of stored rows of healthy segment", }, []string{ + databaseLabelName, collectionIDLabelName, segmentStateLabelName, }) @@ -327,7 +330,7 @@ func CleanupDataCoordSegmentMetrics(collectionID int64, segmentID int64) { func CleanupDataCoordNumStoredRows(collectionID int64) { for _, state := range commonpb.SegmentState_name { - DataCoordNumStoredRows.Delete(prometheus.Labels{ + DataCoordNumStoredRows.DeletePartialMatch(prometheus.Labels{ collectionIDLabelName: fmt.Sprint(collectionID), segmentStateLabelName: fmt.Sprint(state), }) diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index 085dbdbe0b..99c2d8d414 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -69,3 +69,69 @@ func TestRegisterRuntimeInfo(t *testing.T) { assert.Equal(t, "etcd", metaType) assert.Equal(t, "pulsar", mqType) } + +// TestDeletePartialMatch test deletes all metrics where the variable labels contain all of those +// passed in as labels based on DeletePartialMatch API +func TestDeletePartialMatch(t *testing.T) { + baseVec := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "test", + Help: "helpless", + }, + []string{"l1", "l2", "l3"}, + ) + + baseVec.WithLabelValues("l1-1", "l2-1", "l3-1").Inc() + baseVec.WithLabelValues("l1-2", "l2-2", "l3-2").Inc() + baseVec.WithLabelValues("l1-2", "l2-3", "l3-3").Inc() + + baseVec.WithLabelValues("l1-3", "l2-3", "l3-3").Inc() + baseVec.WithLabelValues("l1-3", "l2-3", "").Inc() + baseVec.WithLabelValues("l1-3", "l2-4", "l3-4").Inc() + + baseVec.WithLabelValues("l1-4", "l2-5", "l3-5").Inc() + baseVec.WithLabelValues("l1-4", "l2-5", "l3-6").Inc() + baseVec.WithLabelValues("l1-5", "l2-6", "l3-6").Inc() + + getMetricsCount := func() int { + chs := make(chan prometheus.Metric, 10) + baseVec.Collect(chs) + return len(chs) + } + + // the prefix is matched which has one labels + if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l1": "l1-2"}), 2; got != want { + t.Errorf("got %v, want %v", got, want) + } + assert.Equal(t, 7, getMetricsCount()) + + // the prefix is matched which has two labels + if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l1": "l1-3", "l2": "l2-3"}), 2; got != want { + t.Errorf("got %v, want %v", got, want) + } + assert.Equal(t, 5, getMetricsCount()) + + // the first and latest labels are matched + if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l1": "l1-1", "l3": "l3-1"}), 1; got != want { + t.Errorf("got %v, want %v", got, want) + } + assert.Equal(t, 4, getMetricsCount()) + + // the middle labels are matched + if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l2": "l2-5"}), 2; got != want { + t.Errorf("got %v, want %v", got, want) + } + assert.Equal(t, 2, getMetricsCount()) + + // the middle labels and suffix labels are matched + if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l2": "l2-6", "l3": "l3-6"}), 1; got != want { + t.Errorf("got %v, want %v", got, want) + } + assert.Equal(t, 1, getMetricsCount()) + + // all labels are matched + if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l1": "l1-3", "l2": "l2-4", "l3": "l3-4"}), 1; got != want { + t.Errorf("got %v, want %v", got, want) + } + assert.Equal(t, 0, getMetricsCount()) +} diff --git a/pkg/metrics/proxy_metrics.go b/pkg/metrics/proxy_metrics.go index dd72209b66..de1f25f551 100644 --- a/pkg/metrics/proxy_metrics.go +++ b/pkg/metrics/proxy_metrics.go @@ -40,7 +40,7 @@ var ( Subsystem: typeutil.ProxyRole, Name: "search_vectors_count", Help: "counter of vectors successfully searched", - }, []string{nodeIDLabelName}) + }, []string{nodeIDLabelName, databaseLabelName, collectionName}) // ProxyInsertVectors record the number of vectors insert successfully. ProxyInsertVectors = prometheus.NewCounterVec( @@ -49,7 +49,7 @@ var ( Subsystem: typeutil.ProxyRole, Name: "insert_vectors_count", Help: "counter of vectors successfully inserted", - }, []string{nodeIDLabelName}) + }, []string{nodeIDLabelName, databaseLabelName, collectionName}) // ProxyUpsertVectors record the number of vectors upsert successfully. ProxyUpsertVectors = prometheus.NewCounterVec( @@ -58,7 +58,7 @@ var ( Subsystem: typeutil.ProxyRole, Name: "upsert_vectors_count", Help: "counter of vectors successfully upserted", - }, []string{nodeIDLabelName}) + }, []string{nodeIDLabelName, databaseLabelName, collectionName}) ProxyDeleteVectors = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -76,9 +76,10 @@ var ( Name: "sq_latency", Help: "latency of search or query successfully", Buckets: buckets, - }, []string{nodeIDLabelName, queryTypeLabelName}) + }, []string{nodeIDLabelName, queryTypeLabelName, databaseLabelName, collectionName}) // ProxyCollectionSQLatency record the latency of search successfully, per collection + // Deprecated, ProxySQLatency instead of it ProxyCollectionSQLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: milvusNamespace, @@ -96,9 +97,10 @@ var ( Name: "mutation_latency", Help: "latency of insert or delete successfully", Buckets: buckets, // unit: ms - }, []string{nodeIDLabelName, msgTypeLabelName}) + }, []string{nodeIDLabelName, msgTypeLabelName, databaseLabelName, collectionName}) - // ProxyMutationLatency record the latency that mutate successfully, per collection + // ProxyCollectionMutationLatency record the latency that mutate successfully, per collection + // Deprecated, ProxyMutationLatency instead of it ProxyCollectionMutationLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: milvusNamespace, @@ -107,6 +109,7 @@ var ( Help: "latency of insert or delete successfully, per collection", Buckets: buckets, }, []string{nodeIDLabelName, msgTypeLabelName, collectionName}) + // ProxyWaitForSearchResultLatency record the time that the proxy waits for the search result. ProxyWaitForSearchResultLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -230,7 +233,7 @@ var ( Subsystem: typeutil.ProxyRole, Name: "req_count", Help: "count of operation executed", - }, []string{nodeIDLabelName, functionLabelName, statusLabelName}) + }, []string{nodeIDLabelName, functionLabelName, statusLabelName, databaseLabelName, collectionName}) // ProxyReqLatency records the latency that for all requests, like "CreateCollection". ProxyReqLatency = prometheus.NewHistogramVec( @@ -372,7 +375,59 @@ func RegisterProxy(registry *prometheus.Registry) { registry.MustRegister(ProxySlowQueryCount) } -func CleanupCollectionMetrics(nodeID int64, collection string) { +func CleanupProxyDBMetrics(nodeID int64, dbName string) { + ProxySearchVectors.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + databaseLabelName: dbName, + }) + ProxyInsertVectors.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + databaseLabelName: dbName, + }) + ProxyUpsertVectors.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + databaseLabelName: dbName, + }) + ProxySQLatency.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + databaseLabelName: dbName, + }) + ProxyMutationLatency.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + databaseLabelName: dbName, + }) + ProxyFunctionCall.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + databaseLabelName: dbName, + }) +} + +func CleanupProxyCollectionMetrics(nodeID int64, collection string) { + ProxySearchVectors.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + collectionName: collection, + }) + ProxyInsertVectors.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + collectionName: collection, + }) + ProxyUpsertVectors.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + collectionName: collection, + }) + ProxySQLatency.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + collectionName: collection, + }) + ProxyMutationLatency.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + collectionName: collection, + }) + ProxyFunctionCall.DeletePartialMatch(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + collectionName: collection, + }) + ProxyCollectionSQLatency.Delete(prometheus.Labels{ nodeIDLabelName: strconv.FormatInt(nodeID, 10), queryTypeLabelName: SearchLabel, collectionName: collection, diff --git a/pkg/metrics/querynode_metrics.go b/pkg/metrics/querynode_metrics.go index d9a981e261..0de2b462a2 100644 --- a/pkg/metrics/querynode_metrics.go +++ b/pkg/metrics/querynode_metrics.go @@ -362,6 +362,7 @@ var ( Name: "entity_num", Help: "number of entities which can be searched/queried, clustered by collection, partition and state", }, []string{ + databaseLabelName, nodeIDLabelName, collectionIDLabelName, partitionIDLabelName, @@ -567,5 +568,11 @@ func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) { msgTypeLabelName: label, collectionIDLabelName: fmt.Sprint(collectionID), }) + QueryNodeNumEntities. + DeletePartialMatch( + prometheus.Labels{ + nodeIDLabelName: fmt.Sprint(nodeID), + collectionIDLabelName: fmt.Sprint(collectionID), + }) } } diff --git a/pkg/metrics/rootcoord_metrics.go b/pkg/metrics/rootcoord_metrics.go index 3b7fdd0868..ddd3044078 100644 --- a/pkg/metrics/rootcoord_metrics.go +++ b/pkg/metrics/rootcoord_metrics.go @@ -93,13 +93,13 @@ var ( }) // RootCoordNumOfCollections counts the number of collections. - RootCoordNumOfCollections = prometheus.NewGauge( + RootCoordNumOfCollections = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: milvusNamespace, Subsystem: typeutil.RootCoordRole, Name: "collection_num", Help: "number of collections", - }) + }, []string{databaseLabelName}) // RootCoordNumOfPartitions counts the number of partitions per collection. RootCoordNumOfPartitions = prometheus.NewGaugeVec( @@ -247,3 +247,9 @@ func RegisterRootCoord(registry *prometheus.Registry) { registry.MustRegister(RootCoordNumEntities) registry.MustRegister(RootCoordIndexedNumEntities) } + +func CleanupRootCoordDBMetrics(dbName string) { + RootCoordNumOfCollections.Delete(prometheus.Labels{ + databaseLabelName: dbName, + }) +}