fix: [2.5] Fix delete data loss due to duplicate binlogID (#40976)

With concurrent L0 compaction
(https://github.com/milvus-io/milvus/pull/36816), delta logs from different
compactions might be written to the same L1 segment, causing log ID
duplication when the incremental beginLogID is used. This PR removes the
beginLogID mechanism and instead passes a pre-allocated log ID range to each
plan, where the number of IDs in the range equals the number of binlogs in the
compaction's input segments multiplied by an expansion factor.
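To illustrate the failure mode and the fix, here is a minimal, hypothetical sketch (the numbers and names are illustrative only, not Milvus APIs): under the old scheme every plan received a single beginLogID and counted upward without an upper bound, so two plans built close together could emit the same log ID; under the new scheme every plan owns a disjoint, bounded range.

package main

import "fmt"

// Hypothetical illustration only (not Milvus code): why an unbounded per-plan
// log ID counter can collide while a bounded, pre-allocated range cannot.
func main() {
	// Old scheme: each plan only receives a starting ID; the local counter has
	// no upper bound, so the implicit ranges of concurrent plans overlap.
	planABegin, planBBegin := int64(100), int64(101)
	fmt.Println(planABegin+1 == planBBegin) // true: both plans may emit log ID 101

	// New scheme: each plan owns a disjoint [begin, end) range sized from its
	// binlog count multiplied by the expansion factor.
	planA := []int64{100, 1100}  // [begin, end)
	planB := []int64{1100, 2100} // [begin, end)
	fmt.Println(planA[1] <= planB[0]) // true: plan A can never emit an ID owned by plan B
}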

issue: https://github.com/milvus-io/milvus/issues/40207

pr: https://github.com/milvus-io/milvus/pull/40960

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai authored 2025-03-28 14:34:21 +08:00, committed by GitHub
parent d420f100c0
commit 27ea5d14dc
24 changed files with 1619 additions and 1427 deletions


@ -190,11 +190,15 @@ func (t *clusteringCompactionTask) CheckCompactionContainsSegment(segmentID int6
}
func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
taskProto := t.taskProto.Load().(*datapb.CompactionTask)
logIDRange, err := PreAllocateBinlogIDs(t.allocator, t.meta.GetSegmentInfos(taskProto.GetInputSegments()))
if err != nil {
return nil, err
}
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {
return nil, err
}
taskProto := t.taskProto.Load().(*datapb.CompactionTask)
plan := &datapb.CompactionPlan{
PlanID: taskProto.GetPlanID(),
StartTime: taskProto.GetStartTime(),
@ -211,6 +215,7 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
AnalyzeSegmentIds: taskProto.GetInputSegments(),
BeginLogID: beginLogID,
PreAllocatedSegmentIDs: taskProto.GetPreAllocatedSegmentIDs(),
PreAllocatedLogIDs: logIDRange,
SlotUsage: t.GetSlotUsage(),
}
log := log.With(zap.Int64("taskID", taskProto.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
@ -232,7 +237,7 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
IsSorted: segInfo.GetIsSorted(),
})
}
log.Info("Compaction handler build clustering compaction plan")
log.Info("Compaction handler build clustering compaction plan", zap.Any("PreAllocatedLogIDs", logIDRange))
return plan, nil
}


@ -273,7 +273,7 @@ func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compac
return taskClone
}
func (t *l0CompactionTask) selectSealedSegment() ([]int64, []*datapb.CompactionSegmentBinlogs) {
func (t *l0CompactionTask) selectSealedSegment() ([]*SegmentInfo, []*datapb.CompactionSegmentBinlogs) {
taskProto := t.taskProto.Load().(*datapb.CompactionTask)
// Select sealed L1 segments for LevelZero compaction that meets the condition:
// dmlPos < triggerInfo.pos
@ -298,17 +298,13 @@ func (t *l0CompactionTask) selectSealedSegment() ([]int64, []*datapb.CompactionS
}
})
sealedSegmentIDs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) int64 {
return info.GetID()
})
return sealedSegmentIDs, sealedSegBinlogs
return sealedSegments, sealedSegBinlogs
}
func (t *l0CompactionTask) CheckCompactionContainsSegment(segmentID int64) bool {
sealedSegmentIDs, _ := t.selectSealedSegment()
for _, sealedSegmentID := range sealedSegmentIDs {
if sealedSegmentID == segmentID {
for _, sealedSegment := range sealedSegmentIDs {
if sealedSegment.GetID() == segmentID {
return true
}
}
@ -316,16 +312,15 @@ func (t *l0CompactionTask) CheckCompactionContainsSegment(segmentID int64) bool
}
func (t *l0CompactionTask) PreparePlan() bool {
sealedSegmentIDs, _ := t.selectSealedSegment()
sealedSegments, _ := t.selectSealedSegment()
sealedSegmentIDs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) int64 {
return info.GetID()
})
exist, hasStating := t.meta.CheckSegmentsStating(context.TODO(), sealedSegmentIDs)
return exist && !hasStating
}
func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {
return nil, err
}
taskProto := t.taskProto.Load().(*datapb.CompactionTask)
plan := &datapb.CompactionPlan{
PlanID: taskProto.GetPlanID(),
@ -336,11 +331,11 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
CollectionTtl: taskProto.GetCollectionTtl(),
TotalRows: taskProto.GetTotalRows(),
Schema: taskProto.GetSchema(),
BeginLogID: beginLogID,
SlotUsage: t.GetSlotUsage(),
}
log := log.With(zap.Int64("taskID", taskProto.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
segments := make([]*SegmentInfo, 0)
for _, segID := range taskProto.GetInputSegments() {
segInfo := t.meta.GetHealthySegment(context.TODO(), segID)
if segInfo == nil {
@ -355,19 +350,33 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
Deltalogs: segInfo.GetDeltalogs(),
IsSorted: segInfo.GetIsSorted(),
})
segments = append(segments, segInfo)
}
sealedSegmentIDs, sealedSegBinlogs := t.selectSealedSegment()
if len(sealedSegmentIDs) == 0 {
sealedSegments, sealedSegBinlogs := t.selectSealedSegment()
if len(sealedSegments) == 0 {
// TODO fast finish l0 segment, just drop l0 segment
log.Info("l0Compaction available non-L0 Segments is empty ")
return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", taskProto.GetPos())
}
segments = append(segments, sealedSegments...)
logIDRange, err := PreAllocateBinlogIDs(t.allocator, segments)
if err != nil {
return nil, err
}
plan.PreAllocatedLogIDs = logIDRange
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {
return nil, err
}
plan.BeginLogID = beginLogID
plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...)
log.Info("l0CompactionTask refreshed level zero compaction plan",
zap.Any("target position", taskProto.GetPos()),
zap.Any("target segments count", len(sealedSegBinlogs)))
zap.Any("target segments count", len(sealedSegBinlogs)),
zap.Any("PreAllocatedLogIDs", logIDRange))
return plan, nil
}


@ -129,9 +129,6 @@ func (s *L0CompactionTaskSuite) TestProcessRefreshPlan_SegmentNotFoundL0() {
NodeID: 1,
State: datapb.CompactionTaskState_executing,
}, nil, s.mockMeta, nil)
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
task.allocator = alloc
_, err := task.BuildCompactionRequest()
s.Error(err)
@ -162,9 +159,6 @@ func (s *L0CompactionTaskSuite) TestProcessRefreshPlan_SelectZeroSegmentsL0() {
State: datapb.CompactionTaskState_executing,
InputSegments: []int64{100, 101},
}, nil, s.mockMeta, nil)
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
task.allocator = alloc
_, err := task.BuildCompactionRequest()
s.Error(err)
}
@ -174,23 +168,31 @@ func (s *L0CompactionTaskSuite) TestBuildCompactionRequestFailed_AllocFailed() {
s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, errors.New("mock alloc err"))
meta, err := newMemoryMeta(s.T())
s.NoError(err)
task = &l0CompactionTask{
allocator: s.mockAlloc,
meta: meta,
}
_, err := task.BuildCompactionRequest()
task.SetTask(&datapb.CompactionTask{})
_, err = task.BuildCompactionRequest()
s.T().Logf("err=%v", err)
s.Error(err)
task = &mixCompactionTask{
allocator: s.mockAlloc,
meta: meta,
}
task.SetTask(&datapb.CompactionTask{})
_, err = task.BuildCompactionRequest()
s.T().Logf("err=%v", err)
s.Error(err)
task = &clusteringCompactionTask{
allocator: s.mockAlloc,
meta: meta,
}
task.SetTask(&datapb.CompactionTask{})
_, err = task.BuildCompactionRequest()
s.T().Logf("err=%v", err)
s.Error(err)


@ -333,10 +333,6 @@ func (t *mixCompactionTask) CheckCompactionContainsSegment(segmentID int64) bool
func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {
return nil, err
}
taskProto := t.taskProto.Load().(*datapb.CompactionTask)
plan := &datapb.CompactionPlan{
PlanID: taskProto.GetPlanID(),
@ -347,13 +343,13 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er
CollectionTtl: taskProto.GetCollectionTtl(),
TotalRows: taskProto.GetTotalRows(),
Schema: taskProto.GetSchema(),
BeginLogID: beginLogID,
PreAllocatedSegmentIDs: taskProto.GetPreAllocatedSegmentIDs(),
SlotUsage: t.GetSlotUsage(),
MaxSize: taskProto.GetMaxSize(),
}
segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs))
segments := make([]*SegmentInfo, 0, len(taskProto.GetInputSegments()))
for _, segID := range taskProto.GetInputSegments() {
segInfo := t.meta.GetHealthySegment(context.TODO(), segID)
if segInfo == nil {
@ -371,8 +367,22 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er
IsSorted: segInfo.GetIsSorted(),
})
segIDMap[segID] = segInfo.GetDeltalogs()
segments = append(segments, segInfo)
}
log.Info("Compaction handler refreshed mix compaction plan", zap.Int64("maxSize", plan.GetMaxSize()), zap.Any("segID2DeltaLogs", segIDMap))
logIDRange, err := PreAllocateBinlogIDs(t.allocator, segments)
if err != nil {
return nil, err
}
plan.PreAllocatedLogIDs = logIDRange
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {
return nil, err
}
plan.BeginLogID = beginLogID
log.Info("Compaction handler refreshed mix compaction plan", zap.Int64("maxSize", plan.GetMaxSize()),
zap.Any("segID2DeltaLogs", segIDMap), zap.Any("PreAllocatedLogIDs", logIDRange))
return plan, nil
}


@ -85,9 +85,6 @@ func (s *MixCompactionTaskSuite) TestProcessRefreshPlan_MixSegmentNotFound() {
InputSegments: []int64{200, 201},
ResultSegments: []int64{100, 200},
}, nil, s.mockMeta, nil)
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().AllocN(int64(1)).Return(19530, 99999, nil)
task.allocator = alloc
_, err := task.BuildCompactionRequest()
s.Error(err)
s.ErrorIs(err, merr.ErrSegmentNotFound)


@ -385,7 +385,8 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
// TODO[GOOSE], 11 = 1 planID + 10 segmentID, this is a hack need to be removed.
// Any plan that output segment number greater than 10 will be marked as invalid plan for now.
startID, endID, err := t.allocator.AllocN(11)
n := 11 * paramtable.Get().DataCoordCfg.CompactionPreAllocateIDExpansionFactor.GetAsInt64()
startID, endID, err := t.allocator.AllocN(n)
if err != nil {
log.Warn("fail to allocate id", zap.Error(err))
return err

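With the default expansion factor of 100 (see the new dataCoord.compaction.preAllocateIDExpansionFactor item later in this diff), this single call now reserves 1100 IDs per compaction signal instead of 11. A rough, hypothetical sketch of the arithmetic only:

package main

import "fmt"

// Hypothetical sketch of the trigger-side arithmetic; not Milvus code.
func main() {
	const base = 11    // 1 planID + 10 segment IDs, per the TODO above
	const factor = 100 // default dataCoord.compaction.preAllocateIDExpansionFactor
	n := base * factor
	fmt.Println(n) // 1100 IDs reserved per compaction signal instead of 11
}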

@ -30,6 +30,7 @@ import (
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
@ -622,6 +623,7 @@ func Test_compactionTrigger_force(t *testing.T) {
TotalRows: 200,
Schema: schema,
PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 101, End: 200},
PreAllocatedLogIDs: &datapb.IDRange{Begin: 100, End: 200},
MaxSize: 1342177280,
SlotUsage: paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
},
@ -657,7 +659,7 @@ func Test_compactionTrigger_force(t *testing.T) {
case plan := <-spy.spyChan:
plan.StartTime = 0
sortPlanCompactionBinlogs(plan)
assert.EqualValues(t, tt.wantPlans[0], plan)
assert.True(t, proto.Equal(tt.wantPlans[0], plan))
return
case <-time.After(3 * time.Second):
assert.Fail(t, "timeout")
@ -952,12 +954,13 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
IsSorted: true,
},
},
BeginLogID: 100,
StartTime: 3,
TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(),
Type: datapb.CompactionType_MixCompaction,
Channel: "ch1",
MaxSize: 1342177280,
BeginLogID: 100,
PreAllocatedLogIDs: &datapb.IDRange{Begin: 200, End: 2000},
StartTime: 3,
TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(),
Type: datapb.CompactionType_MixCompaction,
Channel: "ch1",
MaxSize: 1342177280,
},
},
},


@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/logutil"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
type CompactionTriggerType int8
@ -315,7 +316,8 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
}
resultSegmentNum := (totalRows/preferSegmentRows + 1) * 2
start, end, err := m.allocator.AllocN(resultSegmentNum)
n := resultSegmentNum * paramtable.Get().DataCoordCfg.CompactionPreAllocateIDExpansionFactor.GetAsInt64()
start, end, err := m.allocator.AllocN(n)
if err != nil {
log.Warn("pre-allocate result segments failed", zap.String("view", view.String()), zap.Error(err))
return
@ -364,7 +366,8 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte
log := log.Ctx(ctx).With(zap.String("view", view.String()))
// TODO[GOOSE], 11 = 1 planID + 10 segmentID, this is a hack need to be removed.
// Any plan that output segment number greater than 10 will be marked as invalid plan for now.
startID, endID, err := m.allocator.AllocN(11)
n := 11 * paramtable.Get().DataCoordCfg.CompactionPreAllocateIDExpansionFactor.GetAsInt64()
startID, endID, err := m.allocator.AllocN(n)
if err != nil {
log.Warn("fFailed to submit compaction view to scheduler because allocate id fail", zap.Error(err))
return


@ -0,0 +1,46 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
// PreAllocateBinlogIDs pre-allocates binlog IDs based on the total number of binlogs from
// the segments for compaction, multiplied by an expansion factor.
func PreAllocateBinlogIDs(allocator allocator.Allocator, segmentInfos []*SegmentInfo) (*datapb.IDRange, error) {
binlogNum := 0
for _, s := range segmentInfos {
for _, l := range s.GetBinlogs() {
binlogNum += len(l.GetBinlogs())
}
for _, l := range s.GetDeltalogs() {
binlogNum += len(l.GetBinlogs())
}
for _, l := range s.GetStatslogs() {
binlogNum += len(l.GetBinlogs())
}
for _, l := range s.GetBm25Statslogs() {
binlogNum += len(l.GetBinlogs())
}
}
n := binlogNum * paramtable.Get().DataCoordCfg.CompactionPreAllocateIDExpansionFactor.GetAsInt()
begin, end, err := allocator.AllocN(int64(n))
return &datapb.IDRange{Begin: begin, End: end}, err
}
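For orientation, a minimal sketch stitching together calls that appear elsewhere in this diff: the DataCoord reserves a bounded range with PreAllocateBinlogIDs and stores it on the plan, and the DataNode seeds its local log ID allocator from that range, so IDs handed out by concurrent compactions can no longer collide. The fragments mirror the surrounding hunks and are not a standalone compilable unit.

// DataCoord side (BuildCompactionRequest): reserve a bounded log ID range.
logIDRange, err := PreAllocateBinlogIDs(t.allocator, segments)
if err != nil {
	return nil, err
}
plan.PreAllocatedLogIDs = logIDRange

// DataNode side (compactor init): consume IDs only from the reserved range.
logIDAlloc := allocator.NewLocalAllocator(
	plan.GetPreAllocatedLogIDs().GetBegin(),
	plan.GetPreAllocatedLogIDs().GetEnd(),
)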


@ -59,6 +59,7 @@ import (
type CompactionMeta interface {
GetSegment(ctx context.Context, segID UniqueID) *SegmentInfo
GetSegmentInfos(segIDs []UniqueID) []*SegmentInfo
SelectSegments(ctx context.Context, filters ...SegmentFilter) []*SegmentInfo
GetHealthySegment(ctx context.Context, segID UniqueID) *SegmentInfo
UpdateSegmentsInfo(ctx context.Context, operators ...UpdateOperator) error
@ -613,6 +614,19 @@ func (m *meta) GetSegments(segIDs []UniqueID, filterFunc SegmentInfoSelector) []
return result
}
func (m *meta) GetSegmentInfos(segIDs []UniqueID) []*SegmentInfo {
m.segMu.RLock()
defer m.segMu.RUnlock()
var result []*SegmentInfo
for _, id := range segIDs {
segment := m.segments.GetSegment(id)
if segment != nil {
result = append(result, segment)
}
}
return result
}
// GetSegment returns segment info with provided id
// include the unhealthy segment
// if not segment is found, nil will be returned


@ -682,6 +682,54 @@ func (_c *MockCompactionMeta_GetSegment_Call) RunAndReturn(run func(context.Cont
return _c
}
// GetSegmentInfos provides a mock function with given fields: segIDs
func (_m *MockCompactionMeta) GetSegmentInfos(segIDs []int64) []*SegmentInfo {
ret := _m.Called(segIDs)
if len(ret) == 0 {
panic("no return value specified for GetSegmentInfos")
}
var r0 []*SegmentInfo
if rf, ok := ret.Get(0).(func([]int64) []*SegmentInfo); ok {
r0 = rf(segIDs)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*SegmentInfo)
}
}
return r0
}
// MockCompactionMeta_GetSegmentInfos_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentInfos'
type MockCompactionMeta_GetSegmentInfos_Call struct {
*mock.Call
}
// GetSegmentInfos is a helper method to define mock.On call
// - segIDs []int64
func (_e *MockCompactionMeta_Expecter) GetSegmentInfos(segIDs interface{}) *MockCompactionMeta_GetSegmentInfos_Call {
return &MockCompactionMeta_GetSegmentInfos_Call{Call: _e.mock.On("GetSegmentInfos", segIDs)}
}
func (_c *MockCompactionMeta_GetSegmentInfos_Call) Run(run func(segIDs []int64)) *MockCompactionMeta_GetSegmentInfos_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].([]int64))
})
return _c
}
func (_c *MockCompactionMeta_GetSegmentInfos_Call) Return(_a0 []*SegmentInfo) *MockCompactionMeta_GetSegmentInfos_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockCompactionMeta_GetSegmentInfos_Call) RunAndReturn(run func([]int64) []*SegmentInfo) *MockCompactionMeta_GetSegmentInfos_Call {
_c.Call.Return(run)
return _c
}
// SaveCompactionTask provides a mock function with given fields: ctx, task
func (_m *MockCompactionMeta) SaveCompactionTask(ctx context.Context, task *datapb.CompactionTask) error {
ret := _m.Called(ctx, task)


@ -195,7 +195,7 @@ func (t *clusteringCompactionTask) init() error {
t.collectionID = t.GetCollection()
t.partitionID = t.plan.GetSegmentBinlogs()[0].GetPartitionID()
logIDAlloc := allocator.NewLocalAllocator(t.plan.GetBeginLogID(), math.MaxInt64)
logIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedLogIDs().GetBegin(), t.plan.GetPreAllocatedLogIDs().GetEnd())
segIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedSegmentIDs().GetBegin(), t.plan.GetPreAllocatedSegmentIDs().GetEnd())
log.Info("segment ID range", zap.Int64("begin", t.plan.GetPreAllocatedSegmentIDs().GetBegin()), zap.Int64("end", t.plan.GetPreAllocatedSegmentIDs().GetEnd()))
t.logIDAlloc = logIDAlloc


@ -93,6 +93,10 @@ func (s *ClusteringCompactionTaskSuite) SetupTest() {
}},
TimeoutInSeconds: 10,
Type: datapb.CompactionType_ClusteringCompaction,
PreAllocatedLogIDs: &datapb.IDRange{
Begin: 200,
End: 2000,
},
}
s.task.plan = s.plan
}
@ -216,6 +220,10 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
Begin: time.Now().UnixMilli(),
End: time.Now().UnixMilli() + 1000,
}
s.task.plan.PreAllocatedLogIDs = &datapb.IDRange{
Begin: 200,
End: 2000,
}
// 8+8+8+4+7+4*4=51
// 51*1024 = 52224
@ -299,6 +307,10 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit(
Begin: time.Now().UnixMilli(),
End: time.Now().UnixMilli() + 1000,
}
s.task.plan.PreAllocatedLogIDs = &datapb.IDRange{
Begin: 1001,
End: 2000,
}
// 8+8+8+4+7+4*4=51
// 51*1024 = 52224
@ -372,6 +384,10 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
Begin: time.Now().UnixMilli(),
End: time.Now().UnixMilli() + 1000,
}
s.task.plan.PreAllocatedLogIDs = &datapb.IDRange{
Begin: 1001,
End: 2000,
}
// 8 + 8 + 8 + 7 + 8 = 39
// 39*1024 = 39936


@ -68,7 +68,7 @@ func NewLevelZeroCompactionTask(
plan *datapb.CompactionPlan,
) *LevelZeroCompactionTask {
ctx, cancel := context.WithCancel(ctx)
alloc := allocator.NewLocalAllocator(plan.GetBeginLogID(), math.MaxInt64)
alloc := allocator.NewLocalAllocator(plan.GetPreAllocatedLogIDs().GetBegin(), plan.GetPreAllocatedLogIDs().GetEnd())
return &LevelZeroCompactionTask{
ctx: ctx,
cancel: cancel,


@ -58,7 +58,13 @@ func (s *LevelZeroCompactionTaskSuite) SetupTest() {
paramtable.Init()
s.mockBinlogIO = io.NewMockBinlogIO(s.T())
// plan of the task is unset
s.task = NewLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, nil, nil)
plan := &datapb.CompactionPlan{
PreAllocatedLogIDs: &datapb.IDRange{
Begin: 200,
End: 2000,
},
}
s.task = NewLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, nil, plan)
pk2ts := map[int64]uint64{
1: 20000,
@ -270,7 +276,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
},
},
},
BeginLogID: 11111,
PreAllocatedLogIDs: &datapb.IDRange{Begin: 11111, End: 21111},
}
s.task.plan = plan
@ -379,7 +385,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
},
},
},
BeginLogID: 11111,
PreAllocatedLogIDs: &datapb.IDRange{Begin: 11111, End: 21111},
}
s.task.plan = plan
@ -441,7 +447,7 @@ func (s *LevelZeroCompactionTaskSuite) TestSerializeUpload() {
SegmentID: 100,
},
},
BeginLogID: 11111,
PreAllocatedLogIDs: &datapb.IDRange{Begin: 11111, End: 21111},
}
s.Run("serializeUpload allocator Alloc failed", func() {


@ -5,7 +5,6 @@ import (
"context"
"fmt"
sio "io"
"math"
"time"
"github.com/samber/lo"
@ -40,7 +39,7 @@ func mergeSortMultipleSegments(ctx context.Context,
log := log.With(zap.Int64("planID", plan.GetPlanID()))
segIDAlloc := allocator.NewLocalAllocator(plan.GetPreAllocatedSegmentIDs().GetBegin(), plan.GetPreAllocatedSegmentIDs().GetEnd())
logIDAlloc := allocator.NewLocalAllocator(plan.GetBeginLogID(), math.MaxInt64)
logIDAlloc := allocator.NewLocalAllocator(plan.GetPreAllocatedLogIDs().GetBegin(), plan.GetPreAllocatedLogIDs().GetEnd())
compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
mWriter := NewMultiSegmentWriter(binlogIO, compAlloc, plan, maxRows, partitionID, collectionID, bm25FieldIds)


@ -142,7 +142,7 @@ func (t *mixCompactionTask) mergeSplit(
log := log.With(zap.Int64("planID", t.GetPlanID()))
segIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedSegmentIDs().GetBegin(), t.plan.GetPreAllocatedSegmentIDs().GetEnd())
logIDAlloc := allocator.NewLocalAllocator(t.plan.GetBeginLogID(), math.MaxInt64)
logIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedLogIDs().GetBegin(), t.plan.GetPreAllocatedLogIDs().GetEnd())
compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
mWriter := NewMultiSegmentWriter(t.binlogIO, compAlloc, t.plan, t.maxRows, t.partitionID, t.collectionID, t.bm25FieldIDs)


@ -81,8 +81,8 @@ func (s *MixCompactionTaskSuite) SetupTest() {
TimeoutInSeconds: 10,
Type: datapb.CompactionType_MixCompaction,
Schema: s.meta.GetSchema(),
BeginLogID: 19530,
PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 19531, End: math.MaxInt64},
PreAllocatedLogIDs: &datapb.IDRange{Begin: 9530, End: 19530},
MaxSize: 64 * 1024 * 1024,
}
@ -105,8 +105,8 @@ func (s *MixCompactionTaskSuite) SetupBM25() {
TimeoutInSeconds: 10,
Type: datapb.CompactionType_MixCompaction,
Schema: s.meta.GetSchema(),
BeginLogID: 19530,
PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 19531, End: math.MaxInt64},
PreAllocatedLogIDs: &datapb.IDRange{Begin: 9530, End: 19530},
MaxSize: 64 * 1024 * 1024,
}


@ -201,6 +201,10 @@ func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPl
return merr.Status(merr.WrapErrParameterInvalidMsg("invalid beginLogID")), nil
}
if req.GetPreAllocatedLogIDs().GetBegin() == 0 || req.GetPreAllocatedLogIDs().GetEnd() == 0 {
return merr.Status(merr.WrapErrParameterInvalidMsg(fmt.Sprintf("invalid pre-allocated logIDs: begin %d, end %d", req.GetPreAllocatedLogIDs().GetBegin(), req.GetPreAllocatedLogIDs().GetEnd()))), nil
}
/*
spanCtx := trace.SpanContextFromContext(ctx)


@ -249,7 +249,8 @@ func (s *DataNodeServicesSuite) TestCompaction() {
{SegmentID: 102, Level: datapb.SegmentLevel_L0},
{SegmentID: 103, Level: datapb.SegmentLevel_L1},
},
BeginLogID: 100,
BeginLogID: 100,
PreAllocatedLogIDs: &datapb.IDRange{Begin: 200, End: 2000},
}
resp, err := node.CompactionV2(ctx, req)
@ -273,6 +274,7 @@ func (s *DataNodeServicesSuite) TestCompaction() {
Type: datapb.CompactionType_ClusteringCompaction,
BeginLogID: 100,
PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 100, End: 200},
PreAllocatedLogIDs: &datapb.IDRange{Begin: 200, End: 2000},
}
resp, err := node.CompactionV2(ctx, req)
@ -293,8 +295,9 @@ func (s *DataNodeServicesSuite) TestCompaction() {
{SegmentID: 102, Level: datapb.SegmentLevel_L0},
{SegmentID: 103, Level: datapb.SegmentLevel_L1},
},
Type: datapb.CompactionType_ClusteringCompaction,
BeginLogID: 0,
Type: datapb.CompactionType_ClusteringCompaction,
BeginLogID: 0,
PreAllocatedLogIDs: &datapb.IDRange{Begin: 200, End: 2000},
}
resp, err := node.CompactionV2(ctx, req)
@ -318,6 +321,7 @@ func (s *DataNodeServicesSuite) TestCompaction() {
Type: datapb.CompactionType_ClusteringCompaction,
BeginLogID: 100,
PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 0, End: 0},
PreAllocatedLogIDs: &datapb.IDRange{Begin: 200, End: 2000},
}
resp, err := node.CompactionV2(ctx, req)


@ -626,10 +626,11 @@ message CompactionPlan {
string analyze_result_path = 14;
repeated int64 analyze_segment_ids = 15;
int32 state = 16;
int64 begin_logID = 17;
int64 begin_logID = 17; // deprecated, use pre_allocated_logIDs instead.
IDRange pre_allocated_segmentIDs = 18;
int64 slot_usage = 19;
int64 max_size = 20;
IDRange pre_allocated_logIDs = 21;
}
message CompactionSegment {

File diff suppressed because it is too large.


@ -3488,11 +3488,12 @@ type dataCoordConfig struct {
BlockingL0SizeInMB ParamItem `refreshable:"true"`
// compaction
EnableCompaction ParamItem `refreshable:"false"`
EnableAutoCompaction ParamItem `refreshable:"true"`
IndexBasedCompaction ParamItem `refreshable:"true"`
CompactionTaskPrioritizer ParamItem `refreshable:"true"`
CompactionTaskQueueCapacity ParamItem `refreshable:"false"`
EnableCompaction ParamItem `refreshable:"false"`
EnableAutoCompaction ParamItem `refreshable:"true"`
IndexBasedCompaction ParamItem `refreshable:"true"`
CompactionTaskPrioritizer ParamItem `refreshable:"true"`
CompactionTaskQueueCapacity ParamItem `refreshable:"false"`
CompactionPreAllocateIDExpansionFactor ParamItem `refreshable:"false"`
CompactionRPCTimeout ParamItem `refreshable:"true"`
CompactionMaxParallelTasks ParamItem `refreshable:"true"`
@ -3825,6 +3826,14 @@ mix is prioritized by level: mix compactions first, then L0 compactions, then cl
}
p.CompactionTaskQueueCapacity.Init(base.mgr)
p.CompactionPreAllocateIDExpansionFactor = ParamItem{
Key: "dataCoord.compaction.preAllocateIDExpansionFactor",
Version: "2.5.8",
DefaultValue: "100",
Doc: `The expansion factor for pre-allocating IDs during compaction.`,
}
p.CompactionPreAllocateIDExpansionFactor.Init(base.mgr)
p.CompactionRPCTimeout = ParamItem{
Key: "dataCoord.compaction.rpcTimeout",
Version: "2.2.12",

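If the default factor of 100 proves too small for compactions with very many binlogs, it can be raised; the item is declared non-refreshable, so in a real deployment it is meant to be set under dataCoord.compaction in the configuration before startup. A sketch of reading and overriding it programmatically, assuming the paramtable pattern used in the component-param test below:

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)

// Sketch only, assuming the test-style override pattern shown below.
func main() {
	paramtable.Init()
	params := paramtable.Get()
	params.Save("dataCoord.compaction.preAllocateIDExpansionFactor", "200")
	fmt.Println(params.DataCoordCfg.CompactionPreAllocateIDExpansionFactor.GetAsInt64()) // 200
}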

@ -517,6 +517,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, float64(100), Params.CompactionGCIntervalInSeconds.GetAsDuration(time.Second).Seconds())
params.Save("dataCoord.compaction.dropTolerance", "100")
assert.Equal(t, float64(100), Params.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds())
assert.Equal(t, int64(100), Params.CompactionPreAllocateIDExpansionFactor.GetAsInt64())
params.Save("dataCoord.compaction.clustering.enable", "true")
assert.Equal(t, true, Params.ClusteringCompactionEnable.GetAsBool())