From be8831b31136d41ebdb8b4d011619afebc8ef909 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 1 Feb 2024 10:59:03 +0800 Subject: [PATCH] enhance: Reduce get segments scan during l0 compaction (#30408) See also #27606 Previously, l0 linear compaction scanned all target segment IDs from the metacache for each line of delta entries, which is unnecessary since all compaction target segments are expected to be immutable. Signed-off-by: Congqi Xia --- internal/datanode/l0_compactor.go | 13 ++++++++---- internal/datanode/l0_compactor_test.go | 29 +++++++++++++++++--------- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/internal/datanode/l0_compactor.go b/internal/datanode/l0_compactor.go index f717a41dfa..33134aa543 100644 --- a/internal/datanode/l0_compactor.go +++ b/internal/datanode/l0_compactor.go @@ -267,6 +267,14 @@ func (t *levelZeroCompactionTask) splitDelta( targetSegBuffer map[int64]*storage.DeleteData, targetSegIDs []int64, ) error { + // segments shall be safe to read outside + segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(targetSegIDs...)) + split := func(pk storage.PrimaryKey) []int64 { + return lo.FilterMap(segments, func(segment *metacache.SegmentInfo, _ int) (int64, bool) { + return segment.SegmentID(), segment.GetBloomFilterSet().PkExists(pk) + }) + } + // spilt all delete data to segments for _, deltaIter := range allIters { for deltaIter.HasNext() { @@ -275,10 +283,7 @@ func (t *levelZeroCompactionTask) splitDelta( return err } - predicted, found := t.metacache.PredictSegments(labeled.GetPk(), metacache.WithSegmentIDs(targetSegIDs...)) - if !found { - continue - } + predicted := split(labeled.GetPk()) for _, gotSeg := range predicted { delBuffer, ok := targetSegBuffer[gotSeg] diff --git a/internal/datanode/l0_compactor_test.go b/internal/datanode/l0_compactor_test.go index 2dc36d6b4a..716db4090a 100644 --- a/internal/datanode/l0_compactor_test.go +++ b/internal/datanode/l0_compactor_test.go @@ -118,8 +118,15 @@ func (s 
*LevelZeroCompactionTaskSuite) TestCompactLinear() { s.task.plan = plan s.task.tr = timerecord.NewTimeRecorder("test") + bfs1 := metacache.NewBloomFilterSetWithBatchSize(100) + bfs1.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 2}}) + segment1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 200}, bfs1) + bfs2 := metacache.NewBloomFilterSetWithBatchSize(100) + bfs2.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 2}}) + segment2 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 201}, bfs2) + s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2) - s.mockMeta.EXPECT().PredictSegments(mock.Anything, mock.Anything).Return([]int64{200, 201}, true) + s.mockMeta.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{segment1, segment2}) s.mockMeta.EXPECT().Collection().Return(1) s.mockMeta.EXPECT().GetSegmentByID(mock.Anything, mock.Anything). RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) { @@ -335,16 +342,18 @@ func (s *LevelZeroCompactionTaskSuite) TestComposeDeltalog() { } func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() { + bfs1 := metacache.NewBloomFilterSetWithBatchSize(100) + bfs1.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 3}}) + segment1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100}, bfs1) + bfs2 := metacache.NewBloomFilterSetWithBatchSize(100) + bfs2.UpdatePKRange(&storage.Int64FieldData{Data: []int64{3}}) + segment2 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 101}, bfs2) + bfs3 := metacache.NewBloomFilterSetWithBatchSize(100) + bfs3.UpdatePKRange(&storage.Int64FieldData{Data: []int64{3}}) + segment3 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 102}, bfs3) + predicted := []int64{100, 101, 102} - s.mockMeta.EXPECT().PredictSegments(mock.MatchedBy(func(pk storage.PrimaryKey) bool { - return pk.GetValue().(int64) == 1 - }), mock.Anything).Return([]int64{100}, true) - 
s.mockMeta.EXPECT().PredictSegments(mock.MatchedBy(func(pk storage.PrimaryKey) bool { - return pk.GetValue().(int64) == 2 - }), mock.Anything).Return(nil, false) - s.mockMeta.EXPECT().PredictSegments(mock.MatchedBy(func(pk storage.PrimaryKey) bool { - return pk.GetValue().(int64) == 3 - }), mock.Anything).Return([]int64{100, 101, 102}, true) + s.mockMeta.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{segment1, segment2, segment3}) diter, err := iter.NewDeltalogIterator([][]byte{s.dBlob}, nil) s.Require().NoError(err)