mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
enhance: Add collection id to search request count metrics (#38069)
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
fdea7a7898
commit
108434969f
@ -197,10 +197,10 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque
|
|||||||
)
|
)
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.Leader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.Leader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -256,12 +256,12 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque
|
|||||||
|
|
||||||
latency := tr.ElapseSpan()
|
latency := tr.ElapseSpan()
|
||||||
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.Leader).Observe(float64(latency.Milliseconds()))
|
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.Leader).Observe(float64(latency.Milliseconds()))
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.Leader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (node *QueryNode) queryChannelStream(ctx context.Context, req *querypb.QueryRequest, channel string, srv streamrpc.QueryStreamServer) error {
|
func (node *QueryNode) queryChannelStream(ctx context.Context, req *querypb.QueryRequest, channel string, srv streamrpc.QueryStreamServer) error {
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.Leader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
msgID := req.Req.Base.GetMsgID()
|
msgID := req.Req.Base.GetMsgID()
|
||||||
log := log.Ctx(ctx).With(
|
log := log.Ctx(ctx).With(
|
||||||
zap.Int64("msgID", msgID),
|
zap.Int64("msgID", msgID),
|
||||||
@ -273,7 +273,7 @@ func (node *QueryNode) queryChannelStream(ctx context.Context, req *querypb.Quer
|
|||||||
var err error
|
var err error
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.Leader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -357,10 +357,10 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq
|
|||||||
defer node.lifetime.Done()
|
defer node.lifetime.Done()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel, metrics.Leader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.Leader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -409,7 +409,7 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq
|
|||||||
// update metric to prometheus
|
// update metric to prometheus
|
||||||
latency := tr.ElapseSpan()
|
latency := tr.ElapseSpan()
|
||||||
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.Leader).Observe(float64(latency.Milliseconds()))
|
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.Leader).Observe(float64(latency.Milliseconds()))
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel, metrics.Leader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel, metrics.Leader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
metrics.QueryNodeSearchNQ.WithLabelValues(fmt.Sprint(node.GetNodeID())).Observe(float64(req.Req.GetNq()))
|
metrics.QueryNodeSearchNQ.WithLabelValues(fmt.Sprint(node.GetNodeID())).Observe(float64(req.Req.GetNq()))
|
||||||
metrics.QueryNodeSearchTopK.WithLabelValues(fmt.Sprint(node.GetNodeID())).Observe(float64(req.Req.GetTopk()))
|
metrics.QueryNodeSearchTopK.WithLabelValues(fmt.Sprint(node.GetNodeID())).Observe(float64(req.Req.GetTopk()))
|
||||||
return resp, nil
|
return resp, nil
|
||||||
|
|||||||
@ -674,10 +674,10 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
|
|||||||
}
|
}
|
||||||
defer node.lifetime.Done()
|
defer node.lifetime.Done()
|
||||||
|
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel, metrics.FromLeader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
defer func() {
|
defer func() {
|
||||||
if !merr.Ok(resp.GetStatus()) {
|
if !merr.Ok(resp.GetStatus()) {
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.FromLeader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -725,7 +725,7 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
|
|||||||
|
|
||||||
latency := tr.ElapseSpan()
|
latency := tr.ElapseSpan()
|
||||||
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
|
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel, metrics.FromLeader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
|
|
||||||
resp = task.SearchResult()
|
resp = task.SearchResult()
|
||||||
resp.GetCostAggregation().ResponseTime = tr.ElapseSpan().Milliseconds()
|
resp.GetCostAggregation().ResponseTime = tr.ElapseSpan().Milliseconds()
|
||||||
@ -825,10 +825,10 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
|
|||||||
}
|
}
|
||||||
defer node.lifetime.Done()
|
defer node.lifetime.Done()
|
||||||
|
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.FromLeader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
defer func() {
|
defer func() {
|
||||||
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
|
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.FromLeader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -867,7 +867,7 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
|
|||||||
// TODO QueryNodeSQLatencyInQueue QueryNodeReduceLatency
|
// TODO QueryNodeSQLatencyInQueue QueryNodeReduceLatency
|
||||||
latency := tr.ElapseSpan()
|
latency := tr.ElapseSpan()
|
||||||
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
|
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.FromLeader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
result := task.Result()
|
result := task.Result()
|
||||||
result.GetCostAggregation().ResponseTime = latency.Milliseconds()
|
result.GetCostAggregation().ResponseTime = latency.Milliseconds()
|
||||||
result.GetCostAggregation().TotalNQ = node.scheduler.GetWaitingTaskTotalNQ()
|
result.GetCostAggregation().TotalNQ = node.scheduler.GetWaitingTaskTotalNQ()
|
||||||
@ -1022,10 +1022,10 @@ func (node *QueryNode) QueryStreamSegments(req *querypb.QueryRequest, srv queryp
|
|||||||
)
|
)
|
||||||
|
|
||||||
resp := &internalpb.RetrieveResults{}
|
resp := &internalpb.RetrieveResults{}
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.FromLeader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
defer func() {
|
defer func() {
|
||||||
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
|
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.FromLeader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -1056,7 +1056,7 @@ func (node *QueryNode) QueryStreamSegments(req *querypb.QueryRequest, srv queryp
|
|||||||
// TODO QueryNodeSQLatencyInQueue QueryNodeReduceLatency
|
// TODO QueryNodeSQLatencyInQueue QueryNodeReduceLatency
|
||||||
latency := tr.ElapseSpan()
|
latency := tr.ElapseSpan()
|
||||||
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
|
metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
|
||||||
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.FromLeader).Inc()
|
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.FromLeader, fmt.Sprint(req.GetReq().GetCollectionID())).Inc()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -162,6 +162,7 @@ var (
|
|||||||
queryTypeLabelName,
|
queryTypeLabelName,
|
||||||
statusLabelName,
|
statusLabelName,
|
||||||
requestScope,
|
requestScope,
|
||||||
|
collectionIDLabelName,
|
||||||
})
|
})
|
||||||
|
|
||||||
QueryNodeSQReqLatency = prometheus.NewHistogramVec(
|
QueryNodeSQReqLatency = prometheus.NewHistogramVec(
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user