pr: #40394

- Use a CounterVec so the sum of increase over a time period can be calculated.
- Use the number of entries instead of the binlog size.

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
parent d457ccc0e8
commit f455923ac9
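A side note on the metric-type switch (an illustration, not part of the diff): a counter is monotonically increasing, so Prometheus can derive its growth over any window with increase() or rate(), which is the "sum of increase during a time period" the PR description refers to; a gauge holding a byte size cannot be queried that way reliably. A minimal, self-contained sketch with an assumed metric name and label set shaped like the one introduced below (the label values and help text here are illustrative, not the Milvus definitions):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Counter semantics: the value only goes up, so a query such as
	// sum(increase(milvus_datanode_write_data_count[5m])) yields
	// "entries written in the last 5 minutes" across scrapes.
	writeDataCount := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "milvus",
			Subsystem: "datanode",
			Name:      "write_data_count",
			Help:      "number of entries written, labeled by data type",
		},
		[]string{"node_id", "data_source", "data_type", "collection_id"},
	)
	reg := prometheus.NewRegistry()
	reg.MustRegister(writeDataCount)

	// Each flush or compaction adds the number of entries it produced.
	writeDataCount.WithLabelValues("1", "streaming", "insert", "100").Add(1000)
	writeDataCount.WithLabelValues("1", "streaming", "insert", "100").Add(500)

	// Gather from the registry and print the exposed family name and value.
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), mf.GetMetric()[0].GetCounter().GetValue())
	}
}
```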
@@ -179,24 +179,32 @@ func (e *executor) executeTask(task Compactor) {
 	e.completed.Insert(result.GetPlanID(), result)
 	e.completedCompactor.Insert(result.GetPlanID(), task)
 
-	getLogSize := func(binlogs []*datapb.FieldBinlog) int64 {
-		size := int64(0)
+	getDataCount := func(binlogs []*datapb.FieldBinlog) int64 {
+		count := int64(0)
 		for _, binlog := range binlogs {
 			for _, fbinlog := range binlog.GetBinlogs() {
-				size += fbinlog.GetLogSize()
+				count += fbinlog.GetEntriesNum()
 			}
 		}
-		return size
+		return count
 	}
 
-	totalSize := lo.SumBy(result.Segments, func(seg *datapb.CompactionSegment) int64 {
-		return getLogSize(seg.GetInsertLogs()) +
-			getLogSize(seg.GetField2StatslogPaths()) +
-			getLogSize(seg.GetBm25Logs()) +
-			getLogSize(seg.GetDeltalogs())
-	})
-	metrics.DataNodeWriteBinlogSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.CompactionDataSourceLabel, fmt.Sprint(task.GetCollection())).Add(float64(totalSize))
+	var entityCount int64
+	var deleteCount int64
+	lo.ForEach(result.Segments, func(seg *datapb.CompactionSegment, _ int) {
+		entityCount += seg.GetNumOfRows()
+		deleteCount += getDataCount(seg.GetDeltalogs())
+	})
+
+	metrics.DataNodeWriteDataCount.WithLabelValues(
+		fmt.Sprint(paramtable.GetNodeID()),
+		metrics.CompactionDataSourceLabel,
+		metrics.InsertLabel,
+		fmt.Sprint(task.GetCollection())).Add(float64(entityCount))
+	metrics.DataNodeWriteDataCount.WithLabelValues(
+		fmt.Sprint(paramtable.GetNodeID()),
+		metrics.CompactionDataSourceLabel,
+		metrics.DeleteLabel,
+		fmt.Sprint(task.GetCollection())).Add(float64(deleteCount))
 	log.Info("end to execute compaction")
 }
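The hunk above swaps byte-size accounting (GetLogSize) for entry accounting (GetEntriesNum) and splits the result into inserted rows and deleted entries. A self-contained sketch of the same pattern with simplified stand-in structs (the datapb types are not used here; field and helper names are illustrative):

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
)

// Simplified stand-ins for the binlog metadata types.
type Binlog struct {
	EntriesNum int64
	LogSize    int64
}

type FieldBinlog struct {
	Binlogs []*Binlog
}

type Segment struct {
	NumOfRows int64
	Deltalogs []*FieldBinlog
}

// getDataCount mirrors the helper in the diff: total entries across binlogs.
func getDataCount(binlogs []*FieldBinlog) int64 {
	count := int64(0)
	for _, binlog := range binlogs {
		for _, fbinlog := range binlog.Binlogs {
			count += fbinlog.EntriesNum
		}
	}
	return count
}

func main() {
	segments := []*Segment{
		{NumOfRows: 1000, Deltalogs: []*FieldBinlog{{Binlogs: []*Binlog{{EntriesNum: 10, LogSize: 4096}}}}},
		{NumOfRows: 2000, Deltalogs: []*FieldBinlog{{Binlogs: []*Binlog{{EntriesNum: 5, LogSize: 2048}}}}},
	}

	// Count rows and delete entries instead of summing byte sizes.
	var entityCount, deleteCount int64
	lo.ForEach(segments, func(seg *Segment, _ int) {
		entityCount += seg.NumOfRows
		deleteCount += getDataCount(seg.Deltalogs)
	})
	fmt.Println(entityCount, deleteCount) // 3000 15
}
```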
@@ -178,10 +178,20 @@ func (t *SyncTask) Run(ctx context.Context) (err error) {
 	}
 	t.flushedSize = totalSize
 
+	getDataCount := func(binlogs ...*datapb.FieldBinlog) int64 {
+		count := int64(0)
+		for _, binlog := range binlogs {
+			for _, fbinlog := range binlog.GetBinlogs() {
+				count += fbinlog.GetEntriesNum()
+			}
+		}
+		return count
+	}
+	metrics.DataNodeWriteDataCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, metrics.InsertLabel, fmt.Sprint(t.collectionID)).Add(float64(t.batchRows))
+	metrics.DataNodeWriteDataCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, metrics.DeleteLabel, fmt.Sprint(t.collectionID)).Add(float64(getDataCount(t.deltaBinlog)))
 	metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, t.level.String()).Add(float64(t.flushedSize))
-	metrics.DataNodeWriteBinlogSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, fmt.Sprint(t.collectionID)).Add(float64(t.flushedSize))
 	metrics.DataNodeFlushedRows.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource).Add(float64(t.batchRows))
 
 	metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.level.String()).Observe(float64(t.tr.RecordSpan().Milliseconds()))
 
 	if t.metaWriter != nil {
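The sync path now records inserted rows and deleted entries as two children of the same counter, distinguished by the data_type label. A small test-style sketch (assumed label values, not Milvus test code) showing that the two series accumulate independently and can be read back with client_golang's testutil:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	writeDataCount := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "write_data_count"},
		[]string{"node_id", "data_source", "data_type", "collection_id"},
	)

	// One sync task: 128 inserted rows, 7 delete entries from the delta binlog.
	writeDataCount.WithLabelValues("1", "streaming", "insert", "42").Add(128)
	writeDataCount.WithLabelValues("1", "streaming", "delete", "42").Add(7)

	// Each (data_type) child is its own series with its own running total.
	insert := testutil.ToFloat64(writeDataCount.WithLabelValues("1", "streaming", "insert", "42"))
	del := testutil.ToFloat64(writeDataCount.WithLabelValues("1", "streaming", "delete", "42"))
	fmt.Println(insert, del) // 128 7
}
```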
@@ -58,15 +58,16 @@ var (
 			segmentLevelLabelName,
 		})
 
-	DataNodeWriteBinlogSize = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
+	DataNodeWriteDataCount = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
 			Namespace: milvusNamespace,
 			Subsystem: typeutil.DataNodeRole,
-			Name:      "write_data_size",
+			Name:      "write_data_count",
 			Help:      "byte size of datanode write to object storage, including flushed size",
 		}, []string{
 			nodeIDLabelName,
 			dataSourceLabelName,
+			dataTypeLabelName,
 			collectionIDLabelName,
 		})
@@ -274,7 +275,7 @@ func RegisterDataNode(registry *prometheus.Registry) {
 	registry.MustRegister(DataNodeFlushReqCounter)
 	registry.MustRegister(DataNodeFlushedSize)
 	registry.MustRegister(DataNodeFlushedRows)
-	registry.MustRegister(DataNodeWriteBinlogSize)
+	registry.MustRegister(DataNodeWriteDataCount)
 	// compaction related
 	registry.MustRegister(DataNodeCompactionLatency)
 	registry.MustRegister(DataNodeCompactionLatencyInQueue)
@@ -317,7 +318,7 @@ func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel
 		collectionIDLabelName: fmt.Sprint(collectionID),
 	})
 
-	DataNodeWriteBinlogSize.Delete(prometheus.Labels{
+	DataNodeWriteDataCount.Delete(prometheus.Labels{
 		collectionIDLabelName: fmt.Sprint(collectionID),
 	})
 }
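One client_golang detail worth keeping in mind around the cleanup above (an observation about the library, not something this diff changes): MetricVec.Delete only removes a child when the full label set is supplied, whereas DeletePartialMatch removes every child whose labels contain the given subset. A small sketch:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	writeDataCount := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "write_data_count"},
		[]string{"node_id", "data_source", "data_type", "collection_id"},
	)
	writeDataCount.WithLabelValues("1", "streaming", "insert", "42").Add(10)
	writeDataCount.WithLabelValues("1", "streaming", "delete", "42").Add(3)

	// Delete needs the full label set; with only collection_id it matches nothing.
	fmt.Println(writeDataCount.Delete(prometheus.Labels{"collection_id": "42"})) // false

	// DeletePartialMatch drops every series whose labels include the subset.
	fmt.Println(writeDataCount.DeletePartialMatch(prometheus.Labels{"collection_id": "42"})) // 2
}
```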
@@ -110,6 +110,7 @@ const (
 	cacheNameLabelName    = "cache_name"
 	cacheStateLabelName   = "cache_state"
 	dataSourceLabelName   = "data_source"
+	dataTypeLabelName     = "data_type"
 	importStageLabelName  = "import_stage"
 	requestScope          = "scope"
 	fullMethodLabelName   = "full_method"