From ba88cfa7a9b66580bb424226a2ffcb363f37243d Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 28 Aug 2025 09:53:51 +0800 Subject: [PATCH] enhance: Add unified GRPC latency metrics in inteceptor (#44089) Related to #43966 Signed-off-by: Congqi Xia --- internal/datacoord/compaction_util.go | 3 +- .../distributed/proxy/request_interceptor.go | 42 +++++++++++++++++-- pkg/metrics/proxy_metrics.go | 11 +++++ .../integration/datanode/struct_array_test.go | 2 +- 4 files changed, 53 insertions(+), 5 deletions(-) diff --git a/internal/datacoord/compaction_util.go b/internal/datacoord/compaction_util.go index 2d689518b8..823a3f8378 100644 --- a/internal/datacoord/compaction_util.go +++ b/internal/datacoord/compaction_util.go @@ -17,13 +17,14 @@ package datacoord import ( + "google.golang.org/protobuf/proto" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/util/hookutil" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/workerpb" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" - "google.golang.org/protobuf/proto" ) // PreAllocateBinlogIDs pre-allocates binlog IDs based on the total number of binlogs from diff --git a/internal/distributed/proxy/request_interceptor.go b/internal/distributed/proxy/request_interceptor.go index 5387ddc587..5cc01f2604 100644 --- a/internal/distributed/proxy/request_interceptor.go +++ b/internal/distributed/proxy/request_interceptor.go @@ -20,24 +20,32 @@ import ( "context" "strconv" "strings" + "time" "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/pkg/v2/metrics" + "github.com/milvus-io/milvus/pkg/v2/util/conc" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/requestutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) -var retryableCode typeutil.Set[int32] +var ( + retryableCode typeutil.Set[int32] + fullMethodName2Tag *typeutil.ConcurrentMap[string, string] + sf conc.Singleflight[string] +) func init() { retryableCode = typeutil.NewSet( merr.Code(merr.ErrServiceRateLimit), merr.Code(merr.ErrCollectionSchemaMismatch), ) + + fullMethodName2Tag = typeutil.NewConcurrentMap[string, string]() } // UnaryRequestStatsInterceptor implements `grpc.UnaryServerInterceptor` @@ -46,7 +54,7 @@ func init() { // when some retirable error occurs, it will record it as `RetryLabel` instead of failure one // when other interceptor rejects the request, it will record it as `RejectedLabel` func UnaryRequestStatsInterceptor(ctx context.Context, req any, rpcInfo *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - methodTag := ParseShortMethodName(rpcInfo.FullMethod) + methodTag := FullMethodName2Tag(rpcInfo.FullMethod) db, _ := requestutil.GetDbNameFromRequest(req) collection, _ := requestutil.GetCollectionNameFromRequest(req) @@ -61,19 +69,47 @@ func UnaryRequestStatsInterceptor(ctx context.Context, req any, rpcInfo *grpc.Un collectionName, ).Inc() + start := time.Now() resp, err := handler(ctx, req) + label := ParseMetricLabel(resp, err) + // set metrics for state code metrics.ProxyFunctionCall.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), methodTag, - ParseMetricLabel(resp, err), + label, dbName, collectionName, ).Inc() + // set metrics for latency + metrics.ProxyGRPCLatency.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10), + methodTag, + label, + ).Observe(float64(time.Since(start).Milliseconds())) + return resp, err } +// FullMethodName2Tag returns method tag for grpc full method name +// it utilizes `fullMethodName2Tag` as cache result +// if cache miss, it will call `ParseShortMethodName` to parse method tag +// SingleFlight `sf` will make sure there is only one call. +func FullMethodName2Tag(fullMethodName string) string { + tag, ok := fullMethodName2Tag.Get(fullMethodName) + if ok { + return tag + } + + tag, _, _ = sf.Do(fullMethodName, func() (string, error) { + tag = ParseShortMethodName(fullMethodName) + fullMethodName2Tag.Insert(fullMethodName, tag) + return tag, nil + }) + return tag +} + // ParseShortMethodName parse short method name from full method name // input like: "/milvus.proto.milvus.MilvusService/Search" // returns "Search" diff --git a/pkg/metrics/proxy_metrics.go b/pkg/metrics/proxy_metrics.go index 2048e4901f..09da0fdcb0 100644 --- a/pkg/metrics/proxy_metrics.go +++ b/pkg/metrics/proxy_metrics.go @@ -235,6 +235,16 @@ var ( Help: "count of operation executed", }, []string{nodeIDLabelName, functionLabelName, statusLabelName, databaseLabelName, collectionName}) + // ProxyReqLatency records the latency for each grpc request. + ProxyGRPCLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.ProxyRole, + Name: "grpc_latency", + Help: "latency of each grpc request", + Buckets: buckets, // unit: ms + }, []string{nodeIDLabelName, functionLabelName, statusLabelName}) + // ProxyReqLatency records the latency that for all requests, like "CreateCollection". ProxyReqLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -488,6 +498,7 @@ func RegisterProxy(registry *prometheus.Registry) { registry.MustRegister(ProxyApplyTimestampLatency) registry.MustRegister(ProxyFunctionCall) + registry.MustRegister(ProxyGRPCLatency) registry.MustRegister(ProxyReqLatency) registry.MustRegister(ProxyReceiveBytes) diff --git a/tests/integration/datanode/struct_array_test.go b/tests/integration/datanode/struct_array_test.go index 25323d61c6..0df492656c 100644 --- a/tests/integration/datanode/struct_array_test.go +++ b/tests/integration/datanode/struct_array_test.go @@ -22,6 +22,7 @@ import ( "strconv" "testing" + "github.com/stretchr/testify/suite" "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -32,7 +33,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/metric" "github.com/milvus-io/milvus/tests/integration" - "github.com/stretchr/testify/suite" ) type ArrayStructDataNodeSuite struct {