enhance: Add log with segment size for tasks (#46118)

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2025-12-05 16:45:11 +08:00 committed by GitHub
parent d8c9d15c07
commit 141547d8a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 24 additions and 4 deletions

View File

@ -56,7 +56,10 @@ func (t *mixCompactionTask) GetTaskSlot() int64 {
if t.GetTaskProto().GetType() == datapb.CompactionType_SortCompaction { if t.GetTaskProto().GetType() == datapb.CompactionType_SortCompaction {
segment := t.meta.GetHealthySegment(context.Background(), t.GetTaskProto().GetInputSegments()[0]) segment := t.meta.GetHealthySegment(context.Background(), t.GetTaskProto().GetInputSegments()[0])
if segment != nil { if segment != nil {
slotUsage = calculateStatsTaskSlot(segment.getSegmentSize()) segSize := segment.getSegmentSize()
slotUsage = calculateStatsTaskSlot(segSize)
log.Info("mixCompactionTask get task slot",
zap.Int64("segment size", segSize), zap.Int64("task slot", slotUsage))
} }
} }
t.slotUsage.Store(slotUsage) t.slotUsage.Store(slotUsage)

View File

@ -179,7 +179,8 @@ func (i *indexInspector) createIndexForSegment(ctx context.Context, segment *Seg
indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, indexID) indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, indexID)
indexType := GetIndexType(indexParams) indexType := GetIndexType(indexParams)
isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType) isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType)
taskSlot := calculateIndexTaskSlot(segment.getSegmentSize(), isVectorIndex) segSize := segment.getSegmentSize()
taskSlot := calculateIndexTaskSlot(segSize, isVectorIndex)
// rewrite the index type if needed, and this final index type will be persisted in the meta // rewrite the index type if needed, and this final index type will be persisted in the meta
if isVectorIndex && Params.KnowhereConfig.Enable.GetAsBool() { if isVectorIndex && Params.KnowhereConfig.Enable.GetAsBool() {
@ -215,6 +216,11 @@ func (i *indexInspector) createIndexForSegment(ctx context.Context, segment *Seg
i.handler, i.handler,
i.storageCli, i.storageCli,
i.indexEngineVersionManager)) i.indexEngineVersionManager))
log.Info("indexInspector create index for segment success",
zap.Int64("segmentID", segment.ID),
zap.Int64("indexID", indexID),
zap.Int64("segment size", segSize),
zap.Int64("task slot", taskSlot))
return nil return nil
} }

View File

@ -1759,7 +1759,9 @@ func (m *meta) completeMixCompactionMutation(
zap.String("type", t.GetType().String()), zap.String("type", t.GetType().String()),
zap.Int64("collectionID", t.CollectionID), zap.Int64("collectionID", t.CollectionID),
zap.Int64("partitionID", t.PartitionID), zap.Int64("partitionID", t.PartitionID),
zap.String("channel", t.GetChannel())) zap.String("channel", t.GetChannel()),
zap.Int64("planID", t.GetPlanID()),
)
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)} metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)}
var compactFromSegIDs []int64 var compactFromSegIDs []int64
@ -1789,6 +1791,12 @@ func (m *meta) completeMixCompactionMutation(
// metrics mutation for compaction from segments // metrics mutation for compaction from segments
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation) updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
log.Info("compact from segment",
zap.Int64("segmentID", cloned.GetID()),
zap.Int64("segment size", cloned.getSegmentSize()),
zap.Int64("num rows", cloned.GetNumOfRows()),
)
} }
log = log.With(zap.Int64s("compactFrom", compactFromSegIDs)) log = log.With(zap.Int64s("compactFrom", compactFromSegIDs))
@ -1838,6 +1846,7 @@ func (m *meta) completeMixCompactionMutation(
zap.Int("binlog count", len(compactToSegmentInfo.GetBinlogs())), zap.Int("binlog count", len(compactToSegmentInfo.GetBinlogs())),
zap.Int("statslog count", len(compactToSegmentInfo.GetStatslogs())), zap.Int("statslog count", len(compactToSegmentInfo.GetStatslogs())),
zap.Int("deltalog count", len(compactToSegmentInfo.GetDeltalogs())), zap.Int("deltalog count", len(compactToSegmentInfo.GetDeltalogs())),
zap.Int64("segment size", compactToSegmentInfo.getSegmentSize()),
) )
compactToSegments = append(compactToSegments, compactToSegmentInfo) compactToSegments = append(compactToSegments, compactToSegmentInfo)
} }
@ -2332,7 +2341,9 @@ func (m *meta) completeSortCompactionMutation(
log = log.With(zap.Int64s("compactFrom", []int64{oldSegment.GetID()}), zap.Int64("compactTo", segment.GetID())) log = log.With(zap.Int64s("compactFrom", []int64{oldSegment.GetID()}), zap.Int64("compactTo", segment.GetID()))
log.Info("meta update: prepare for complete stats mutation - complete", zap.Int64("num rows", segment.GetNumOfRows())) log.Info("meta update: prepare for complete stats mutation - complete",
zap.Int64("num rows", segment.GetNumOfRows()),
zap.Int64("segment size", segment.getSegmentSize()))
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{cloned.SegmentInfo, segment.SegmentInfo}, metastore.BinlogsIncrement{Segment: segment.SegmentInfo}); err != nil { if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{cloned.SegmentInfo, segment.SegmentInfo}, metastore.BinlogsIncrement{Segment: segment.SegmentInfo}); err != nil {
log.Warn("fail to alter segments and new segment", zap.Error(err)) log.Warn("fail to alter segments and new segment", zap.Error(err))
return nil, nil, err return nil, nil, err