enhance: Load BF from storage instead of memory during L0 compaction (#32913)

To decouple compaction from shard, loading BF from storage instead of
memory during L0 compaction in datanode.

issue: https://github.com/milvus-io/milvus/issues/32809

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2024-05-17 17:25:36 +08:00 committed by GitHub
parent 0d0eda24f8
commit bcaacf6fe6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 304 additions and 83 deletions

View File

@ -254,13 +254,9 @@ func loadStats(ctx context.Context, chunkManager storage.ChunkManager, schema *s
log := log.With(zap.Int64("segmentID", segmentID))
log.Info("begin to init pk bloom filter", zap.Int("statsBinLogsLen", len(statsBinlogs)))
// get pkfield id
pkField := int64(-1)
for _, field := range schema.Fields {
if field.IsPrimaryKey {
pkField = field.FieldID
break
}
pkField, err := typeutil.GetPrimaryFieldSchema(schema)
if err != nil {
return nil, err
}
// filter stats binlog files which is pk field stats log
@ -268,7 +264,7 @@ func loadStats(ctx context.Context, chunkManager storage.ChunkManager, schema *s
logType := storage.DefaultStatsType
for _, binlog := range statsBinlogs {
if binlog.FieldID != pkField {
if binlog.FieldID != pkField.GetFieldID() {
continue
}
Loop:

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math"
"sync"
"time"
"github.com/samber/lo"
@ -38,6 +39,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -54,6 +56,7 @@ type levelZeroCompactionTask struct {
allocator allocator.Allocator
metacache metacache.MetaCache
syncmgr syncmgr.SyncManager
cm storage.ChunkManager
plan *datapb.CompactionPlan
@ -70,6 +73,7 @@ func newLevelZeroCompactionTask(
alloc allocator.Allocator,
metaCache metacache.MetaCache,
syncmgr syncmgr.SyncManager,
cm storage.ChunkManager,
plan *datapb.CompactionPlan,
) *levelZeroCompactionTask {
ctx, cancel := context.WithCancel(ctx)
@ -81,6 +85,7 @@ func newLevelZeroCompactionTask(
allocator: alloc,
metacache: metaCache,
syncmgr: syncmgr,
cm: cm,
plan: plan,
tr: timerecord.NewTimeRecorder("levelzero compaction"),
done: make(chan struct{}, 1),
@ -129,13 +134,10 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error
return s.Level == datapb.SegmentLevel_L0
})
targetSegIDs := lo.FilterMap(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) (int64, bool) {
if s.Level == datapb.SegmentLevel_L1 {
return s.GetSegmentID(), true
}
return 0, false
targetSegments := lo.Filter(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
return s.Level != datapb.SegmentLevel_L0
})
if len(targetSegIDs) == 0 {
if len(targetSegments) == 0 {
log.Warn("compact wrong, not target sealed segments")
return nil, errIllegalCompactionPlan
}
@ -165,9 +167,9 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error
var resultSegments []*datapb.CompactionSegment
if float64(hardware.GetFreeMemoryCount())*paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat() < float64(totalSize) {
resultSegments, err = t.linearProcess(ctxTimeout, targetSegIDs, totalDeltalogs)
resultSegments, err = t.linearProcess(ctxTimeout, targetSegments, totalDeltalogs)
} else {
resultSegments, err = t.batchProcess(ctxTimeout, targetSegIDs, lo.Values(totalDeltalogs)...)
resultSegments, err = t.batchProcess(ctxTimeout, targetSegments, lo.Values(totalDeltalogs)...)
}
if err != nil {
return nil, err
@ -188,65 +190,87 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error
return result, nil
}
func (t *levelZeroCompactionTask) linearProcess(ctx context.Context, targetSegments []int64, totalDeltalogs map[int64][]string) ([]*datapb.CompactionSegment, error) {
func (t *levelZeroCompactionTask) linearProcess(ctx context.Context, targetSegments []*datapb.CompactionSegmentBinlogs, totalDeltalogs map[int64][]string) ([]*datapb.CompactionSegment, error) {
log := log.Ctx(t.ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.String("type", t.plan.GetType().String()),
zap.Int("target segment counts", len(targetSegments)),
)
// just for logging
targetSegmentIDs := lo.Map(targetSegments, func(segment *datapb.CompactionSegmentBinlogs, _ int) int64 {
return segment.GetSegmentID()
})
var (
resultSegments = make(map[int64]*datapb.CompactionSegment)
alteredSegments = make(map[int64]*storage.DeleteData)
)
segmentBFs, err := t.loadBF(targetSegments)
if err != nil {
return nil, err
}
for segID, deltaLogs := range totalDeltalogs {
log := log.With(zap.Int64("levelzero segment", segID))
log.Info("Linear L0 compaction start processing segment")
allIters, err := t.loadDelta(ctx, deltaLogs)
if err != nil {
log.Warn("Linear L0 compaction loadDelta fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
log.Warn("Linear L0 compaction loadDelta fail", zap.Int64s("target segments", targetSegmentIDs), zap.Error(err))
return nil, err
}
t.splitDelta(ctx, allIters, alteredSegments, targetSegments)
t.splitDelta(ctx, allIters, alteredSegments, segmentBFs)
err = t.uploadByCheck(ctx, true, alteredSegments, resultSegments)
if err != nil {
log.Warn("Linear L0 compaction upload buffer fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
log.Warn("Linear L0 compaction upload buffer fail", zap.Int64s("target segments", targetSegmentIDs), zap.Error(err))
return nil, err
}
}
err := t.uploadByCheck(ctx, false, alteredSegments, resultSegments)
err = t.uploadByCheck(ctx, false, alteredSegments, resultSegments)
if err != nil {
log.Warn("Linear L0 compaction upload all buffer fail", zap.Int64s("target segment", targetSegments), zap.Error(err))
log.Warn("Linear L0 compaction upload all buffer fail", zap.Int64s("target segment", targetSegmentIDs), zap.Error(err))
return nil, err
}
log.Info("Linear L0 compaction finished", zap.Duration("elapse", t.tr.RecordSpan()))
return lo.Values(resultSegments), nil
}
func (t *levelZeroCompactionTask) batchProcess(ctx context.Context, targetSegments []int64, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
func (t *levelZeroCompactionTask) batchProcess(ctx context.Context, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
log := log.Ctx(t.ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.String("type", t.plan.GetType().String()),
zap.Int("target segment counts", len(targetSegments)),
)
// just for logging
targetSegmentIDs := lo.Map(targetSegments, func(segment *datapb.CompactionSegmentBinlogs, _ int) int64 {
return segment.GetSegmentID()
})
log.Info("Batch L0 compaction start processing")
resultSegments := make(map[int64]*datapb.CompactionSegment)
iters, err := t.loadDelta(ctx, lo.Flatten(deltaLogs))
if err != nil {
log.Warn("Batch L0 compaction loadDelta fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
log.Warn("Batch L0 compaction loadDelta fail", zap.Int64s("target segments", targetSegmentIDs), zap.Error(err))
return nil, err
}
segmentBFs, err := t.loadBF(targetSegments)
if err != nil {
return nil, err
}
alteredSegments := make(map[int64]*storage.DeleteData)
t.splitDelta(ctx, iters, alteredSegments, targetSegments)
t.splitDelta(ctx, iters, alteredSegments, segmentBFs)
err = t.uploadByCheck(ctx, false, alteredSegments, resultSegments)
if err != nil {
log.Warn("Batch L0 compaction upload fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
log.Warn("Batch L0 compaction upload fail", zap.Int64s("target segments", targetSegmentIDs), zap.Error(err))
return nil, err
}
log.Info("Batch L0 compaction finished", zap.Duration("elapse", t.tr.RecordSpan()))
@ -271,18 +295,20 @@ func (t *levelZeroCompactionTask) splitDelta(
ctx context.Context,
allIters []*iter.DeltalogIterator,
targetSegBuffer map[int64]*storage.DeleteData,
targetSegIDs []int64,
segmentBfs map[int64]*metacache.BloomFilterSet,
) {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
defer span.End()
// segments shall be safe to read outside
segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(targetSegIDs...))
split := func(pk storage.PrimaryKey) []int64 {
lc := storage.NewLocationsCache(pk)
return lo.FilterMap(segments, func(segment *metacache.SegmentInfo, _ int) (int64, bool) {
return segment.SegmentID(), segment.GetBloomFilterSet().PkExists(lc)
})
predicts := make([]int64, 0, len(segmentBfs))
for segmentID, bf := range segmentBfs {
if bf.PkExists(lc) {
predicts = append(predicts, segmentID)
}
}
return predicts
}
// spilt all delete data to segments
@ -395,3 +421,41 @@ func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireChec
return nil
}
func (t *levelZeroCompactionTask) loadBF(targetSegments []*datapb.CompactionSegmentBinlogs) (map[int64]*metacache.BloomFilterSet, error) {
log := log.Ctx(t.ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.String("type", t.plan.GetType().String()),
)
var (
futures = make([]*conc.Future[any], 0, len(targetSegments))
pool = getOrCreateStatsPool()
mu = &sync.Mutex{}
bfs = make(map[int64]*metacache.BloomFilterSet)
)
for _, segment := range targetSegments {
segment := segment
future := pool.Submit(func() (any, error) {
_ = binlog.DecompressBinLog(storage.StatsBinlog, segment.GetCollectionID(),
segment.GetPartitionID(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
pks, err := loadStats(t.ctx, t.cm,
t.metacache.Schema(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
if err != nil {
log.Warn("failed to load segment stats log", zap.Error(err))
return err, err
}
bf := metacache.NewBloomFilterSet(pks...)
mu.Lock()
defer mu.Unlock()
bfs[segment.GetSegmentID()] = bf
return nil, nil
})
futures = append(futures, future)
}
err := conc.AwaitAll(futures...)
return bfs, err
}

View File

@ -27,10 +27,12 @@ import (
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/io"
iter "github.com/milvus-io/milvus/internal/datanode/iterators"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
@ -61,7 +63,7 @@ func (s *LevelZeroCompactionTaskSuite) SetupTest() {
s.mockBinlogIO = io.NewMockBinlogIO(s.T())
s.mockMeta = metacache.NewMockMetaCache(s.T())
// plan of the task is unset
s.task = newLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, s.mockAlloc, s.mockMeta, nil, nil)
s.task = newLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, s.mockAlloc, s.mockMeta, nil, nil, nil)
pk2ts := map[int64]uint64{
1: 20000,
@ -105,7 +107,17 @@ func (s *LevelZeroCompactionTaskSuite) TestLinearBatchLoadDeltaFail() {
s.task.tr = timerecord.NewTimeRecorder("test")
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(nil, errors.New("mock download fail")).Twice()
targetSegments := []int64{200}
s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: true,
},
},
})
targetSegments := lo.Filter(plan.SegmentBinlogs, func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
return s.Level == datapb.SegmentLevel_L1
})
deltaLogs := map[int64][]string{100: {"a/b/c1"}}
segments, err := s.task.linearProcess(context.Background(), targetSegments, deltaLogs)
@ -134,24 +146,43 @@ func (s *LevelZeroCompactionTaskSuite) TestLinearBatchUploadByCheckFail() {
},
},
},
{SegmentID: 200, Level: datapb.SegmentLevel_L1},
{SegmentID: 200, Level: datapb.SegmentLevel_L1, Field2StatslogPaths: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogID: 9999, LogSize: 100},
},
},
}},
},
}
s.task.plan = plan
s.task.tr = timerecord.NewTimeRecorder("test")
data := &storage.Int64FieldData{
Data: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9},
}
sw := &storage.StatsWriter{}
err := sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data)
s.NoError(err)
cm := mocks.NewChunkManager(s.T())
cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw.GetBuffer()}, nil)
s.task.cm = cm
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2)
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false).Twice()
s.mockMeta.EXPECT().GetSegmentsBy(mock.Anything).RunAndReturn(
func(filters ...metacache.SegmentFilter) []*metacache.SegmentInfo {
bfs1 := metacache.NewBloomFilterSetWithBatchSize(100)
bfs1.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 2}})
segment1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 200}, bfs1)
return []*metacache.SegmentInfo{segment1}
}).Twice()
s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: true,
},
},
})
targetSegments := []int64{200}
targetSegments := lo.Filter(plan.SegmentBinlogs, func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
return s.Level == datapb.SegmentLevel_L1
})
deltaLogs := map[int64][]string{100: {"a/b/c1"}}
segments, err := s.task.linearProcess(context.Background(), targetSegments, deltaLogs)
@ -192,28 +223,49 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
},
},
},
{SegmentID: 200, Level: datapb.SegmentLevel_L1},
{SegmentID: 201, Level: datapb.SegmentLevel_L1},
{SegmentID: 200, Level: datapb.SegmentLevel_L1, Field2StatslogPaths: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogID: 9999, LogSize: 100},
},
},
}},
{SegmentID: 201, Level: datapb.SegmentLevel_L1, Field2StatslogPaths: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogID: 9999, LogSize: 100},
},
},
}},
},
}
s.task.plan = plan
s.task.tr = timerecord.NewTimeRecorder("test")
bfs1 := metacache.NewBloomFilterSetWithBatchSize(100)
bfs1.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 2}})
segment1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 200}, bfs1)
bfs2 := metacache.NewBloomFilterSetWithBatchSize(100)
bfs2.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 2}})
segment2 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 201}, bfs2)
data := &storage.Int64FieldData{
Data: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9},
}
sw := &storage.StatsWriter{}
err := sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data)
s.NoError(err)
cm := mocks.NewChunkManager(s.T())
cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw.GetBuffer()}, nil)
s.task.cm = cm
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2)
s.mockMeta.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{segment1, segment2})
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(mock.Anything, mock.Anything).
RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
return metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: id, PartitionID: 10}, nil), true
})
s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: true,
},
},
})
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).
@ -230,11 +282,8 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
return s.Level == datapb.SegmentLevel_L0
})
targetSegIDs := lo.FilterMap(s.task.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) (int64, bool) {
if s.Level == datapb.SegmentLevel_L1 {
return s.GetSegmentID(), true
}
return 0, false
targetSegments := lo.Filter(s.task.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
return s.Level == datapb.SegmentLevel_L1
})
totalDeltalogs := make(map[UniqueID][]string)
@ -249,7 +298,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
totalDeltalogs[s.GetSegmentID()] = paths
}
}
segments, err := s.task.linearProcess(context.Background(), targetSegIDs, totalDeltalogs)
segments, err := s.task.linearProcess(context.Background(), targetSegments, totalDeltalogs)
s.NoError(err)
s.NotEmpty(segments)
s.Equal(2, len(segments))
@ -257,6 +306,9 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
lo.Map(segments, func(seg *datapb.CompactionSegment, _ int) int64 {
return seg.GetSegmentID()
}))
for _, segment := range segments {
s.NotNil(segment.GetDeltalogs())
}
log.Info("test segment results", zap.Any("result", segments))
}
@ -290,25 +342,35 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
},
},
},
{SegmentID: 200, Level: datapb.SegmentLevel_L1},
{SegmentID: 201, Level: datapb.SegmentLevel_L1},
{SegmentID: 200, Level: datapb.SegmentLevel_L1, Field2StatslogPaths: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogID: 9999, LogSize: 100},
},
},
}},
{SegmentID: 201, Level: datapb.SegmentLevel_L1, Field2StatslogPaths: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogID: 9999, LogSize: 100},
},
},
}},
},
}
s.task.plan = plan
s.task.tr = timerecord.NewTimeRecorder("test")
s.mockMeta.EXPECT().GetSegmentsBy(mock.Anything).RunAndReturn(
func(filters ...metacache.SegmentFilter) []*metacache.SegmentInfo {
bfs1 := metacache.NewBloomFilterSetWithBatchSize(100)
bfs1.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 2, 3}})
segment1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 200}, bfs1)
bfs2 := metacache.NewBloomFilterSetWithBatchSize(100)
bfs2.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 2, 3}})
segment2 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 201}, bfs2)
return []*metacache.SegmentInfo{segment1, segment2}
})
data := &storage.Int64FieldData{
Data: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9},
}
sw := &storage.StatsWriter{}
err := sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data)
s.NoError(err)
cm := mocks.NewChunkManager(s.T())
cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw.GetBuffer()}, nil)
s.task.cm = cm
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Once()
s.mockMeta.EXPECT().Collection().Return(1)
@ -316,6 +378,13 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
return metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: id, PartitionID: 10}, nil), true
})
s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: true,
},
},
})
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).
@ -328,11 +397,8 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
return s.Level == datapb.SegmentLevel_L0
})
targetSegIDs := lo.FilterMap(s.task.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) (int64, bool) {
if s.Level == datapb.SegmentLevel_L1 {
return s.GetSegmentID(), true
}
return 0, false
targetSegments := lo.Filter(s.task.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
return s.Level == datapb.SegmentLevel_L1
})
totalDeltalogs := make(map[UniqueID][]string)
@ -347,7 +413,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
totalDeltalogs[s.GetSegmentID()] = paths
}
}
segments, err := s.task.batchProcess(context.TODO(), targetSegIDs, lo.Values(totalDeltalogs)...)
segments, err := s.task.batchProcess(context.TODO(), targetSegments, lo.Values(totalDeltalogs)...)
s.NoError(err)
s.NotEmpty(segments)
s.Equal(2, len(segments))
@ -355,6 +421,9 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
lo.Map(segments, func(seg *datapb.CompactionSegment, _ int) int64 {
return seg.GetSegmentID()
}))
for _, segment := range segments {
s.NotNil(segment.GetDeltalogs())
}
log.Info("test segment results", zap.Any("result", segments))
}
@ -506,23 +575,23 @@ func (s *LevelZeroCompactionTaskSuite) TestComposeDeltalog() {
func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() {
bfs1 := metacache.NewBloomFilterSetWithBatchSize(100)
bfs1.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 3}})
segment1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100}, bfs1)
bfs2 := metacache.NewBloomFilterSetWithBatchSize(100)
bfs2.UpdatePKRange(&storage.Int64FieldData{Data: []int64{3}})
segment2 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 101}, bfs2)
bfs3 := metacache.NewBloomFilterSetWithBatchSize(100)
bfs3.UpdatePKRange(&storage.Int64FieldData{Data: []int64{3}})
segment3 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 102}, bfs3)
predicted := []int64{100, 101, 102}
s.mockMeta.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{segment1, segment2, segment3})
diter := iter.NewDeltalogIterator([][]byte{s.dBlob}, nil)
s.Require().NotNil(diter)
targetSegBuffer := make(map[int64]*storage.DeleteData)
targetSegIDs := predicted
s.task.splitDelta(context.TODO(), []*iter.DeltalogIterator{diter}, targetSegBuffer, targetSegIDs)
segmentBFs := map[int64]*metacache.BloomFilterSet{
100: bfs1,
101: bfs2,
102: bfs3,
}
s.task.splitDelta(context.TODO(), []*iter.DeltalogIterator{diter}, targetSegBuffer, segmentBFs)
s.NotEmpty(targetSegBuffer)
s.ElementsMatch(predicted, lo.Keys(targetSegBuffer))
@ -601,3 +670,94 @@ func (s *LevelZeroCompactionTaskSuite) TestLoadDelta() {
}
}
}
func (s *LevelZeroCompactionTaskSuite) TestLoadBF() {
plan := &datapb.CompactionPlan{
PlanID: 19530,
Type: datapb.CompactionType_Level0DeleteCompaction,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 201, Level: datapb.SegmentLevel_L1, Field2StatslogPaths: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogID: 9999, LogSize: 100},
},
},
}},
},
}
s.task.plan = plan
data := &storage.Int64FieldData{
Data: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9},
}
sw := &storage.StatsWriter{}
err := sw.GenerateByData(common.RowIDField, schemapb.DataType_Int64, data)
s.NoError(err)
cm := mocks.NewChunkManager(s.T())
cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw.GetBuffer()}, nil)
s.task.cm = cm
s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: true,
},
},
})
bfs, err := s.task.loadBF(plan.SegmentBinlogs)
s.NoError(err)
s.Len(bfs, 1)
for _, pk := range s.dData.Pks {
lc := storage.NewLocationsCache(pk)
s.True(bfs[201].PkExists(lc))
}
}
func (s *LevelZeroCompactionTaskSuite) TestFailed() {
s.Run("no primary key", func() {
plan := &datapb.CompactionPlan{
PlanID: 19530,
Type: datapb.CompactionType_Level0DeleteCompaction,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 201, Level: datapb.SegmentLevel_L1, Field2StatslogPaths: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogID: 9999, LogSize: 100},
},
},
}},
},
}
s.task.plan = plan
s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
IsPrimaryKey: false,
},
},
})
_, err := s.task.loadBF(plan.SegmentBinlogs)
s.Error(err)
})
s.Run("no l1 segments", func() {
plan := &datapb.CompactionPlan{
PlanID: 19530,
Type: datapb.CompactionType_Level0DeleteCompaction,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 201, Level: datapb.SegmentLevel_L0},
},
}
s.task.plan = plan
_, err := s.task.compact()
s.Error(err)
})
}

View File

@ -245,6 +245,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
node.allocator,
ds.metacache,
node.syncMgr,
node.chunkManager,
req,
)
case datapb.CompactionType_MixCompaction: