From b457c2f41585dbeacb2df61a09d0a05867802a56 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 7 Jan 2025 11:20:56 +0800 Subject: [PATCH] enhance: [2.5]Add missing delete metrics (#38634) (#38747) Add 2 counter metrics: - Total delete entries from deltalog: milvus_datanode_compaction_delete_count - Total missing deletes: milvus_datanode_compaction_missing_delete_count See also: #34665 pr: #38634 Signed-off-by: yangxuan --- .../compaction/clustering_compactor.go | 53 +++------- .../datanode/compaction/compactor_common.go | 99 ++++++++++++++++--- .../compaction/compactor_common_test.go | 84 ++++++++++++++++ internal/datanode/compaction/merge_sort.go | 62 +++++++----- internal/datanode/compaction/mix_compactor.go | 49 ++++----- .../datanode/compaction/mix_compactor_test.go | 65 +++--------- internal/proto/data_coord.proto | 2 +- pkg/metrics/datanode_metrics.go | 26 +++++ 8 files changed, 278 insertions(+), 162 deletions(-) create mode 100644 internal/datanode/compaction/compactor_common_test.go diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index cd2620692a..e7a618191b 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -56,7 +56,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" - "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -93,7 +92,7 @@ type clusteringCompactionTask struct { // inner field collectionID int64 partitionID int64 - currentTs typeutil.Timestamp // for TTL + currentTime time.Time // for TTL isVectorClusteringKey bool clusteringKeyField *schemapb.FieldSchema primaryKeyField *schemapb.FieldSchema @@ -223,7 +222,7 @@ func (t *clusteringCompactionTask) init() error { t.primaryKeyField = pkField t.isVectorClusteringKey = typeutil.IsVectorType(t.clusteringKeyField.DataType) - t.currentTs = tsoutil.GetCurrentTime() + t.currentTime = time.Now() t.memoryBufferSize = t.getMemoryBufferSize() workerPoolSize := t.getWorkerPoolSize() t.mappingPool = conc.NewPool[any](workerPoolSize) @@ -563,11 +562,7 @@ func (t *clusteringCompactionTask) mappingSegment( log.Info("mapping segment start") processStart := time.Now() fieldBinlogPaths := make([][]string, 0) - var ( - expired int64 = 0 - deleted int64 = 0 - remained int64 = 0 - ) + var remained int64 = 0 deltaPaths := make([]string, 0) for _, d := range segment.GetDeltalogs() { @@ -579,17 +574,7 @@ func (t *clusteringCompactionTask) mappingSegment( if err != nil { return err } - - isDeletedValue := func(v *storage.Value) bool { - ts, ok := delta[v.PK.GetValue()] - // insert task and delete task has the same ts when upsert - // here should be < instead of <= - // to avoid the upsert data to be deleted after compact - if ok && uint64(v.Timestamp) < ts { - return true - } - return false - } + entityFilter := newEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime) mappingStats := &clusteringpb.ClusteringCentroidIdMappingStats{} if t.isVectorClusteringKey { @@ -656,15 +641,7 @@ func (t *clusteringCompactionTask) mappingSegment( v := pkIter.Value() offset++ - // Filtering deleted entity - if isDeletedValue(v) { - deleted++ - continue - } - // Filtering expired entity - ts := typeutil.Timestamp(v.Timestamp) - if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, ts) { - expired++ + if entityFilter.Filtered(v.PK.GetValue(), 
uint64(v.Timestamp)) { continue } @@ -753,13 +730,19 @@ func (t *clusteringCompactionTask) mappingSegment( } } } + missing := entityFilter.GetMissingDeleteCount() log.Info("mapping segment end", zap.Int64("remained_entities", remained), - zap.Int64("deleted_entities", deleted), - zap.Int64("expired_entities", expired), + zap.Int("deleted_entities", entityFilter.GetDeletedCount()), + zap.Int("expired_entities", entityFilter.GetExpiredCount()), + zap.Int("deltalog deletes", entityFilter.GetDeltalogDeleteCount()), + zap.Int("missing deletes", missing), zap.Int64("written_row_num", t.writtenRowNum.Load()), zap.Duration("elapse", time.Since(processStart))) + + metrics.DataNodeCompactionDeleteCount.WithLabelValues(fmt.Sprint(t.collectionID)).Add(float64(entityFilter.GetDeltalogDeleteCount())) + metrics.DataNodeCompactionMissingDeleteCount.WithLabelValues(fmt.Sprint(t.collectionID)).Add(float64(missing)) return nil } @@ -1175,8 +1158,6 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment( var ( timestampTo int64 = -1 timestampFrom int64 = -1 - expired int64 = 0 - deleted int64 = 0 remained int64 = 0 analyzeResult map[interface{}]int64 = make(map[interface{}]int64, 0) ) @@ -1203,6 +1184,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment( fieldBinlogPaths = append(fieldBinlogPaths, ps) } + expiredFilter := newEntityFilter(nil, t.plan.GetCollectionTtl(), t.currentTime) for _, paths := range fieldBinlogPaths { allValues, err := t.binlogIO.Download(ctx, paths) if err != nil { @@ -1233,9 +1215,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment( v := pkIter.Value() // Filtering expired entity - ts := typeutil.Timestamp(v.Timestamp) - if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, ts) { - expired++ + if expiredFilter.Filtered(v.PK.GetValue(), uint64(v.Timestamp)) { continue } @@ -1264,8 +1244,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment( log.Info("analyze segment end", zap.Int64("remained entities", remained), - zap.Int64("deleted entities", deleted), - zap.Int64("expired entities", expired), + zap.Int("expired entities", expiredFilter.GetExpiredCount()), zap.Duration("map elapse", time.Since(processStart))) return analyzeResult, nil } diff --git a/internal/datanode/compaction/compactor_common.go b/internal/datanode/compaction/compactor_common.go index 8fd26ebf68..2ff9c15b4e 100644 --- a/internal/datanode/compaction/compactor_common.go +++ b/internal/datanode/compaction/compactor_common.go @@ -37,25 +37,94 @@ import ( const compactionBatchSize = 100 -func isExpiredEntity(ttl int64, now, ts typeutil.Timestamp) bool { - // entity expire is not enabled if duration <= 0 - if ttl <= 0 { - return false +type EntityFilter struct { + deletedPkTs map[interface{}]typeutil.Timestamp // pk2ts + ttl int64 // nanoseconds + currentTime time.Time + + expiredCount int + deletedCount int +} + +func newEntityFilter(deletedPkTs map[interface{}]typeutil.Timestamp, ttl int64, currTime time.Time) *EntityFilter { + if deletedPkTs == nil { + deletedPkTs = make(map[interface{}]typeutil.Timestamp) + } + return &EntityFilter{ + deletedPkTs: deletedPkTs, + ttl: ttl, + currentTime: currTime, + } +} + +func (filter *EntityFilter) Filtered(pk any, ts typeutil.Timestamp) bool { + if filter.isEntityDeleted(pk, ts) { + filter.deletedCount++ + return true } - pts, _ := tsoutil.ParseTS(ts) - pnow, _ := tsoutil.ParseTS(now) - expireTime := pts.Add(time.Duration(ttl)) - return expireTime.Before(pnow) + // Filtering expired entity + if filter.isEntityExpired(ts) { + filter.expiredCount++ + 
return true + } + return false +} + +func (filter *EntityFilter) GetExpiredCount() int { + return filter.expiredCount +} + +func (filter *EntityFilter) GetDeletedCount() int { + return filter.deletedCount +} + +func (filter *EntityFilter) GetDeltalogDeleteCount() int { + return len(filter.deletedPkTs) +} + +func (filter *EntityFilter) GetMissingDeleteCount() int { + diff := filter.GetDeltalogDeleteCount() - filter.GetDeletedCount() + if diff <= 0 { + diff = 0 + } + return diff +} + +func (filter *EntityFilter) isEntityDeleted(pk interface{}, pkTs typeutil.Timestamp) bool { + if deleteTs, ok := filter.deletedPkTs[pk]; ok { + // insert task and delete task has the same ts when upsert + // here should be < instead of <= + // to avoid the upsert data to be deleted after compact + if pkTs < deleteTs { + return true + } + } + return false +} + +func (filter *EntityFilter) isEntityExpired(entityTs typeutil.Timestamp) bool { + // entity expire is not enabled if duration <= 0 + if filter.ttl <= 0 { + return false + } + entityTime, _ := tsoutil.ParseTS(entityTs) + + // this dur can represents 292 million years before or after 1970, enough for milvus + // ttl calculation + dur := filter.currentTime.UnixMilli() - entityTime.UnixMilli() + + // filter.ttl is nanoseconds + return filter.ttl/int64(time.Millisecond) <= dur } func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[interface{}]typeutil.Timestamp, error) { - pk2ts := make(map[interface{}]typeutil.Timestamp) + pk2Ts := make(map[interface{}]typeutil.Timestamp) log := log.Ctx(ctx) if len(paths) == 0 { log.Debug("compact with no deltalogs, skip merge deltalogs") - return pk2ts, nil + return pk2Ts, nil } blobs := make([]*storage.Blob, 0) @@ -88,17 +157,15 @@ func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[in } dl := reader.Value() - // If pk already exists in pk2ts, record the later one. - if ts, ok := pk2ts[dl.Pk.GetValue()]; ok && ts > dl.Ts { + if ts, ok := pk2Ts[dl.Pk.GetValue()]; ok && ts > dl.Ts { continue } - pk2ts[dl.Pk.GetValue()] = dl.Ts + pk2Ts[dl.Pk.GetValue()] = dl.Ts } - log.Info("compact mergeDeltalogs end", - zap.Int("deleted pk counts", len(pk2ts))) + log.Info("compact mergeDeltalogs end", zap.Int("delete entries counts", len(pk2Ts))) - return pk2ts, nil + return pk2Ts, nil } func composePaths(segments []*datapb.CompactionSegmentBinlogs) ( diff --git a/internal/datanode/compaction/compactor_common_test.go b/internal/datanode/compaction/compactor_common_test.go new file mode 100644 index 0000000000..122bdc8307 --- /dev/null +++ b/internal/datanode/compaction/compactor_common_test.go @@ -0,0 +1,84 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package compaction
+
+import (
+	"math"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/suite"
+
+	"github.com/milvus-io/milvus/pkg/util/tsoutil"
+)
+
+func TestCompactorCommonTaskSuite(t *testing.T) {
+	suite.Run(t, new(CompactorCommonSuite))
+}
+
+type CompactorCommonSuite struct {
+	suite.Suite
+}
+
+func (s *CompactorCommonSuite) TestEntityFilterByTTL() {
+	milvusBirthday := getMilvusBirthday()
+
+	tests := []struct {
+		description string
+		collTTL     int64
+		nowTime     time.Time
+		entityTime  time.Time
+
+		expect bool
+	}{
+		// ttl == maxInt64, dur is 1hour, no entities should expire
+		{"ttl=maxInt64, now<entity", math.MaxInt64, milvusBirthday, milvusBirthday.Add(time.Hour), false},
+		{"ttl=maxInt64, now>entity", math.MaxInt64, milvusBirthday, milvusBirthday.Add(-time.Hour), false},
+		{"ttl=maxInt64, now==entity", math.MaxInt64, milvusBirthday, milvusBirthday, false},
+		// ttl == 0, no entities should expire
+		{"ttl=0, now==entity", 0, milvusBirthday, milvusBirthday, false},
+		{"ttl=0, now>entity", 0, milvusBirthday, milvusBirthday.Add(-time.Hour), false},
+		{"ttl=0, now<entity", 0, milvusBirthday, milvusBirthday.Add(time.Hour), false},
+		// ttl == 10days
+		{"ttl=10days, nowTs-entityTs>10days", 864000000000000, milvusBirthday.AddDate(0, 0, 11), milvusBirthday, true},
+		{"ttl=10days, nowTs-entityTs==10days", 864000000000000, milvusBirthday.AddDate(0, 0, 10), milvusBirthday, true},
+		{"ttl=10days, nowTs-entityTs<10days", 864000000000000, milvusBirthday.AddDate(0, 0, 9), milvusBirthday, false},
+		// ttl is maxInt64
+		{"ttl=maxInt64, nowTs-entityTs>1000years", math.MaxInt64, milvusBirthday.AddDate(1000, 0, 11), milvusBirthday, true},
+		{"ttl=maxInt64, nowTs-entityTs==1000years", math.MaxInt64, milvusBirthday.AddDate(1000, 0, 0), milvusBirthday, true},
+		{"ttl=maxInt64, nowTs-entityTs==240year", math.MaxInt64, milvusBirthday.AddDate(240, 0, 0), milvusBirthday, false},
+		{"ttl=maxInt64, nowTs-entityTs==maxDur", math.MaxInt64, milvusBirthday.Add(time.Duration(math.MaxInt64)), milvusBirthday, true},
+	}
+	for _, test := range tests {
+		s.Run(test.description, func() {
+			filter := newEntityFilter(nil, test.collTTL, test.nowTime)
+
+			entityTs := tsoutil.ComposeTSByTime(test.entityTime, 0)
+			got := filter.isEntityExpired(entityTs)
+			s.Equal(test.expect, got)
+		})
+	}
+}
diff --git a/internal/datanode/compaction/merge_sort.go b/internal/datanode/compaction/merge_sort.go
--- a/internal/datanode/compaction/merge_sort.go
+++ b/internal/datanode/compaction/merge_sort.go
@@ -160,12 +152,30 @@ func mergeSortMultipleSegments(ctx context.Context,
 		seg.IsSorted = true
 	}
 
+	var (
+		deletedRowCount            int
+		expiredRowCount            int
+		missingDeleteCount         int
+		deltalogDeleteEntriesCount int
+	)
+
+	for _, filter := range segmentFilters {
+		deletedRowCount += filter.GetDeletedCount()
+		expiredRowCount += filter.GetExpiredCount()
+		missingDeleteCount += filter.GetMissingDeleteCount()
+		deltalogDeleteEntriesCount += filter.GetDeltalogDeleteCount()
+	}
+
 	totalElapse := tr.RecordSpan()
 	log.Info("compact mergeSortMultipleSegments end",
 		zap.Int64s("mergeSplit to segments", lo.Keys(mWriter.cachedMeta)),
-		zap.Int64("deleted row count", deletedRowCount),
-		zap.Int64("expired entities", expiredRowCount),
+		zap.Int("deleted row count", deletedRowCount),
+		zap.Int("expired entities", expiredRowCount),
+		zap.Int("missing deletes", missingDeleteCount),
 		zap.Duration("total elapse", totalElapse))
+
+	metrics.DataNodeCompactionDeleteCount.WithLabelValues(fmt.Sprint(collectionID)).Add(float64(deltalogDeleteEntriesCount))
+	metrics.DataNodeCompactionMissingDeleteCount.WithLabelValues(fmt.Sprint(collectionID)).Add(float64(missingDeleteCount))
+
 	return res, nil
 }
diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go
index 271cf1ca54..f771c40a2b 100644
--- a/internal/datanode/compaction/mix_compactor.go
+++ b/internal/datanode/compaction/mix_compactor.go
@@ -40,13 +40,12 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/timerecord"
-	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 type mixCompactionTask struct
{ - binlogIO io.BinlogIO - currentTs typeutil.Timestamp + binlogIO io.BinlogIO + currentTime time.Time plan *datapb.CompactionPlan @@ -74,13 +73,13 @@ func NewMixCompactionTask( ) *mixCompactionTask { ctx1, cancel := context.WithCancel(ctx) return &mixCompactionTask{ - ctx: ctx1, - cancel: cancel, - binlogIO: binlogIO, - plan: plan, - tr: timerecord.NewTimeRecorder("mergeSplit compaction"), - currentTs: tsoutil.GetCurrentTime(), - done: make(chan struct{}, 1), + ctx: ctx1, + cancel: cancel, + binlogIO: binlogIO, + plan: plan, + tr: timerecord.NewTimeRecorder("mergeSplit compaction"), + currentTime: time.Now(), + done: make(chan struct{}, 1), } } @@ -201,23 +200,7 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context, log.Warn("compact wrong, fail to merge deltalogs", zap.Error(err)) return } - - isValueDeleted := func(pk any, ts typeutil.Timestamp) bool { - oldts, ok := delta[pk] - // insert task and delete task has the same ts when upsert - // here should be < instead of <= - // to avoid the upsert data to be deleted after compact - if ok && ts < oldts { - deletedRowCount++ - return true - } - // Filtering expired entity - if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, typeutil.Timestamp(ts)) { - expiredRowCount++ - return true - } - return false - } + entityFilter := newEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime) reader, err := storage.NewCompositeBinlogRecordReader(blobs) if err != nil { @@ -265,7 +248,7 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context, panic("invalid data type") } ts := typeutil.Timestamp(tsArray.Value(i)) - if isValueDeleted(pk, ts) { + if entityFilter.Filtered(pk, ts) { if sliceStart != -1 { err = writeSlice(r, sliceStart, i) if err != nil { @@ -288,6 +271,14 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context, } } } + + deltalogDeleteEntriesCount := len(delta) + deletedRowCount = int64(entityFilter.GetDeletedCount()) + expiredRowCount = int64(entityFilter.GetExpiredCount()) + + metrics.DataNodeCompactionDeleteCount.WithLabelValues(fmt.Sprint(t.collectionID)).Add(float64(deltalogDeleteEntriesCount)) + metrics.DataNodeCompactionMissingDeleteCount.WithLabelValues(fmt.Sprint(t.collectionID)).Add(float64(entityFilter.GetMissingDeleteCount())) + return } @@ -347,7 +338,7 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) { if sortMergeAppicable { log.Info("compact by merge sort") res, err = mergeSortMultipleSegments(ctxTimeout, t.plan, t.collectionID, t.partitionID, t.maxRows, t.binlogIO, - t.plan.GetSegmentBinlogs(), t.tr, t.currentTs, t.plan.GetCollectionTtl(), t.bm25FieldIDs) + t.plan.GetSegmentBinlogs(), t.tr, t.currentTime, t.plan.GetCollectionTtl(), t.bm25FieldIDs) if err != nil { log.Warn("compact wrong, fail to merge sort segments", zap.Error(err)) return nil, err diff --git a/internal/datanode/compaction/mix_compactor_test.go b/internal/datanode/compaction/mix_compactor_test.go index d3df04c56e..e94b162952 100644 --- a/internal/datanode/compaction/mix_compactor_test.go +++ b/internal/datanode/compaction/mix_compactor_test.go @@ -340,8 +340,7 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegment() { func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() { s.initSegBuffer(1, 3) collTTL := 864000 // 10 days - currTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(time.Second*(time.Duration(collTTL)+1)), 0) - s.task.currentTs = currTs + s.task.currentTime = getMilvusBirthday().Add(time.Second * (time.Duration(collTTL) + 1)) s.task.plan.CollectionTtl = 
int64(collTTL)
 
 	alloc := allocator.NewLocalAllocator(888888, math.MaxInt64)
@@ -512,15 +511,14 @@ func (s *MixCompactionTaskSuite) TestMergeDeltalogsMultiSegment() {
 
 			got, err := mergeDeltalogs(s.task.ctx, s.task.binlogIO, []string{"random"})
 			s.NoError(err)
+			s.Equal(len(got), len(test.expectedpk2ts))
 
-			s.Equal(len(test.expectedpk2ts), len(got))
-			gotKeys := lo.Map(lo.Keys(got), func(k interface{}, _ int) int64 {
-				res, ok := k.(int64)
+			for gotKT, gotV := range got {
+				gotK, ok := gotKT.(int64)
 				s.Require().True(ok)
-				return res
-			})
-			s.ElementsMatch(gotKeys, lo.Keys(test.expectedpk2ts))
-			s.ElementsMatch(lo.Values(got), lo.Values(test.expectedpk2ts))
+
+				s.EqualValues(test.expectedpk2ts[gotK], gotV)
+			}
 		})
 	}
 }
@@ -551,13 +549,12 @@ func (s *MixCompactionTaskSuite) TestMergeDeltalogsOneSegment() {
 	s.NotNil(got)
 	s.Equal(len(expectedMap), len(got))
 
-	gotKeys := lo.Map(lo.Keys(got), func(k interface{}, _ int) int64 {
-		res, ok := k.(int64)
+	for gotKT, gotV := range got {
+		gotK, ok := gotKT.(int64)
 		s.Require().True(ok)
-		return res
-	})
-	s.ElementsMatch(gotKeys, lo.Keys(expectedMap))
-	s.ElementsMatch(lo.Values(got), lo.Values(expectedMap))
+
+		s.EqualValues(expectedMap[gotK], gotV)
+	}
 }
 
 func (s *MixCompactionTaskSuite) TestCompactFail() {
@@ -586,44 +583,6 @@ func (s *MixCompactionTaskSuite) TestCompactFail() {
 	})
 }
 
-func (s *MixCompactionTaskSuite) TestIsExpiredEntity() {
-	milvusBirthdayTs := tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)
-
-	tests := []struct {
-		description string
-		collTTL     int64
-		nowTs       uint64
-		entityTs    uint64
-
-		expect bool
-	}{
-		{"ttl=maxInt64, nowTs-entityTs=ttl", math.MaxInt64, math.MaxInt64, 0, true},
-		{"ttl=maxInt64, nowTs-entityTs < 0", math.MaxInt64, milvusBirthdayTs, 0, false},
-		{"ttl=maxInt64, 0<nowTs-entityTs<ttl", math.MaxInt64, milvusBirthdayTs + 1, milvusBirthdayTs, false},
-		{"ttl=maxInt64, nowTs-entityTs>ttl v2", math.MaxInt64, math.MaxInt64, milvusBirthdayTs, true},
-		// entityTs==currTs will never happen
-		// {"ttl=maxInt64, curTs-entityTs=0", math.MaxInt64, milvusBirthdayTs, milvusBirthdayTs, true},
-		{"ttl=0, nowTs>entityTs", 0, milvusBirthdayTs + 1, milvusBirthdayTs, false},
-		{"ttl=0, nowTs==entityTs", 0, milvusBirthdayTs, milvusBirthdayTs, false},
-		{"ttl=0, nowTs<entityTs", 0, milvusBirthdayTs, milvusBirthdayTs + 1, false},
-		// ttl == 10days
-		{"ttl=10days, nowTs-entityTs>10days", 864000, milvusBirthdayTs + 864001, milvusBirthdayTs, true},
-		{"ttl=10days, nowTs-entityTs==10days", 864000, milvusBirthdayTs + 864000, milvusBirthdayTs, true},
-		{"ttl=10days, nowTs-entityTs<10days", 864000, milvusBirthdayTs + 10, milvusBirthdayTs, false},
-	}
-	for _, test := range tests {
-		s.Run(test.description, func() {
-			t := &mixCompactionTask{
-				plan: &datapb.CompactionPlan{
-					CollectionTtl: test.collTTL,
-				},
-				currentTs: test.nowTs,
-			}
-			got := isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, test.entityTs)
-			s.Equal(test.expect, got)
-		})
-	}
-}
-
 func getRow(magic int64) map[int64]interface{} {
 	ts := tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)
 	return map[int64]interface{}{
diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto
index 2d56581eda..c7ca6ab8ce 100644
--- a/internal/proto/data_coord.proto
+++ b/internal/proto/data_coord.proto
@@ -604,7 +604,7 @@ message CompactionPlan {
   CompactionType type = 5;
   uint64 timetravel = 6;
   string channel = 7;
-  int64 collection_ttl = 8;
+  int64 collection_ttl = 8; // nanoseconds
   int64 total_rows = 9;
   schema.CollectionSchema schema = 10;
   int64 clustering_key_field = 11;
diff --git a/pkg/metrics/datanode_metrics.go b/pkg/metrics/datanode_metrics.go
index ff982a4bbf..173d9000ea 100644
--- a/pkg/metrics/datanode_metrics.go
+++ b/pkg/metrics/datanode_metrics.go
@@ -237,6 +237,22 @@ var (
 			nodeIDLabelName,
channelNameLabelName, }) + + DataNodeCompactionDeleteCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataNodeRole, + Name: "compaction_delete_count", + Help: "Number of delete entries in compaction", + }, []string{collectionIDLabelName}) + + DataNodeCompactionMissingDeleteCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataNodeRole, + Name: "compaction_missing_delete_count", + Help: "Number of missing deletes in compaction", + }, []string{collectionIDLabelName}) ) // RegisterDataNode registers DataNode metrics @@ -261,6 +277,8 @@ func RegisterDataNode(registry *prometheus.Registry) { // compaction related registry.MustRegister(DataNodeCompactionLatency) registry.MustRegister(DataNodeCompactionLatencyInQueue) + registry.MustRegister(DataNodeCompactionDeleteCount) + registry.MustRegister(DataNodeCompactionMissingDeleteCount) // deprecated metrics registry.MustRegister(DataNodeForwardDeleteMsgTimeTaken) registry.MustRegister(DataNodeNumProducers) @@ -298,4 +316,12 @@ func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel nodeIDLabelName: fmt.Sprint(nodeID), collectionIDLabelName: fmt.Sprint(collectionID), }) + + DataNodeCompactionDeleteCount.Delete(prometheus.Labels{ + collectionIDLabelName: fmt.Sprint(collectionID), + }) + + DataNodeCompactionMissingDeleteCount.Delete(prometheus.Labels{ + collectionIDLabelName: fmt.Sprint(collectionID), + }) }
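
A note on the semantics this patch introduces, since the filtering rules are easy to misread. The sketch below mirrors the EntityFilter behavior in plain Go. It is illustrative only: entityFilter, filtered, and missingDeleteCount are stand-in names, plain time.Time values replace Milvus TSO hybrid timestamps, and the nanosecond-TTL check (filter.ttl/int64(time.Millisecond) <= dur) is collapsed into time.Duration arithmetic.

package main

import (
	"fmt"
	"time"
)

// entityFilter is a simplified stand-in for the EntityFilter added in
// compactor_common.go. deletedPkTs plays the role of the pk->ts map
// produced by mergeDeltalogs.
type entityFilter struct {
	deletedPkTs map[int64]uint64 // delete timestamp per primary key
	ttl         time.Duration    // CompactionPlan.collection_ttl is nanoseconds
	now         time.Time

	deletedCount int
	expiredCount int
}

// filtered reports whether an entity should be dropped during compaction,
// counting the reason, in the spirit of EntityFilter.Filtered.
func (f *entityFilter) filtered(pk int64, ts uint64, entityTime time.Time) bool {
	// Deleted: the deltalog entry must be strictly newer than the insert.
	// An upsert writes the insert and its delete with the same timestamp,
	// so using `<` (not `<=`) keeps the upserted row alive.
	if deleteTs, ok := f.deletedPkTs[pk]; ok && ts < deleteTs {
		f.deletedCount++
		return true
	}
	// Expired: TTL is disabled when <= 0; expiry when ttl <= now - entityTime.
	if f.ttl > 0 && f.now.Sub(entityTime) >= f.ttl {
		f.expiredCount++
		return true
	}
	return false
}

// missingDeleteCount mirrors GetMissingDeleteCount: deltalog entries that
// never dropped a row in the segments being compacted.
func (f *entityFilter) missingDeleteCount() int {
	if d := len(f.deletedPkTs) - f.deletedCount; d > 0 {
		return d
	}
	return 0
}

func main() {
	now := time.Now()
	f := &entityFilter{
		deletedPkTs: map[int64]uint64{1: 100, 7: 100}, // pk 7 never shows up below
		ttl:         10 * 24 * time.Hour,
		now:         now,
	}

	f.filtered(1, 99, now)                     // dropped: delete ts 100 > insert ts 99
	f.filtered(1, 100, now)                    // kept: upsert case, insert not strictly older
	f.filtered(2, 100, now.AddDate(0, 0, -11)) // dropped: older than the 10-day TTL
	f.filtered(3, 100, now)                    // kept

	// pk 7 had a delete entry but no matching insert: a "missing delete",
	// which milvus_datanode_compaction_missing_delete_count now surfaces.
	fmt.Println(f.deletedCount, f.expiredCount, f.missingDeleteCount()) // 1 1 1
}

The zero-clamp in missingDeleteCount (matching the `if diff <= 0` guard in GetMissingDeleteCount) is deliberate: when duplicate primary keys exist across the input segments, one deltalog entry can drop several rows, so applied deletes may exceed deltalog entries and the difference is only meaningful as a non-negative gap.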