From b435c422c141fc8506f384a313a674f6033409ca Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Mon, 21 Feb 2022 17:15:51 +0800 Subject: [PATCH] Add prometheus metrics for IndexCoord (#15638) Signed-off-by: cai.zhang --- internal/indexcoord/index_coord.go | 14 ++++++ internal/indexcoord/meta_table.go | 5 ++ internal/indexcoord/node_manager.go | 4 ++ internal/metrics/indexcoord_metrics.go | 70 ++++++++++++++++++++++++++ internal/metrics/metrics.go | 5 -- 5 files changed, 93 insertions(+), 5 deletions(-) create mode 100644 internal/metrics/indexcoord_metrics.go diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index 25d81659af..f094679726 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -27,6 +27,8 @@ import ( "syscall" "time" + "github.com/milvus-io/milvus/internal/metrics" + "go.etcd.io/etcd/api/v3/mvccpb" "go.uber.org/zap" @@ -392,6 +394,7 @@ func (i *IndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequ }, }, err } + metrics.IndexCoordIndexRequestCounter.WithLabelValues(metrics.TotalLabel).Inc() log.Debug("IndexCoord building index ...", zap.Int64("IndexBuildID", req.IndexBuildID), zap.String("IndexName = ", req.IndexName), @@ -445,6 +448,7 @@ func (i *IndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequ if err != nil { ret.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError ret.Status.Reason = err.Error() + metrics.IndexCoordIndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() return ret, nil } log.Debug("IndexCoord BuildIndex Enqueue successfully", zap.Int64("IndexBuildID", t.indexBuildID)) @@ -454,11 +458,13 @@ func (i *IndexCoord) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequ log.Error("IndexCoord scheduler index task failed", zap.Int64("IndexBuildID", t.indexBuildID)) ret.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError ret.Status.Reason = err.Error() + metrics.IndexCoordIndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() return ret, nil } sp.SetTag("IndexCoord-IndexBuildID", strconv.FormatInt(t.indexBuildID, 10)) ret.Status.ErrorCode = commonpb.ErrorCode_Success ret.IndexBuildID = t.indexBuildID + metrics.IndexCoordIndexRequestCounter.WithLabelValues(metrics.SuccessLabel).Inc() return ret, nil } @@ -736,6 +742,7 @@ func (i *IndexCoord) recycleUnusedIndexFiles() { log.Debug("IndexCoord recycleUnusedIndexFiles", zap.Int64("Recycle the low version index files successfully of the index with indexBuildID", meta.indexMeta.IndexBuildID)) } + metrics.IndexCoordIndexTaskCounter.WithLabelValues(metrics.RecycledIndexTaskLabel).Inc() } } } @@ -823,6 +830,13 @@ func (i *IndexCoord) watchMetaLoop() { zap.Int64("Finish by IndexNode", indexMeta.NodeID), zap.Int64("The version of the task", indexMeta.Version)) i.nodeManager.pq.IncPriority(indexMeta.NodeID, -1) + metrics.IndexCoordIndexTaskCounter.WithLabelValues(metrics.InProgressIndexTaskLabel).Dec() + if indexMeta.State == commonpb.IndexState_Finished { + metrics.IndexCoordIndexTaskCounter.WithLabelValues(metrics.FinishedIndexTaskLabel).Inc() + } + if indexMeta.State == commonpb.IndexState_Failed { + metrics.IndexCoordIndexTaskCounter.WithLabelValues(metrics.FailedIndexTaskLabel).Inc() + } } case mvccpb.DELETE: log.Debug("IndexCoord watchMetaLoop DELETE", zap.Int64("The meta has been deleted of indexBuildID", indexBuildID)) diff --git a/internal/indexcoord/meta_table.go b/internal/indexcoord/meta_table.go index adce147d4c..b717ca0403 100644 --- a/internal/indexcoord/meta_table.go +++ b/internal/indexcoord/meta_table.go @@ -23,6 +23,8 @@ import ( "strconv" "sync" + "github.com/milvus-io/milvus/internal/metrics" + "go.uber.org/zap" "github.com/golang/protobuf/proto" @@ -159,6 +161,7 @@ func (mt *metaTable) AddIndex(indexBuildID UniqueID, req *indexpb.BuildIndexRequ }, revision: 0, } + metrics.IndexCoordIndexTaskCounter.WithLabelValues(metrics.UnissuedIndexTaskLabel).Inc() return mt.saveIndexMeta(meta) } @@ -185,6 +188,8 @@ func (mt *metaTable) BuildIndex(indexBuildID UniqueID, nodeID int64) error { } meta.indexMeta.NodeID = nodeID meta.indexMeta.State = commonpb.IndexState_InProgress + metrics.IndexCoordIndexTaskCounter.WithLabelValues(metrics.UnissuedIndexTaskLabel).Dec() + metrics.IndexCoordIndexTaskCounter.WithLabelValues(metrics.InProgressIndexTaskLabel).Inc() err := mt.saveIndexMeta(&meta) if err != nil { diff --git a/internal/indexcoord/node_manager.go b/internal/indexcoord/node_manager.go index a283c33115..3a7439a555 100644 --- a/internal/indexcoord/node_manager.go +++ b/internal/indexcoord/node_manager.go @@ -21,6 +21,8 @@ import ( "sync" "time" + "github.com/milvus-io/milvus/internal/metrics" + "go.uber.org/zap" grpcindexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client" @@ -74,6 +76,7 @@ func (nm *NodeManager) RemoveNode(nodeID UniqueID) { delete(nm.nodeClients, nodeID) nm.lock.Unlock() nm.pq.Remove(nodeID) + metrics.IndexCoordIndexNodeNum.WithLabelValues("index_node_num").Dec() } // AddNode adds the client of IndexNode. @@ -94,6 +97,7 @@ func (nm *NodeManager) AddNode(nodeID UniqueID, address string) error { log.Error("IndexCoord NodeManager", zap.Any("Add node err", err)) return err } + metrics.IndexCoordIndexNodeNum.WithLabelValues("index_node_num").Inc() return nm.setClient(nodeID, nodeClient) } diff --git a/internal/metrics/indexcoord_metrics.go b/internal/metrics/indexcoord_metrics.go new file mode 100644 index 0000000000..71521b1da0 --- /dev/null +++ b/internal/metrics/indexcoord_metrics.go @@ -0,0 +1,70 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/prometheus/client_golang/prometheus" +) + +const ( + SuccessLabel = "success" + FailLabel = "fail" + TotalLabel = "total" + + UnissuedIndexTaskLabel = "unissued" + InProgressIndexTaskLabel = "in-progress" + FinishedIndexTaskLabel = "finished" + FailedIndexTaskLabel = "failed" + RecycledIndexTaskLabel = "recycled" +) + +var ( + // IndexCoordIndexRequestCounter records the number of the index requests. + IndexCoordIndexRequestCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.IndexCoordRole, + Name: "index_req_counter", + Help: "The number of requests to build index", + }, []string{"status"}) + + // IndexCoordIndexTaskCounter records the number of index tasks of each type. + IndexCoordIndexTaskCounter = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.IndexCoordRole, + Name: "index_task_counter", + Help: "The number of index tasks of each type", + }, []string{"type"}) + + // IndexCoordIndexNodeNum records the number of IndexNodes managed by IndexCoord. + IndexCoordIndexNodeNum = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.IndexCoordRole, + Name: "index_node_num", + Help: "The number of IndexNodes managed by IndexCoord", + }, []string{"type"}) +) + +//RegisterIndexCoord registers IndexCoord metrics +func RegisterIndexCoord() { + prometheus.MustRegister(IndexCoordIndexRequestCounter) + prometheus.MustRegister(IndexCoordIndexTaskCounter) + prometheus.MustRegister(IndexCoordIndexNodeNum) +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 28b0f42e5f..08072c1dd9 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -640,11 +640,6 @@ func RegisterDataNode() { prometheus.MustRegister(DataNodeWatchDmChannelsCounter) } -//RegisterIndexCoord registers IndexCoord metrics -func RegisterIndexCoord() { - -} - //RegisterIndexNode registers IndexNode metrics func RegisterIndexNode() {