diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index d60b3337cb..5215fd6cae 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -40,6 +40,8 @@ import ( "time" "unsafe" + "github.com/milvus-io/milvus/internal/metrics" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" @@ -280,6 +282,7 @@ func (i *IndexNode) CreateIndex(ctx context.Context, request *indexpb.CreateInde sp, ctx2 := trace.StartSpanFromContextWithOperationName(i.loopCtx, "IndexNode-CreateIndex") defer sp.Finish() sp.SetTag("IndexBuildID", strconv.FormatInt(request.IndexBuildID, 10)) + metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.NodeID, 10), metrics.TotalLabel).Inc() t := &IndexBuildTask{ BaseTask: BaseTask{ @@ -302,10 +305,12 @@ func (i *IndexNode) CreateIndex(ctx context.Context, request *indexpb.CreateInde log.Warn("IndexNode failed to schedule", zap.Int64("indexBuildID", request.IndexBuildID), zap.Error(err)) ret.ErrorCode = commonpb.ErrorCode_UnexpectedError ret.Reason = err.Error() + metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.NodeID, 10), metrics.FailLabel).Inc() return ret, nil } log.Info("IndexNode successfully scheduled", zap.Int64("indexBuildID", request.IndexBuildID)) + metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.NodeID, 10), metrics.SuccessLabel).Inc() return ret, nil } diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index a16d61bb49..916e9dbbb7 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -25,6 +25,8 @@ import ( "runtime/debug" "strconv" + "github.com/milvus-io/milvus/internal/metrics" + "go.uber.org/zap" "github.com/golang/protobuf/proto" @@ -367,7 +369,7 @@ func (it *IndexBuildTask) loadVector(ctx context.Context) (storage.FieldID, stor // In this case, it.internalErr is no longer nil and err does not need to be returned, otherwise it.err will also be assigned. return storage.InvalidUniqueID, nil, err } - + loadVectorDuration := it.tr.RecordSpan() log.Debug("IndexNode load data success", zap.Int64("buildId", it.req.IndexBuildID)) it.tr.Record("load vector data done") @@ -376,6 +378,11 @@ func (it *IndexBuildTask) loadVector(ctx context.Context) (storage.FieldID, stor if err2 != nil { return storage.InvalidUniqueID, nil, err2 } + + // TODO: @xiaocai2333 metrics.IndexNodeLoadBinlogLatency should be added above, put here to get segmentID. + metrics.IndexNodeLoadBinlogLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.NodeID, 10), strconv.FormatInt(it.segmentID, 10)).Observe(float64(loadVectorDuration)) + metrics.IndexNodeDecodeBinlogLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.NodeID, 10), strconv.FormatInt(it.segmentID, 10)).Observe(float64(it.tr.RecordSpan())) + if len(insertData.Data) != 1 { return storage.InvalidUniqueID, nil, errors.New("we expect only one field in deserialized insert data") } @@ -431,6 +438,8 @@ func (it *IndexBuildTask) buildIndex(ctx context.Context) ([]*storage.Blob, erro } } + metrics.IndexNodeKnowhereBuildIndexLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.NodeID, 10), strconv.FormatInt(it.segmentID, 10)).Observe(float64(it.tr.RecordSpan())) + if !fOk && !bOk { return nil, errors.New("we expect FloatVectorFieldData or BinaryVectorFieldData") } @@ -469,6 +478,7 @@ func (it *IndexBuildTask) buildIndex(ctx context.Context) ([]*storage.Blob, erro return nil, err } it.tr.Record("index codec serialize done") + metrics.IndexNodeEncodeIndexFileLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.NodeID, 10), strconv.FormatInt(it.segmentID, 10)).Observe(float64(it.tr.RecordSpan())) return serializedIndexBlobs, nil } @@ -565,6 +575,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { } }() + it.tr.Record("new CIndex") var blobs []*storage.Blob blobs, err = it.buildIndex(ctx) if err != nil { @@ -580,6 +591,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { it.SetState(TaskStateRetry) return err } + metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(Params.IndexNodeCfg.NodeID, 10), strconv.FormatInt(it.segmentID, 10)).Observe(float64(it.tr.RecordSpan())) it.tr.Record("index file save done") it.tr.Elapse("index building all done") log.Info("IndexNode CreateIndex successfully ", zap.Int64("collect", it.collectionID), diff --git a/internal/metrics/indexcoord_metrics.go b/internal/metrics/indexcoord_metrics.go index 567ccf776e..a852b297fa 100644 --- a/internal/metrics/indexcoord_metrics.go +++ b/internal/metrics/indexcoord_metrics.go @@ -21,6 +21,14 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +const ( + UnissuedIndexTaskLabel = "unissued" + InProgressIndexTaskLabel = "in-progress" + FinishedIndexTaskLabel = "finished" + FailedIndexTaskLabel = "failed" + RecycledIndexTaskLabel = "recycled" +) + var ( // IndexCoordIndexRequestCounter records the number of the index requests. IndexCoordIndexRequestCounter = prometheus.NewCounterVec( diff --git a/internal/metrics/indexnode_metrics.go b/internal/metrics/indexnode_metrics.go new file mode 100644 index 0000000000..0a5bb9a1ec --- /dev/null +++ b/internal/metrics/indexnode_metrics.go @@ -0,0 +1,88 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/prometheus/client_golang/prometheus" +) + +const ( + SuccessLabel = "success" + FailLabel = "fail" + TotalLabel = "total" +) + +var ( + IndexNodeBuildIndexTaskCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.IndexNodeRole, + Name: "index_task_counter", + Help: "The number of tasks that index node received", + }, []string{"node_id", "status"}) + + IndexNodeLoadBinlogLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.IndexNodeRole, + Name: "load_segment_latency", + Help: "The latency of loading the segment", + }, []string{"node_id", "segment_id"}) + + IndexNodeDecodeBinlogLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.IndexNodeRole, + Name: "decode_binlog_latency", + Help: "The latency of decode the binlog", + }, []string{"node_id", "segment_id"}) + + IndexNodeKnowhereBuildIndexLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.IndexNodeRole, + Name: "knowhere_build_index_latency", + Help: "The latency of knowhere building the index", + }, []string{"node_id", "segment_id"}) + + IndexNodeEncodeIndexFileLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.IndexNodeRole, + Name: "encode_index_file_latency", + Help: "The latency of encoding the index file", + }, []string{"node_id", "segment_id"}) + + IndexNodeSaveIndexFileLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.IndexNodeRole, + Name: "save_index_file_latency", + Help: "The latency of saving the index file", + }, []string{"node_id", "segment_id"}) +) + +//RegisterIndexNode registers IndexNode metrics +func RegisterIndexNode() { + prometheus.MustRegister(IndexNodeBuildIndexTaskCounter) + prometheus.MustRegister(IndexNodeLoadBinlogLatency) + prometheus.MustRegister(IndexNodeDecodeBinlogLatency) + prometheus.MustRegister(IndexNodeKnowhereBuildIndexLatency) + prometheus.MustRegister(IndexNodeEncodeIndexFileLatency) + prometheus.MustRegister(IndexNodeSaveIndexFileLatency) +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 5008a8a0b8..a74291a64a 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -18,6 +18,7 @@ package metrics import ( "net/http" + // nolint:gosec _ "net/http/pprof" @@ -32,9 +33,6 @@ import ( const ( milvusNamespace = "milvus" - SuccessLabel = "success" - FailLabel = "fail" - TotalLabel = "total" AbandonLabel = "abandon" SearchLabel = "search" @@ -42,12 +40,6 @@ const ( CacheHitLabel = "hit" CacheMissLabel = "miss" - - UnissuedIndexTaskLabel = "unissued" - InProgressIndexTaskLabel = "in-progress" - FinishedIndexTaskLabel = "finished" - FailedIndexTaskLabel = "failed" - RecycledIndexTaskLabel = "recycled" ) var ( @@ -274,11 +266,6 @@ var ( }, []string{"type"}) ) -//RegisterIndexNode registers IndexNode metrics -func RegisterIndexNode() { - -} - //ServeHTTP serves prometheus http service func ServeHTTP() { http.Handle("/metrics", promhttp.Handler())