diff --git a/internal/proxy/msg_pack.go b/internal/proxy/msg_pack.go index 6f801c616f..543127095a 100644 --- a/internal/proxy/msg_pack.go +++ b/internal/proxy/msg_pack.go @@ -18,6 +18,8 @@ package proxy import ( "context" + "strconv" + "time" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -28,8 +30,10 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -119,7 +123,9 @@ func repackInsertDataByPartition(ctx context.Context, if err != nil { return nil, err } + beforeAssign := time.Now() assignedSegmentInfos, err := segIDAssigner.GetSegmentID(insertMsg.CollectionID, partitionID, channelName, uint32(len(rowOffsets)), maxTs) + metrics.ProxyAssignSegmentIDLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(time.Since(beforeAssign).Milliseconds())) if err != nil { log.Error("allocate segmentID for insert data failed", zap.String("collectionName", insertMsg.CollectionName), diff --git a/internal/proxy/segment.go b/internal/proxy/segment.go index d118d44837..cc2458f6c9 100644 --- a/internal/proxy/segment.go +++ b/internal/proxy/segment.go @@ -20,6 +20,7 @@ import ( "container/list" "context" "fmt" + "strconv" "time" "github.com/cockroachdb/errors" @@ -29,6 +30,8 @@ import ( "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -309,10 +312,12 @@ func (sa *segIDAssigner) syncSegments() (bool, error) { PeerRole: typeutil.ProxyRole, SegmentIDRequests: sa.segReqs, } - + metrics.ProxySyncSegmentRequestLength.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(len(sa.segReqs))) sa.segReqs = nil log.Debug("syncSegments call dataCoord.AssignSegmentID", zap.String("request", req.String())) + resp, err := sa.dataCoord.AssignSegmentID(context.Background(), req) if err != nil { diff --git a/pkg/metrics/proxy_metrics.go b/pkg/metrics/proxy_metrics.go index 6222a77298..b7a204fb69 100644 --- a/pkg/metrics/proxy_metrics.go +++ b/pkg/metrics/proxy_metrics.go @@ -138,6 +138,26 @@ var ( Buckets: buckets, // unit: ms }, []string{nodeIDLabelName, msgTypeLabelName}) + // ProxyAssignSegmentIDLatency record the latency that Proxy get segmentID from dataCoord. + ProxyAssignSegmentIDLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.ProxyRole, + Name: "assign_segmentID_latency", + Help: "latency that proxy get segmentID from dataCoord", + Buckets: buckets, // unit: ms + }, []string{nodeIDLabelName}) + + // ProxySyncSegmentRequestLength the length of SegmentIDRequests when assigning segments for insert. + ProxySyncSegmentRequestLength = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.ProxyRole, + Name: "sync_segment_request_length", + Help: "the length of SegmentIDRequests when assigning segments for insert", + Buckets: buckets, + }, []string{nodeIDLabelName}) + // ProxyCacheStatsCounter record the number of Proxy cache hits or miss. ProxyCacheStatsCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -289,6 +309,9 @@ func RegisterProxy(registry *prometheus.Registry) { registry.MustRegister(ProxySendMutationReqLatency) + registry.MustRegister(ProxyAssignSegmentIDLatency) + registry.MustRegister(ProxySyncSegmentRequestLength) + registry.MustRegister(ProxyCacheStatsCounter) registry.MustRegister(ProxyUpdateCacheLatency)