diff --git a/internal/datanode/l0_compactor.go b/internal/datanode/l0_compactor.go
index f717a41dfa..33134aa543 100644
--- a/internal/datanode/l0_compactor.go
+++ b/internal/datanode/l0_compactor.go
@@ -267,6 +267,14 @@ func (t *levelZeroCompactionTask) splitDelta(
 	targetSegBuffer map[int64]*storage.DeleteData,
 	targetSegIDs []int64,
 ) error {
+	// segments shall be safe to read outside
+	segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(targetSegIDs...))
+	split := func(pk storage.PrimaryKey) []int64 {
+		return lo.FilterMap(segments, func(segment *metacache.SegmentInfo, _ int) (int64, bool) {
+			return segment.SegmentID(), segment.GetBloomFilterSet().PkExists(pk)
+		})
+	}
+
 	// spilt all delete data to segments
 	for _, deltaIter := range allIters {
 		for deltaIter.HasNext() {
@@ -275,10 +283,7 @@ func (t *levelZeroCompactionTask) splitDelta(
 				return err
 			}
 
-			predicted, found := t.metacache.PredictSegments(labeled.GetPk(), metacache.WithSegmentIDs(targetSegIDs...))
-			if !found {
-				continue
-			}
+			predicted := split(labeled.GetPk())
 
 			for _, gotSeg := range predicted {
 				delBuffer, ok := targetSegBuffer[gotSeg]
diff --git a/internal/datanode/l0_compactor_test.go b/internal/datanode/l0_compactor_test.go
index 2dc36d6b4a..716db4090a 100644
--- a/internal/datanode/l0_compactor_test.go
+++ b/internal/datanode/l0_compactor_test.go
@@ -118,8 +118,15 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
 	s.task.plan = plan
 	s.task.tr = timerecord.NewTimeRecorder("test")
 
+	bfs1 := metacache.NewBloomFilterSetWithBatchSize(100)
+	bfs1.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 2}})
+	segment1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 200}, bfs1)
+	bfs2 := metacache.NewBloomFilterSetWithBatchSize(100)
+	bfs2.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 2}})
+	segment2 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 201}, bfs2)
+
 	s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2)
-	s.mockMeta.EXPECT().PredictSegments(mock.Anything, mock.Anything).Return([]int64{200, 201}, true)
+	s.mockMeta.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{segment1, segment2})
 	s.mockMeta.EXPECT().Collection().Return(1)
 	s.mockMeta.EXPECT().GetSegmentByID(mock.Anything, mock.Anything).
 		RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
@@ -335,16 +342,18 @@ func (s *LevelZeroCompactionTaskSuite) TestComposeDeltalog() {
 }
 
 func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() {
+	bfs1 := metacache.NewBloomFilterSetWithBatchSize(100)
+	bfs1.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 3}})
+	segment1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100}, bfs1)
+	bfs2 := metacache.NewBloomFilterSetWithBatchSize(100)
+	bfs2.UpdatePKRange(&storage.Int64FieldData{Data: []int64{3}})
+	segment2 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 101}, bfs2)
+	bfs3 := metacache.NewBloomFilterSetWithBatchSize(100)
+	bfs3.UpdatePKRange(&storage.Int64FieldData{Data: []int64{3}})
+	segment3 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 102}, bfs3)
+
 	predicted := []int64{100, 101, 102}
-	s.mockMeta.EXPECT().PredictSegments(mock.MatchedBy(func(pk storage.PrimaryKey) bool {
-		return pk.GetValue().(int64) == 1
-	}), mock.Anything).Return([]int64{100}, true)
-	s.mockMeta.EXPECT().PredictSegments(mock.MatchedBy(func(pk storage.PrimaryKey) bool {
-		return pk.GetValue().(int64) == 2
-	}), mock.Anything).Return(nil, false)
-	s.mockMeta.EXPECT().PredictSegments(mock.MatchedBy(func(pk storage.PrimaryKey) bool {
-		return pk.GetValue().(int64) == 3
-	}), mock.Anything).Return([]int64{100, 101, 102}, true)
+	s.mockMeta.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{segment1, segment2, segment3})
 
 	diter, err := iter.NewDeltalogIterator([][]byte{s.dBlob}, nil)
 	s.Require().NoError(err)