mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
enhance: Add unified GRPC latency metrics in interceptor (#44089)
Related to #43966 Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
d70607c342
commit
ba88cfa7a9
@ -17,13 +17,14 @@
|
||||
package datacoord
|
||||
|
||||
import (
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/datacoord/allocator"
|
||||
"github.com/milvus-io/milvus/internal/util/hookutil"
|
||||
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
// PreAllocateBinlogIDs pre-allocates binlog IDs based on the total number of binlogs from
|
||||
|
||||
@ -20,24 +20,32 @@ import (
|
||||
"context"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/pkg/v2/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/requestutil"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
||||
)
|
||||
|
||||
var retryableCode typeutil.Set[int32]
|
||||
// Package-level state shared by the request-stats interceptor helpers.
var (
|
||||
// retryableCode is a set of int32 error codes; it is populated in init().
retryableCode typeutil.Set[int32]
|
||||
// fullMethodName2Tag caches the method tag computed for each grpc full method name.
fullMethodName2Tag *typeutil.ConcurrentMap[string, string]
|
||||
// sf is the singleflight group used by FullMethodName2Tag when the cache misses.
sf conc.Singleflight[string]
|
||||
)
|
||||
|
||||
// init fills retryableCode with the codes of merr.ErrServiceRateLimit and
// merr.ErrCollectionSchemaMismatch, and creates the empty fullMethodName2Tag cache.
func init() {
|
||||
retryableCode = typeutil.NewSet(
|
||||
merr.Code(merr.ErrServiceRateLimit),
|
||||
merr.Code(merr.ErrCollectionSchemaMismatch),
|
||||
)
|
||||
|
||||
// the cache starts empty; entries are inserted by FullMethodName2Tag on cache miss
fullMethodName2Tag = typeutil.NewConcurrentMap[string, string]()
|
||||
}
|
||||
|
||||
// UnaryRequestStatsInterceptor implements `grpc.UnaryServerInterceptor`
|
||||
@ -46,7 +54,7 @@ func init() {
|
||||
// when some retryable error occurs, it will record it as `RetryLabel` instead of failure one
|
||||
// when other interceptor rejects the request, it will record it as `RejectedLabel`
|
||||
func UnaryRequestStatsInterceptor(ctx context.Context, req any, rpcInfo *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||
methodTag := ParseShortMethodName(rpcInfo.FullMethod)
|
||||
methodTag := FullMethodName2Tag(rpcInfo.FullMethod)
|
||||
db, _ := requestutil.GetDbNameFromRequest(req)
|
||||
collection, _ := requestutil.GetCollectionNameFromRequest(req)
|
||||
|
||||
@ -61,19 +69,47 @@ func UnaryRequestStatsInterceptor(ctx context.Context, req any, rpcInfo *grpc.Un
|
||||
collectionName,
|
||||
).Inc()
|
||||
|
||||
start := time.Now()
|
||||
resp, err := handler(ctx, req)
|
||||
label := ParseMetricLabel(resp, err)
|
||||
|
||||
// set metrics for state code
|
||||
metrics.ProxyFunctionCall.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
methodTag,
|
||||
ParseMetricLabel(resp, err),
|
||||
label,
|
||||
dbName,
|
||||
collectionName,
|
||||
).Inc()
|
||||
|
||||
// set metrics for latency
|
||||
metrics.ProxyGRPCLatency.WithLabelValues(
|
||||
strconv.FormatInt(paramtable.GetNodeID(), 10),
|
||||
methodTag,
|
||||
label,
|
||||
).Observe(float64(time.Since(start).Milliseconds()))
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// FullMethodName2Tag returns the method tag for a grpc full method name.
|
||||
// It uses the `fullMethodName2Tag` map as a cache of previously computed tags.
|
||||
// On a cache miss, it calls `ParseShortMethodName` to parse the method tag and stores the result.
|
||||
// The singleflight `sf`, keyed by the full method name, makes sure there is only one such call.
|
||||
func FullMethodName2Tag(fullMethodName string) string {
|
||||
// fast path: return the cached tag when this full method name has been seen before
tag, ok := fullMethodName2Tag.Get(fullMethodName)
|
||||
if ok {
|
||||
return tag
|
||||
}
|
||||
|
||||
// cache miss: parse the tag and insert it into the cache inside the singleflight callback;
// the other two results of sf.Do are discarded, and the callback always returns a nil error
tag, _, _ = sf.Do(fullMethodName, func() (string, error) {
|
||||
tag = ParseShortMethodName(fullMethodName)
|
||||
fullMethodName2Tag.Insert(fullMethodName, tag)
|
||||
return tag, nil
|
||||
})
|
||||
return tag
|
||||
}
|
||||
|
||||
// ParseShortMethodName parses the short method name from a full method name
|
||||
// input like: "/milvus.proto.milvus.MilvusService/Search"
|
||||
// returns "Search"
|
||||
|
||||
@ -235,6 +235,16 @@ var (
|
||||
Help: "count of operation executed",
|
||||
}, []string{nodeIDLabelName, functionLabelName, statusLabelName, databaseLabelName, collectionName})
|
||||
|
||||
// ProxyGRPCLatency records the latency for each grpc request.
|
||||
ProxyGRPCLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "grpc_latency",
|
||||
Help: "latency of each grpc request",
|
||||
Buckets: buckets, // unit: ms
|
||||
}, []string{nodeIDLabelName, functionLabelName, statusLabelName})
|
||||
|
||||
// ProxyReqLatency records the latency for all requests, like "CreateCollection".
|
||||
ProxyReqLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
@ -488,6 +498,7 @@ func RegisterProxy(registry *prometheus.Registry) {
|
||||
registry.MustRegister(ProxyApplyTimestampLatency)
|
||||
|
||||
registry.MustRegister(ProxyFunctionCall)
|
||||
registry.MustRegister(ProxyGRPCLatency)
|
||||
registry.MustRegister(ProxyReqLatency)
|
||||
|
||||
registry.MustRegister(ProxyReceiveBytes)
|
||||
|
||||
@ -22,6 +22,7 @@ import (
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
@ -32,7 +33,6 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/metric"
|
||||
"github.com/milvus-io/milvus/tests/integration"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
type ArrayStructDataNodeSuite struct {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user