enhance: Enable observing write amplification (#39661)

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2025-02-08 18:38:43 +08:00 committed by GitHub
parent a3df2782d6
commit 1f14053c70
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 39 additions and 0 deletions

View File

@ -18,6 +18,7 @@ package compaction
import (
"context"
"fmt"
"sync"
"github.com/samber/lo"
@ -25,6 +26,7 @@ import (
"golang.org/x/sync/semaphore"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -177,6 +179,24 @@ func (e *executor) executeTask(task Compactor) {
e.completed.Insert(result.GetPlanID(), result)
e.completedCompactor.Insert(result.GetPlanID(), task)
getLogSize := func(binlogs []*datapb.FieldBinlog) int64 {
size := int64(0)
for _, binlog := range binlogs {
for _, fbinlog := range binlog.GetBinlogs() {
size += fbinlog.GetLogSize()
}
}
return size
}
// totalSize is the combined size of everything the compaction result reports
// as written: insert logs, stats logs, BM25 logs and delta logs of every
// result segment.
// The segments are read through GetSegments() rather than the raw field so
// the access stays nil-safe (generated protobuf getters accept a nil
// receiver) and matches the getter style already used on result above.
totalSize := lo.SumBy(result.GetSegments(), func(seg *datapb.CompactionSegment) int64 {
	return getLogSize(seg.GetInsertLogs()) +
		getLogSize(seg.GetField2StatslogPaths()) +
		getLogSize(seg.GetBm25Logs()) +
		getLogSize(seg.GetDeltalogs())
})
// Report the written bytes under the compaction data source, keyed by node
// and collection, so compaction writes can be compared against other sources.
metrics.DataNodeWriteBinlogSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.CompactionDataSourceLabel, fmt.Sprint(task.GetCollection())).Add(float64(totalSize))
log.Info("end to execute compaction")
}

View File

@ -130,6 +130,7 @@ func (t *SyncTask) Run(ctx context.Context) (err error) {
}
metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, t.level.String()).Add(float64(t.flushedSize))
// Also add the flushed size to the per-collection write-size metric, labelled
// with this task's data source, alongside the per-level flushed-size metric.
metrics.DataNodeWriteBinlogSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, fmt.Sprint(t.collectionID)).Add(float64(t.flushedSize))
metrics.DataNodeFlushedRows.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource).Add(float64(t.batchRows))
metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.level.String()).Observe(float64(t.tr.RecordSpan().Milliseconds()))

View File

@ -58,6 +58,18 @@ var (
segmentLevelLabelName,
})
// DataNodeWriteBinlogSize accumulates the byte size the datanode writes to
// object storage, labelled by node ID, data source and collection ID.
// The value is cumulative (call sites only Add to it), so it is exposed as a
// counter, like the neighbouring DataNodeFlushedRows, which lets rate() and
// increase() handle process restarts correctly.
DataNodeWriteBinlogSize = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: milvusNamespace,
		Subsystem: typeutil.DataNodeRole,
		Name:      "write_data_size",
		Help:      "byte size of datanode write to object storage, including flushed size",
	}, []string{
		nodeIDLabelName,
		dataSourceLabelName,
		collectionIDLabelName,
	})
DataNodeFlushedRows = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
@ -249,6 +261,7 @@ func RegisterDataNode(registry *prometheus.Registry) {
registry.MustRegister(DataNodeFlushReqCounter)
registry.MustRegister(DataNodeFlushedSize)
registry.MustRegister(DataNodeFlushedRows)
registry.MustRegister(DataNodeWriteBinlogSize)
// compaction related
registry.MustRegister(DataNodeCompactionLatency)
registry.MustRegister(DataNodeCompactionLatencyInQueue)
@ -290,4 +303,8 @@ func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel
DataNodeCompactionMissingDeleteCount.Delete(prometheus.Labels{
collectionIDLabelName: fmt.Sprint(collectionID),
})
// DataNodeWriteBinlogSize is keyed by node ID, data source and collection ID.
// Delete only removes a series whose full label set matches, so passing just
// the collection ID would never delete anything. DeletePartialMatch removes
// every series of this collection regardless of the other label values.
DataNodeWriteBinlogSize.DeletePartialMatch(prometheus.Labels{
	collectionIDLabelName: fmt.Sprint(collectionID),
})
}

View File

@ -59,6 +59,7 @@ const (
StreamingDataSourceLabel = "streaming"
BulkinsertDataSourceLabel = "bulkinsert"
CompactionDataSourceLabel = "compaction"
Leader = "OnLeader"
FromLeader = "FromLeader"