mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
enhance: Reduce get segments scan during l0 compaction (#30408)
See also #27606. Previously, L0 linear compaction scanned the metacache for all target segment IDs once per delta entry, which is unnecessary because compaction target segments are all immutable. Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
265453f400
commit
be8831b311
@ -267,6 +267,14 @@ func (t *levelZeroCompactionTask) splitDelta(
|
||||
targetSegBuffer map[int64]*storage.DeleteData,
|
||||
targetSegIDs []int64,
|
||||
) error {
|
||||
// segments shall be safe to read outside
|
||||
segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(targetSegIDs...))
|
||||
split := func(pk storage.PrimaryKey) []int64 {
|
||||
return lo.FilterMap(segments, func(segment *metacache.SegmentInfo, _ int) (int64, bool) {
|
||||
return segment.SegmentID(), segment.GetBloomFilterSet().PkExists(pk)
|
||||
})
|
||||
}
|
||||
|
||||
// split all delete data to segments
|
||||
for _, deltaIter := range allIters {
|
||||
for deltaIter.HasNext() {
|
||||
@ -275,10 +283,7 @@ func (t *levelZeroCompactionTask) splitDelta(
|
||||
return err
|
||||
}
|
||||
|
||||
predicted, found := t.metacache.PredictSegments(labeled.GetPk(), metacache.WithSegmentIDs(targetSegIDs...))
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
predicted := split(labeled.GetPk())
|
||||
|
||||
for _, gotSeg := range predicted {
|
||||
delBuffer, ok := targetSegBuffer[gotSeg]
|
||||
|
||||
@ -118,8 +118,15 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
|
||||
s.task.plan = plan
|
||||
s.task.tr = timerecord.NewTimeRecorder("test")
|
||||
|
||||
bfs1 := metacache.NewBloomFilterSetWithBatchSize(100)
|
||||
bfs1.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 2}})
|
||||
segment1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 200}, bfs1)
|
||||
bfs2 := metacache.NewBloomFilterSetWithBatchSize(100)
|
||||
bfs2.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 2}})
|
||||
segment2 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 201}, bfs2)
|
||||
|
||||
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2)
|
||||
s.mockMeta.EXPECT().PredictSegments(mock.Anything, mock.Anything).Return([]int64{200, 201}, true)
|
||||
s.mockMeta.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{segment1, segment2})
|
||||
s.mockMeta.EXPECT().Collection().Return(1)
|
||||
s.mockMeta.EXPECT().GetSegmentByID(mock.Anything, mock.Anything).
|
||||
RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
|
||||
@ -335,16 +342,18 @@ func (s *LevelZeroCompactionTaskSuite) TestComposeDeltalog() {
|
||||
}
|
||||
|
||||
func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() {
|
||||
bfs1 := metacache.NewBloomFilterSetWithBatchSize(100)
|
||||
bfs1.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 3}})
|
||||
segment1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100}, bfs1)
|
||||
bfs2 := metacache.NewBloomFilterSetWithBatchSize(100)
|
||||
bfs2.UpdatePKRange(&storage.Int64FieldData{Data: []int64{3}})
|
||||
segment2 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 101}, bfs2)
|
||||
bfs3 := metacache.NewBloomFilterSetWithBatchSize(100)
|
||||
bfs3.UpdatePKRange(&storage.Int64FieldData{Data: []int64{3}})
|
||||
segment3 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 102}, bfs3)
|
||||
|
||||
predicted := []int64{100, 101, 102}
|
||||
s.mockMeta.EXPECT().PredictSegments(mock.MatchedBy(func(pk storage.PrimaryKey) bool {
|
||||
return pk.GetValue().(int64) == 1
|
||||
}), mock.Anything).Return([]int64{100}, true)
|
||||
s.mockMeta.EXPECT().PredictSegments(mock.MatchedBy(func(pk storage.PrimaryKey) bool {
|
||||
return pk.GetValue().(int64) == 2
|
||||
}), mock.Anything).Return(nil, false)
|
||||
s.mockMeta.EXPECT().PredictSegments(mock.MatchedBy(func(pk storage.PrimaryKey) bool {
|
||||
return pk.GetValue().(int64) == 3
|
||||
}), mock.Anything).Return([]int64{100, 101, 102}, true)
|
||||
s.mockMeta.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{segment1, segment2, segment3})
|
||||
|
||||
diter, err := iter.NewDeltalogIterator([][]byte{s.dBlob}, nil)
|
||||
s.Require().NoError(err)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user