From f455923ac9da026c19ba9c9731d91ca283bb37b7 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Mon, 17 Mar 2025 15:06:19 +0800 Subject: [PATCH] enhance: Use correct counter metrics for overall wa calculation (#40394) (#40679) pr: #40394 - Use CounterVec to calculate sum of increase during a time period. - Use entries number instead of binlog size Signed-off-by: yangxuan --- internal/datanode/compaction/executor.go | 30 +++++++++++++++--------- internal/flushcommon/syncmgr/task.go | 14 +++++++++-- pkg/metrics/datanode_metrics.go | 11 +++++---- pkg/metrics/metrics.go | 1 + 4 files changed, 38 insertions(+), 18 deletions(-) diff --git a/internal/datanode/compaction/executor.go b/internal/datanode/compaction/executor.go index 372d5ea7db..2494f93eb2 100644 --- a/internal/datanode/compaction/executor.go +++ b/internal/datanode/compaction/executor.go @@ -179,24 +179,32 @@ func (e *executor) executeTask(task Compactor) { e.completed.Insert(result.GetPlanID(), result) e.completedCompactor.Insert(result.GetPlanID(), task) - getLogSize := func(binlogs []*datapb.FieldBinlog) int64 { - size := int64(0) + getDataCount := func(binlogs []*datapb.FieldBinlog) int64 { + count := int64(0) for _, binlog := range binlogs { for _, fbinlog := range binlog.GetBinlogs() { - size += fbinlog.GetLogSize() + count += fbinlog.GetEntriesNum() } } - return size + return count } - totalSize := lo.SumBy(result.Segments, func(seg *datapb.CompactionSegment) int64 { - return getLogSize(seg.GetInsertLogs()) + - getLogSize(seg.GetField2StatslogPaths()) + - getLogSize(seg.GetBm25Logs()) + - getLogSize(seg.GetDeltalogs()) + var entityCount int64 + var deleteCount int64 + lo.ForEach(result.Segments, func(seg *datapb.CompactionSegment, _ int) { + entityCount += seg.GetNumOfRows() + deleteCount += getDataCount(seg.GetDeltalogs()) }) - metrics.DataNodeWriteBinlogSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.CompactionDataSourceLabel, fmt.Sprint(task.GetCollection())).Add(float64(totalSize)) - + metrics.DataNodeWriteDataCount.WithLabelValues( + fmt.Sprint(paramtable.GetNodeID()), + metrics.CompactionDataSourceLabel, + metrics.InsertLabel, + fmt.Sprint(task.GetCollection())).Add(float64(entityCount)) + metrics.DataNodeWriteDataCount.WithLabelValues( + fmt.Sprint(paramtable.GetNodeID()), + metrics.CompactionDataSourceLabel, + metrics.DeleteLabel, + fmt.Sprint(task.GetCollection())).Add(float64(deleteCount)) log.Info("end to execute compaction") } diff --git a/internal/flushcommon/syncmgr/task.go b/internal/flushcommon/syncmgr/task.go index cd4a95d8bd..895adce18a 100644 --- a/internal/flushcommon/syncmgr/task.go +++ b/internal/flushcommon/syncmgr/task.go @@ -178,10 +178,20 @@ func (t *SyncTask) Run(ctx context.Context) (err error) { } t.flushedSize = totalSize + getDataCount := func(binlogs ...*datapb.FieldBinlog) int64 { + count := int64(0) + for _, binlog := range binlogs { + for _, fbinlog := range binlog.GetBinlogs() { + count += fbinlog.GetEntriesNum() + } + } + return count + } + metrics.DataNodeWriteDataCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, metrics.InsertLabel, fmt.Sprint(t.collectionID)).Add(float64(t.batchRows)) + metrics.DataNodeWriteDataCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, metrics.DeleteLabel, fmt.Sprint(t.collectionID)).Add(float64(getDataCount(t.deltaBinlog))) metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, t.level.String()).Add(float64(t.flushedSize)) - metrics.DataNodeWriteBinlogSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, fmt.Sprint(t.collectionID)).Add(float64(t.flushedSize)) - metrics.DataNodeFlushedRows.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource).Add(float64(t.batchRows)) + metrics.DataNodeFlushedRows.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource).Add(float64(t.batchRows)) metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.level.String()).Observe(float64(t.tr.RecordSpan().Milliseconds())) if t.metaWriter != nil { diff --git a/pkg/metrics/datanode_metrics.go b/pkg/metrics/datanode_metrics.go index bba7f8743f..d97b01c391 100644 --- a/pkg/metrics/datanode_metrics.go +++ b/pkg/metrics/datanode_metrics.go @@ -58,15 +58,16 @@ var ( segmentLevelLabelName, }) - DataNodeWriteBinlogSize = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + DataNodeWriteDataCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ Namespace: milvusNamespace, Subsystem: typeutil.DataNodeRole, - Name: "write_data_size", + Name: "write_data_count", Help: "byte size of datanode write to object storage, including flushed size", }, []string{ nodeIDLabelName, dataSourceLabelName, + dataTypeLabelName, collectionIDLabelName, }) @@ -274,7 +275,7 @@ func RegisterDataNode(registry *prometheus.Registry) { registry.MustRegister(DataNodeFlushReqCounter) registry.MustRegister(DataNodeFlushedSize) registry.MustRegister(DataNodeFlushedRows) - registry.MustRegister(DataNodeWriteBinlogSize) + registry.MustRegister(DataNodeWriteDataCount) // compaction related registry.MustRegister(DataNodeCompactionLatency) registry.MustRegister(DataNodeCompactionLatencyInQueue) @@ -317,7 +318,7 @@ func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel collectionIDLabelName: fmt.Sprint(collectionID), }) - DataNodeWriteBinlogSize.Delete(prometheus.Labels{ + DataNodeWriteDataCount.Delete(prometheus.Labels{ collectionIDLabelName: fmt.Sprint(collectionID), }) } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index a676745a0b..378affb4e2 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -110,6 +110,7 @@ const ( cacheNameLabelName = "cache_name" cacheStateLabelName = "cache_state" dataSourceLabelName = "data_source" + dataTypeLabelName = "data_type" importStageLabelName = "import_stage" requestScope = "scope" fullMethodLabelName = "full_method"