From 281260e48a69bc4e0ba1d802e46eb1a1e2f12cbf Mon Sep 17 00:00:00 2001
From: XuanYang-cn
Date: Thu, 20 Mar 2025 11:28:11 +0800
Subject: [PATCH] fix: Massive memory cost when compacting (#40763)

Download binlogs batch by batch instead of all of a segment's binlogs
at once.

See also: #40761

---------

Signed-off-by: yangxuan
---
 .../datanode/compaction/compactor_common.go   |  8 ++--
 internal/datanode/compaction/mix_compactor.go | 38 ++++++++++---------
 .../datanode/compaction/mix_compactor_test.go |  4 +-
 3 files changed, 26 insertions(+), 24 deletions(-)

diff --git a/internal/datanode/compaction/compactor_common.go b/internal/datanode/compaction/compactor_common.go
index bcea13bda5..28b8bb88ce 100644
--- a/internal/datanode/compaction/compactor_common.go
+++ b/internal/datanode/compaction/compactor_common.go
@@ -169,15 +169,15 @@ func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[in
 }
 
 func composePaths(segments []*datapb.CompactionSegmentBinlogs) (
-	deltaPaths map[typeutil.UniqueID][]string, insertPaths map[typeutil.UniqueID][]string, err error,
+	deltaPaths map[typeutil.UniqueID][]string, insertPaths map[typeutil.UniqueID][][]string, err error,
 ) {
 	if err := binlog.DecompressCompactionBinlogs(segments); err != nil {
 		log.Warn("compact wrong, fail to decompress compaction binlogs", zap.Error(err))
 		return nil, nil, err
 	}
 
-	deltaPaths = make(map[typeutil.UniqueID][]string)     // segmentID to deltalog paths
-	insertPaths = make(map[typeutil.UniqueID][]string, 0) // segmentID to binlog paths
+	deltaPaths = make(map[typeutil.UniqueID][]string)       // segmentID to deltalog paths
+	insertPaths = make(map[typeutil.UniqueID][][]string, 0) // segmentID to binlog paths
 	for _, s := range segments {
 		segId := s.GetSegmentID()
 		// Get the batch count of field binlog files from non-empty segment
@@ -199,7 +199,7 @@ func composePaths(segments []*datapb.CompactionSegmentBinlogs) (
 			for _, f := range s.GetFieldBinlogs() {
 				batchPaths = append(batchPaths, f.GetBinlogs()[idx].GetLogPath())
 			}
-			insertPaths[segId] = append(insertPaths[segId], batchPaths...)
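The key change in composePaths is the return type: insert paths are now grouped per batch (map[typeutil.UniqueID][][]string) instead of being flattened into one slice per segment, so batch i of a segment is the i-th binlog of every field. A minimal sketch of that transposition, using simplified stand-in types rather than the real datapb.FieldBinlog definitions:

```go
// Sketch of the batching scheme composePaths now uses. Grouping paths per
// batch (instead of flattening them into one slice) lets the compactor
// download one batch at a time. Binlog and FieldBinlog here are simplified
// stand-ins for the datapb protobuf types.
package main

import "fmt"

type Binlog struct{ LogPath string }
type FieldBinlog struct{ Binlogs []Binlog }

// groupByBatch transposes per-field binlog lists into per-batch path lists.
// It assumes every field holds the same number of binlogs (one per batch),
// which the real composePaths verifies against a non-empty segment.
func groupByBatch(fields []FieldBinlog) [][]string {
	if len(fields) == 0 {
		return nil
	}
	batchCount := len(fields[0].Binlogs)
	batches := make([][]string, 0, batchCount)
	for idx := 0; idx < batchCount; idx++ {
		batchPaths := make([]string, 0, len(fields))
		for _, f := range fields {
			batchPaths = append(batchPaths, f.Binlogs[idx].LogPath)
		}
		batches = append(batches, batchPaths) // one download unit per batch
	}
	return batches
}

func main() {
	fields := []FieldBinlog{
		{Binlogs: []Binlog{{"f0/b0"}, {"f0/b1"}}},
		{Binlogs: []Binlog{{"f1/b0"}, {"f1/b1"}}},
	}
	fmt.Println(groupByBatch(fields)) // [[f0/b0 f1/b0] [f0/b1 f1/b1]]
}
```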
+			insertPaths[segId] = append(insertPaths[segId], batchPaths)
 		}
 
 		deltaPaths[s.GetSegmentID()] = []string{}
diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go
index 2bf4ec509f..a340173017 100644
--- a/internal/datanode/compaction/mix_compactor.go
+++ b/internal/datanode/compaction/mix_compactor.go
@@ -131,7 +131,7 @@ func (t *mixCompactionTask) preCompact() error {
 
 func (t *mixCompactionTask) mergeSplit(
 	ctx context.Context,
-	insertPaths map[int64][]string,
+	insertPaths map[int64][][]string,
 	deltaPaths map[int64][]string,
 ) ([]*datapb.CompactionSegment, error) {
 	_ = t.tr.RecordSpan()
@@ -154,14 +154,23 @@ func (t *mixCompactionTask) mergeSplit(
 		log.Warn("failed to get pk field from schema")
 		return nil, err
 	}
-	for segId, binlogPaths := range insertPaths {
+	for segId, binlogBatches := range insertPaths {
 		deltaPaths := deltaPaths[segId]
-		del, exp, err := t.writeSegment(ctx, binlogPaths, deltaPaths, mWriter, pkField)
+		delta, err := mergeDeltalogs(ctx, t.binlogIO, deltaPaths)
 		if err != nil {
+			log.Warn("compact wrong, fail to merge deltalogs", zap.Error(err))
 			return nil, err
 		}
-		deletedRowCount += del
-		expiredRowCount += exp
+		entityFilter := newEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)
+
+		for _, batchPaths := range binlogBatches {
+			del, exp, err := t.writeSegment(ctx, batchPaths, entityFilter, mWriter, pkField)
+			if err != nil {
+				return nil, err
+			}
+			deletedRowCount += del
+			expiredRowCount += exp
+		}
 	}
 	res, err := mWriter.Finish()
 	if err != nil {
@@ -180,28 +189,21 @@ func (t *mixCompactionTask) mergeSplit(
 }
 
 func (t *mixCompactionTask) writeSegment(ctx context.Context,
-	binlogPaths []string,
-	deltaPaths []string,
+	batchPaths []string,
+	entityFilter *EntityFilter,
 	mWriter *MultiSegmentWriter,
 	pkField *schemapb.FieldSchema,
 ) (deletedRowCount, expiredRowCount int64, err error) {
-	log := log.With(zap.Strings("paths", binlogPaths))
-	allValues, err := t.binlogIO.Download(ctx, binlogPaths)
+	log := log.With(zap.Strings("paths", batchPaths))
+	allValues, err := t.binlogIO.Download(ctx, batchPaths)
 	if err != nil {
 		log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
 		return
 	}
 	blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
-		return &storage.Blob{Key: binlogPaths[i], Value: v}
+		return &storage.Blob{Key: batchPaths[i], Value: v}
 	})
 
-	delta, err := mergeDeltalogs(ctx, t.binlogIO, deltaPaths)
-	if err != nil {
-		log.Warn("compact wrong, fail to merge deltalogs", zap.Error(err))
-		return
-	}
-	entityFilter := newEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)
-
 	reader, err := storage.NewCompositeBinlogRecordReader(blobs)
 	if err != nil {
 		log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err))
@@ -272,7 +274,7 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
 		}
 	}
 
-	deltalogDeleteEntriesCount := len(delta)
+	deltalogDeleteEntriesCount := entityFilter.GetDeltalogDeleteCount()
 	deletedRowCount = int64(entityFilter.GetDeletedCount())
 	expiredRowCount = int64(entityFilter.GetExpiredCount())
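mergeSplit now hoists the deltalog merge and entity-filter construction out of writeSegment so they run once per segment, then feeds writeSegment one batch of paths at a time; peak memory is bounded by a single batch rather than the whole segment's binlogs. A minimal sketch of that control flow, with hypothetical helpers (downloadBatch, writeBatch, mergeDeltalogs stubs) standing in for t.binlogIO.Download and the real writer plumbing:

```go
// Sketch of the reworked mergeSplit flow. Only the shape of the loop is
// taken from the patch; the helpers below are simplified stand-ins, not
// the real Milvus APIs.
package main

import (
	"context"
	"fmt"
)

type entityFilter struct{ deleted map[string]struct{} }

func processSegment(ctx context.Context, batches [][]string, deltaPaths []string) error {
	// Hoisted out of the per-batch loop: merged deltalogs and the filter
	// are built once per segment and shared by every batch.
	filter := &entityFilter{deleted: mergeDeltalogs(deltaPaths)}

	for _, batchPaths := range batches {
		// Only this batch's binlogs are resident in memory at any time.
		blobs, err := downloadBatch(ctx, batchPaths)
		if err != nil {
			return err
		}
		writeBatch(blobs, filter)
	}
	return nil
}

// Hypothetical helpers standing in for the real I/O and writer plumbing.
func mergeDeltalogs(paths []string) map[string]struct{} { return map[string]struct{}{} }
func downloadBatch(ctx context.Context, paths []string) ([][]byte, error) {
	return make([][]byte, len(paths)), nil
}
func writeBatch(blobs [][]byte, f *entityFilter) { fmt.Println("wrote", len(blobs), "blobs") }

func main() {
	_ = processSegment(context.Background(), [][]string{{"b0"}, {"b1"}}, nil)
}
```

Because the filter now outlives writeSegment, the deltalog entry count also has to come from the filter (entityFilter.GetDeltalogDeleteCount()) rather than from a local delta map, which is the last hunk above.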
diff --git a/internal/datanode/compaction/mix_compactor_test.go b/internal/datanode/compaction/mix_compactor_test.go
index 1e3091fc69..7fe78fd0d6 100644
--- a/internal/datanode/compaction/mix_compactor_test.go
+++ b/internal/datanode/compaction/mix_compactor_test.go
@@ -357,7 +357,7 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
 	s.task.partitionID = PartitionID
 	s.task.maxRows = 1000
 
-	compactionSegments, err := s.task.mergeSplit(s.task.ctx, map[int64][]string{s.segWriter.segmentID: lo.Keys(kvs)}, nil)
+	compactionSegments, err := s.task.mergeSplit(s.task.ctx, map[int64][][]string{s.segWriter.segmentID: {lo.Keys(kvs)}}, nil)
 	s.NoError(err)
 	s.Equal(1, len(compactionSegments))
 	s.EqualValues(0, compactionSegments[0].GetNumOfRows())
@@ -410,7 +410,7 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
 			s.task.collectionID = CollectionID
 			s.task.partitionID = PartitionID
 			s.task.maxRows = 1000
-			res, err := s.task.mergeSplit(s.task.ctx, map[int64][]string{s.segWriter.segmentID: insertPaths}, deletePaths)
+			res, err := s.task.mergeSplit(s.task.ctx, map[int64][][]string{s.segWriter.segmentID: {insertPaths}}, deletePaths)
 			s.NoError(err)
 			s.EqualValues(test.expectedRes, len(res))
 			s.EqualValues(test.leftNumRows, res[0].GetNumOfRows())
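The test updates only wrap the old flat path list as a single batch, which preserves the previous download-everything-at-once behavior while matching the new signature. An illustration with a placeholder segment ID and paths:

```go
// Call-site shape change the tests exercise; values are placeholders.
package main

import "fmt"

func main() {
	// Before: segmentID -> flat list of binlog paths.
	flat := map[int64][]string{101: {"log/a", "log/b"}}
	// After: segmentID -> batches of paths; wrapping the old list as one
	// batch is behavior-preserving.
	batched := map[int64][][]string{101: {flat[101]}}
	fmt.Println(batched) // map[101:[[log/a log/b]]]
}
```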