diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index 08072c1dd9..93e204b2c3 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -18,7 +18,6 @@ package metrics
 
 import (
 	"net/http"
-	// nolint:gosec
 	_ "net/http/pprof"
 
@@ -587,11 +586,6 @@ func RegisterProxy() {
 	prometheus.MustRegister(ProxyDmlChannelTimeTick)
 }
 
-//RegisterQueryCoord registers QueryCoord metrics
-func RegisterQueryCoord() {
-
-}
-
 //RegisterQueryNode registers QueryNode metrics
 func RegisterQueryNode() {
diff --git a/internal/metrics/querycoord.go b/internal/metrics/querycoord.go
new file mode 100644
index 0000000000..7ac92e98ec
--- /dev/null
+++ b/internal/metrics/querycoord.go
@@ -0,0 +1,143 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/milvus-io/milvus/internal/util/typeutil"
+)
+
+const (
+	// TODO: move to metrics.go
+	queryCoordStatusLabel        = "status"
+	QueryCoordMetricLabelSuccess = "success"
+	QueryCoordMetricLabelFail    = "fail"
+	QueryCoordMetricLabelTotal   = "total"
+
+	// TODO: move to metrics.go
+	collectionIDLabel = "collection_id"
+)
+
+// queryCoordLoadBuckets involves durations in milliseconds,
+// [10 20 40 80 160 320 640 1280 2560 5120 10240 20480 40960 81920 163840 327680 655360 1.31072e+06]
+var queryCoordLoadBuckets = prometheus.ExponentialBuckets(10, 2, 18)
+
+var (
+	QueryCoordNumCollections = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "num_collections",
+			Help:      "Number of collections in QueryCoord.",
+		}, []string{})
+
+	QueryCoordNumEntities = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "num_entities",
+			Help:      "Number of entities in collection.",
+		}, []string{
+			collectionIDLabel,
+		})
+
+	QueryCoordLoadCount = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "load_count",
+			Help:      "Load request statistic in QueryCoord.",
+		}, []string{
+			queryCoordStatusLabel,
+		})
+
+	QueryCoordReleaseCount = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "release_count",
+			Help:      "Release request statistic in QueryCoord.",
+		}, []string{
+			queryCoordStatusLabel,
+		})
+
+	QueryCoordLoadLatency = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "load_latency",
+			Help:      "Load request latency in QueryCoord",
+			Buckets:   queryCoordLoadBuckets,
+		}, []string{})
+
+	QueryCoordReleaseLatency = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "release_latency",
+			Help:      "Release request latency in QueryCoord",
+			Buckets:   []float64{0, 5, 10, 20, 40, 100, 200, 400, 1000, 10000},
+		}, []string{})
+
+	QueryCoordNumChildTasks = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "num_child_tasks",
+			Help:      "Number of child tasks in QueryCoord.",
+		}, []string{})
+
+	QueryCoordNumParentTasks = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "num_parent_tasks",
+			Help:      "Number of parent tasks in QueryCoord.",
+		}, []string{})
+
+	QueryCoordChildTaskLatency = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "child_task_latency",
+			Help:      "Child tasks latency in QueryCoord.",
+			Buckets:   queryCoordLoadBuckets,
+		}, []string{})
+
+	QueryCoordNumQueryNodes = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "num_querynodes",
+			Help:      "Number of QueryNodes in QueryCoord.",
+		}, []string{})
+)
+
+//RegisterQueryCoord registers QueryCoord metrics
+func RegisterQueryCoord() {
+	prometheus.MustRegister(QueryCoordNumCollections)
+	prometheus.MustRegister(QueryCoordNumEntities)
+	prometheus.MustRegister(QueryCoordLoadCount)
+	prometheus.MustRegister(QueryCoordReleaseCount)
+	prometheus.MustRegister(QueryCoordLoadLatency)
+	prometheus.MustRegister(QueryCoordReleaseLatency)
+	prometheus.MustRegister(QueryCoordNumChildTasks)
+	prometheus.MustRegister(QueryCoordNumParentTasks)
+	prometheus.MustRegister(QueryCoordChildTaskLatency)
+	prometheus.MustRegister(QueryCoordNumQueryNodes)
+}
diff --git a/internal/querycoord/cluster.go b/internal/querycoord/cluster.go
index 7f2d434111..44e4c214d6 100644
--- a/internal/querycoord/cluster.go
+++ b/internal/querycoord/cluster.go
@@ -29,6 +29,7 @@ import (
 
 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
 	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/metrics"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/milvuspb"
@@ -573,6 +574,7 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti
 		go node.start()
 	}
 	c.nodes[id] = node
+	metrics.QueryCoordNumQueryNodes.WithLabelValues().Inc()
 	log.Debug("registerNode: create a new QueryNode", zap.Int64("nodeID", id), zap.String("address", session.Address), zap.Any("state", state))
 	return nil
 }
@@ -605,6 +607,7 @@ func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
 	}
 
 	delete(c.nodes, nodeID)
+	metrics.QueryCoordNumQueryNodes.WithLabelValues().Dec()
 	log.Debug("removeNodeInfo: delete nodeInfo in cluster MetaReplica", zap.Int64("nodeID", nodeID))
 
 	return nil
diff --git a/internal/querycoord/impl.go b/internal/querycoord/impl.go
index dcfd55d827..64b58b724d 100644
--- a/internal/querycoord/impl.go
+++ b/internal/querycoord/impl.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/milvus-io/milvus/internal/common"
 	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/metrics"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/milvuspb"
@@ -158,6 +159,8 @@ func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowColl
 
 // LoadCollection loads all the sealed segments of this collection to queryNodes, and assigns watchDmChannelRequest to queryNodes
 func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
+	metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelTotal).Inc()
+
 	collectionID := req.CollectionID
 	//schema := req.Schema
 	log.Debug("loadCollectionRequest received",
@@ -173,6 +176,8 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
 		err := errors.New("QueryCoord is not healthy")
 		status.Reason = err.Error()
 		log.Error("load collection failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
+
+		metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 		return status, nil
 	}
 
@@ -183,6 +188,8 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
 				zap.String("role", typeutil.QueryCoordRole),
 				zap.Int64("collectionID", collectionID),
 				zap.Int64("msgID", req.Base.MsgID))
+
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
 			return status, nil
 		}
 		// if some partitions of the collection have been loaded by load partitions request, return error
@@ -198,6 +205,8 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
 				zap.Int64s("loaded partitionIDs", collectionInfo.PartitionIDs),
 				zap.Int64("msgID", req.Base.MsgID),
 				zap.Error(err))
+
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 			return status, nil
 		}
 	}
@@ -219,6 +228,8 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
 			zap.Error(err))
 		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		status.Reason = err.Error()
+
+		metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 		return status, nil
 	}
 
@@ -231,6 +242,8 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
 			zap.Error(err))
 		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		status.Reason = err.Error()
+
+		metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 		return status, nil
 	}
 
@@ -244,6 +257,7 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
 
 // ReleaseCollection clears all data related to this collecion on the querynode
 func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
+	metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelTotal).Inc()
 	//dbID := req.DbID
 	collectionID := req.CollectionID
 	log.Debug("releaseCollectionRequest received",
@@ -259,6 +273,8 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
 		err := errors.New("QueryCoord is not healthy")
 		status.Reason = err.Error()
 		log.Error("release collection failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
+
+		metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 		return status, nil
 	}
 
@@ -269,6 +285,8 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
 			zap.String("role", typeutil.QueryCoordRole),
 			zap.Int64("collectionID", collectionID),
 			zap.Int64("msgID", req.Base.MsgID))
+
+		metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
 		return status, nil
 	}
 
@@ -289,6 +307,8 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
 			zap.Error(err))
 		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		status.Reason = err.Error()
+
+		metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 		return status, nil
 	}
 
@@ -301,6 +321,8 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
 			zap.Error(err))
 		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		status.Reason = err.Error()
+
+		metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 		return status, nil
 	}
 
@@ -310,6 +332,9 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
 		zap.Int64("msgID", req.Base.MsgID))
 	//qc.MetaReplica.printMeta()
 	//qc.cluster.printMeta()
+
+	metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
+	metrics.QueryCoordReleaseLatency.WithLabelValues().Observe(float64(releaseCollectionTask.elapseSpan().Milliseconds()))
 	return status, nil
 }
 
@@ -404,6 +429,7 @@ func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowParti
 
 // LoadPartitions loads all the sealed segments of this partition to queryNodes, and assigns watchDmChannelRequest to queryNodes
 func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
+	metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelTotal).Inc()
 	collectionID := req.CollectionID
 	partitionIDs := req.PartitionIDs
 
@@ -421,6 +447,8 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
 		err := errors.New("QueryCoord is not healthy")
 		status.Reason = err.Error()
 		log.Error("load partition failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
+
+		metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 		return status, nil
 	}
 
@@ -436,6 +464,7 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
 			zap.Int64("msgID", req.Base.MsgID),
 			zap.Error(err))
 
+		metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 		return status, nil
 	}
 
@@ -475,6 +504,8 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
 				zap.Int64s("partitionIDs", partitionIDs),
 				zap.Int64("msgID", req.Base.MsgID),
 				zap.Error(err))
+
+			metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 			return status, nil
 		}
 
@@ -483,6 +514,8 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
 			zap.Int64("collectionID", req.CollectionID),
 			zap.Int64s("partitionIDs", partitionIDs),
 			zap.Int64("msgID", req.Base.MsgID))
+
+		metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
 		return status, nil
 	}
 
@@ -504,6 +537,8 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
 			zap.Error(err))
 		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		status.Reason = err.Error()
+
+		metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 		return status, nil
 	}
 
@@ -517,6 +552,8 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
 			zap.Int64s("partitionIDs", partitionIDs),
 			zap.Int64("msgID", req.Base.MsgID),
 			zap.Error(err))
+
+		metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 		return status, nil
 	}
 
@@ -531,6 +568,8 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
 
 // ReleasePartitions clears all data related to this partition on the querynode
 func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
+	metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelTotal).Inc()
+
 	//dbID := req.DbID
 	collectionID := req.CollectionID
 	partitionIDs := req.PartitionIDs
@@ -548,6 +587,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
 		err := errors.New("QueryCoord is not healthy")
 		status.Reason = err.Error()
 		log.Error("release partition failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
+
+		metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 		return status, nil
 	}
 
@@ -561,6 +602,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
 			zap.Int64s("partitionIDs", partitionIDs),
 			zap.Int64("msgID", req.Base.MsgID),
 			zap.Error(err))
+		metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 		return status, nil
 	}
 
@@ -579,6 +621,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
 				zap.Int64s("partitionIDs", partitionIDs),
 				zap.Int64("msgID", req.Base.MsgID),
 				zap.Error(err))
+
+			metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 			return status, nil
 		}
 
@@ -599,6 +643,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
 				zap.String("role", typeutil.QueryCoordRole),
 				zap.Int64("collectionID", req.CollectionID),
 				zap.Int64("msgID", req.Base.MsgID))
+
+			metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
 			return status, nil
 		}
 
@@ -608,6 +654,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
 			zap.Int64("collectionID", req.CollectionID),
 			zap.Int64s("partitionIDs", partitionIDs),
 			zap.Int64("msgID", req.Base.MsgID))
+
+		metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
 		return status, nil
 	}
 
@@ -650,6 +698,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
 			zap.Error(err))
 		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		status.Reason = err.Error()
+
+		metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 		return status, nil
 	}
 
@@ -663,6 +713,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
 			zap.Error(err))
 		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		status.Reason = err.Error()
+
+		metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
 		return status, nil
 	}
 
@@ -674,6 +726,9 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
 
 	//qc.MetaReplica.printMeta()
 	//qc.cluster.printMeta()
+
+	metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
+	metrics.QueryCoordReleaseLatency.WithLabelValues().Observe(float64(releaseTask.elapseSpan().Milliseconds()))
 	return status, nil
 }
 
diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go
index e800260e6d..a64b87b211 100644
--- a/internal/querycoord/meta.go
+++ b/internal/querycoord/meta.go
@@ -30,6 +30,7 @@ import (
 
 	"github.com/milvus-io/milvus/internal/kv"
 	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -165,6 +166,7 @@ func (m *MetaReplica) reloadFromKV() error { } m.collectionInfos[collectionID] = collectionInfo } + metrics.QueryCoordNumCollections.WithLabelValues().Set(float64(len(m.collectionInfos))) if err := m.segmentsInfo.loadSegments(); err != nil { return err @@ -299,6 +301,7 @@ func (m *MetaReplica) addCollection(collectionID UniqueID, loadType querypb.Load } m.collectionMu.Lock() m.collectionInfos[collectionID] = newCollection + metrics.QueryCoordNumCollections.WithLabelValues().Set(float64(len(m.collectionInfos))) m.collectionMu.Unlock() } @@ -362,6 +365,7 @@ func (m *MetaReplica) releaseCollection(collectionID UniqueID) error { m.collectionMu.Lock() delete(m.collectionInfos, collectionID) + metrics.QueryCoordNumCollections.WithLabelValues().Set(float64(len(m.collectionInfos))) m.collectionMu.Unlock() m.deltaChannelMu.Lock() diff --git a/internal/querycoord/segments_info.go b/internal/querycoord/segments_info.go index 772617a40c..ead6702b7c 100644 --- a/internal/querycoord/segments_info.go +++ b/internal/querycoord/segments_info.go @@ -22,6 +22,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/kv" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util" ) @@ -57,6 +58,7 @@ func (s *segmentsInfo) loadSegments() error { return } s.segmentIDMap[segment.GetSegmentID()] = segment + metrics.QueryCoordNumEntities.WithLabelValues(fmt.Sprint(segment.CollectionID)).Add(float64(segment.NumRows)) } }) return err @@ -75,6 +77,7 @@ func (s *segmentsInfo) saveSegment(segment *querypb.SegmentInfo) error { return err } s.segmentIDMap[segment.GetSegmentID()] = segment + metrics.QueryCoordNumEntities.WithLabelValues(fmt.Sprint(segment.CollectionID)).Add(float64(segment.NumRows)) return nil } @@ -86,6 +89,7 @@ func (s *segmentsInfo) removeSegment(segment *querypb.SegmentInfo) error { return err } delete(s.segmentIDMap, segment.GetSegmentID()) + metrics.QueryCoordNumEntities.WithLabelValues(fmt.Sprint(segment.CollectionID)).Sub(float64(segment.NumRows)) return nil } diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index abe9f08db3..96dcac85cc 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -27,10 +27,12 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/rootcoord" + "github.com/milvus-io/milvus/internal/util/timerecord" ) const timeoutForRPC = 10 * time.Second @@ -91,6 +93,7 @@ type task interface { setResultInfo(err error) getResultInfo() *commonpb.Status updateTaskProcess() + elapseSpan() time.Duration } type baseTask struct { @@ -111,6 +114,8 @@ type baseTask struct { parentTask task childTasks []task childTasksMu sync.RWMutex + + timeRecorder *timerecord.TimeRecorder } func newBaseTask(ctx context.Context, triggerType querypb.TriggerCondition) *baseTask { @@ -125,6 +130,7 @@ func newBaseTask(ctx context.Context, triggerType querypb.TriggerCondition) *bas retryCount: MaxRetryNum, triggerCondition: triggerType, childTasks: []task{}, + timeRecorder: timerecord.NewTimeRecorder("QueryCoordBaseTask"), } 
return baseTask @@ -197,6 +203,7 @@ func (bt *baseTask) removeChildTaskByID(taskID UniqueID) { } } bt.childTasks = result + metrics.QueryCoordNumChildTasks.WithLabelValues().Dec() } func (bt *baseTask) clearChildTasks() { @@ -272,12 +279,17 @@ func (bt *baseTask) rollBack(ctx context.Context) []task { return nil } +func (bt *baseTask) elapseSpan() time.Duration { + return bt.timeRecorder.ElapseSpan() +} + type loadCollectionTask struct { *baseTask *querypb.LoadCollectionRequest broker *globalMetaBroker cluster Cluster meta Meta + once sync.Once } func (lct *loadCollectionTask) msgBase() *commonpb.MsgBase { @@ -331,6 +343,11 @@ func (lct *loadCollectionTask) updateTaskProcess() { log.Error("loadCollectionTask: set load percentage to meta's collectionInfo", zap.Int64("collectionID", collectionID)) lct.setResultInfo(err) } + lct.once.Do(func() { + metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc() + metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(lct.elapseSpan().Milliseconds())) + metrics.QueryCoordNumChildTasks.WithLabelValues().Sub(float64(len(lct.getChildTask()))) + }) } } @@ -430,6 +447,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { lct.addChildTask(internalTask) log.Debug("loadCollectionTask: add a childTask", zap.Int64("collectionID", collectionID), zap.Int32("task type", int32(internalTask.msgType())), zap.Int64("msgID", lct.Base.MsgID)) } + metrics.QueryCoordNumChildTasks.WithLabelValues().Add(float64(len(internalTasks))) log.Debug("loadCollectionTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lct.Base.MsgID)) err = lct.meta.addCollection(collectionID, querypb.LoadType_loadCollection, lct.Schema) @@ -624,6 +642,7 @@ type loadPartitionTask struct { cluster Cluster meta Meta addCol bool + once sync.Once } func (lpt *loadPartitionTask) msgBase() *commonpb.MsgBase { @@ -678,6 +697,11 @@ func (lpt *loadPartitionTask) updateTaskProcess() { lpt.setResultInfo(err) } } + lpt.once.Do(func() { + metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc() + metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(lpt.elapseSpan().Milliseconds())) + metrics.QueryCoordNumChildTasks.WithLabelValues().Sub(float64(len(lpt.getChildTask()))) + }) } } @@ -765,6 +789,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error { lpt.addChildTask(internalTask) log.Debug("loadPartitionTask: add a childTask", zap.Int64("collectionID", collectionID), zap.Int32("task type", int32(internalTask.msgType()))) } + metrics.QueryCoordNumChildTasks.WithLabelValues().Add(float64(len(internalTasks))) log.Debug("loadPartitionTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Int64("msgID", lpt.Base.MsgID)) err = lpt.meta.addCollection(collectionID, querypb.LoadType_LoadPartition, lpt.Schema) diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go index 44c8f61df0..a47dc9a835 100644 --- a/internal/querycoord/task_scheduler.go +++ b/internal/querycoord/task_scheduler.go @@ -31,6 +31,7 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/retry" @@ -70,6 +71,7 @@ func (queue *taskQueue) addTask(t task) { if 
queue.tasks.Len() == 0 { queue.taskChan <- 1 queue.tasks.PushBack(t) + metrics.QueryCoordNumParentTasks.WithLabelValues().Inc() return } @@ -87,6 +89,8 @@ func (queue *taskQueue) addTask(t task) { queue.tasks.InsertAfter(t, e) break } + + metrics.QueryCoordNumParentTasks.WithLabelValues().Inc() } func (queue *taskQueue) addTaskToFront(t task) { @@ -96,6 +100,8 @@ func (queue *taskQueue) addTaskToFront(t task) { } else { queue.tasks.PushFront(t) } + + metrics.QueryCoordNumParentTasks.WithLabelValues().Inc() } // PopTask pops a trigger task from task list @@ -111,6 +117,8 @@ func (queue *taskQueue) popTask() task { ft := queue.tasks.Front() queue.tasks.Remove(ft) + metrics.QueryCoordNumParentTasks.WithLabelValues().Dec() + return ft.Value.(task) } @@ -823,6 +831,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task, log.Debug("waitActivateTaskDone: one activate task done", zap.Int64("taskID", t.getTaskID()), zap.Int64("triggerTaskID", triggerTask.getTaskID())) + metrics.QueryCoordChildTaskLatency.WithLabelValues().Observe(float64(t.elapseSpan().Milliseconds())) } }
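
Reviewer note, not part of the change: every handler touched in `impl.go` follows the same instrumentation pattern with the new counters and histograms, bumping the `total` label on entry, `fail` on each early-return error path, `success` on completion, and observing the elapsed wall-clock time in milliseconds in the latency histogram. The sketch below reproduces that pattern with plain `prometheus` client types so it compiles on its own; the helper `recordLoad`, the standalone `main`, and the hard-coded namespace strings are invented for illustration, and the bookkeeping is simplified compared with the PR, where the load success count and `QueryCoordLoadLatency` are recorded once per trigger task in `updateTaskProcess` (see `task.go`) rather than on the synchronous return path.

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	// Mirrors QueryCoordLoadCount: one counter series per request outcome.
	loadCount = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "milvus",
			Subsystem: "querycoord",
			Name:      "load_count",
			Help:      "Load request statistic in QueryCoord.",
		}, []string{"status"})

	// Mirrors QueryCoordLoadLatency: exponential buckets starting at 10 ms and
	// doubling 18 times, the same shape as queryCoordLoadBuckets above.
	loadLatency = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "milvus",
			Subsystem: "querycoord",
			Name:      "load_latency",
			Help:      "Load request latency in QueryCoord",
			Buckets:   prometheus.ExponentialBuckets(10, 2, 18),
		}, []string{})
)

// recordLoad wraps a load operation the way LoadCollection/LoadPartitions are
// instrumented: "total" on entry, "fail" on error, "success" plus a latency
// observation (milliseconds) on completion. recordLoad is a hypothetical
// helper, not a function from the PR.
func recordLoad(load func() error) error {
	loadCount.WithLabelValues("total").Inc()
	start := time.Now()
	if err := load(); err != nil {
		loadCount.WithLabelValues("fail").Inc()
		return err
	}
	loadCount.WithLabelValues("success").Inc()
	loadLatency.WithLabelValues().Observe(float64(time.Since(start).Milliseconds()))
	return nil
}

func main() {
	prometheus.MustRegister(loadCount, loadLatency)

	_ = recordLoad(func() error { time.Sleep(25 * time.Millisecond); return nil }) // success path
	_ = recordLoad(func() error { return errors.New("collection not loaded") })    // failure path

	fmt.Println("load metrics recorded")
}
```

The gauges follow an analogous pattern in the diff: `QueryCoordNumQueryNodes`, `QueryCoordNumParentTasks`, and `QueryCoordNumChildTasks` are incremented and decremented where nodes and tasks enter or leave the cluster and the scheduler queues, while `QueryCoordNumCollections` and `QueryCoordNumEntities` are set or adjusted whenever the meta replica and the segment map change.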