diff --git a/internal/querynodev2/pipeline/filter_node.go b/internal/querynodev2/pipeline/filter_node.go index 0cd2d44628..cdf87002ff 100644 --- a/internal/querynodev2/pipeline/filter_node.go +++ b/internal/querynodev2/pipeline/filter_node.go @@ -63,11 +63,11 @@ func (fNode *filterNode) Operate(in Msg) Msg { } metrics.QueryNodeConsumerMsgCount. - WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel, fmt.Sprint(fNode.collectionID)). + WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.AllLabel, fmt.Sprint(fNode.collectionID)). Inc() metrics.QueryNodeConsumeTimeTickLag. - WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel, fmt.Sprint(fNode.collectionID)). + WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.TimetickLabel, fmt.Sprint(fNode.collectionID)). Set(float64(tsoutil.SubByNow(streamMsgPack.EndTs))) // Get collection from collection manager @@ -127,7 +127,7 @@ func (fNode *filterNode) filtrate(c *Collection, msg msgstream.TsMsg) error { case commonpb.MsgType_Delete: deleteMsg := msg.(*msgstream.DeleteMsg) - metrics.QueryNodeConsumeCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Add(float64(deleteMsg.Size())) + metrics.QueryNodeConsumeCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Add(float64(deleteMsg.Size())) for _, policy := range fNode.DeleteMsgPolicys { err := policy(fNode, c, deleteMsg) if err != nil {