From 99b53316e512cb24a628efbc59bba3b5dcf1b426 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 23 Dec 2025 11:55:19 +0800 Subject: [PATCH] enhance: Set latestDeletePos from L0 segments to bound L1 selection (#46436) This commit refines L0 compaction to ensure data consistency by properly setting the delete position boundary for L1 segment selection. Key Changes: 1. L0 View Trigger Sets latestDeletePos for L1 Selection 2. Filter L0 Segments by Growing Segment Position in policy, not in views 3. Renamed LevelZeroSegmentsView to LevelZeroCompactionView 4. Renamed fields for semantic clarity: * segments -> l0Segments * earliestGrowingSegmentPos -> latestDeletePos 5. Update Default Compaction Prioritizer to level See also: #46434 --------- Signed-off-by: yangxuan --- configs/milvus.yaml | 2 +- internal/datacoord/compaction_inspector.go | 9 - .../datacoord/compaction_inspector_test.go | 7 +- internal/datacoord/compaction_l0_view.go | 140 +++---- internal/datacoord/compaction_l0_view_test.go | 55 +-- internal/datacoord/compaction_policy_l0.go | 50 ++- .../datacoord/compaction_policy_l0_test.go | 390 +++++++++++++++++- internal/datacoord/compaction_trigger_v2.go | 2 +- .../datacoord/compaction_trigger_v2_test.go | 4 +- pkg/util/paramtable/component_param.go | 2 +- 10 files changed, 502 insertions(+), 159 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 08a845c11d..8a6d9f4505 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -666,7 +666,7 @@ dataCoord: # default is FIFO. # level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions. # mix is prioritized by level: mix compactions first, then L0 compactions, then clustering compactions. 
- taskPrioritizer: default + taskPrioritizer: level taskQueueCapacity: 100000 # compaction task queue size rpcTimeout: 10 maxParallelTaskNum: -1 # Deprecated, see datanode.slot.slotCap diff --git a/internal/datacoord/compaction_inspector.go b/internal/datacoord/compaction_inspector.go index d15d2c3287..711b478248 100644 --- a/internal/datacoord/compaction_inspector.go +++ b/internal/datacoord/compaction_inspector.go @@ -40,8 +40,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) -// TODO: we just warn about the long executing/queuing tasks -// need to get rid of long queuing tasks because the compaction tasks are local optimum. var maxCompactionTaskExecutionDuration = map[datapb.CompactionType]time.Duration{ datapb.CompactionType_MixCompaction: 30 * time.Minute, datapb.CompactionType_Level0DeleteCompaction: 30 * time.Minute, @@ -63,11 +61,6 @@ type CompactionInspector interface { getCompactionTasksNum(filters ...compactionTaskFilter) int } -var ( - errChannelNotWatched = errors.New("channel is not watched") - errChannelInBuffer = errors.New("channel is in buffer") -) - var _ CompactionInspector = (*compactionInspector)(nil) type compactionInfo struct { @@ -187,8 +180,6 @@ func (c *compactionInspector) getCompactionTasksNumBySignalID(triggerID int64) i func newCompactionInspector(meta CompactionMeta, allocator allocator.Allocator, handler Handler, scheduler task.GlobalScheduler, ievm IndexEngineVersionManager, ) *compactionInspector { - // Higher capacity will have better ordering in priority, but consumes more memory. - // TODO[GOOSE]: Higher capacity makes tasks waiting longer, which need to be get rid of. 
capacity := paramtable.Get().DataCoordCfg.CompactionTaskQueueCapacity.GetAsInt() return &compactionInspector{ queueTasks: NewCompactionQueue(capacity, getPrioritizer()), diff --git a/internal/datacoord/compaction_inspector_test.go b/internal/datacoord/compaction_inspector_test.go index 5011cafedc..e13f9e3815 100644 --- a/internal/datacoord/compaction_inspector_test.go +++ b/internal/datacoord/compaction_inspector_test.go @@ -199,7 +199,7 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() { {PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MixCompaction}, {PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}, }, - []UniqueID{13, 14}, + []UniqueID{14, 13}, }, { "empty tasks", @@ -359,9 +359,10 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() { s.handler.submitTask(t) } gotTasks := s.handler.schedule() - s.Equal(test.expectedOut, lo.Map(gotTasks, func(t CompactionTask, _ int) int64 { + gotPlanIDs := lo.Map(gotTasks, func(t CompactionTask, _ int) int64 { return t.GetTaskProto().GetPlanID() - })) + }) + s.ElementsMatch(test.expectedOut, gotPlanIDs) }) } } diff --git a/internal/datacoord/compaction_l0_view.go b/internal/datacoord/compaction_l0_view.go index 72695870e5..850438780c 100644 --- a/internal/datacoord/compaction_l0_view.go +++ b/internal/datacoord/compaction_l0_view.go @@ -2,6 +2,7 @@ package datacoord import ( "fmt" + "sort" "github.com/samber/lo" @@ -9,122 +10,112 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) -// The LevelZeroSegments keeps the min group -type LevelZeroSegmentsView struct { - label *CompactionGroupLabel - segments []*SegmentView - earliestGrowingSegmentPos *msgpb.MsgPosition - triggerID int64 +// LevelZeroCompactionView holds all compactable L0 segments in a compaction group +// Trigger uses a static algorithm, it will select l0Segments according to the min and max threshold +// +// to limit the memory and io usages per L0 Compaction.
+// +// Given the same l0Segments, Trigger is idempotent, it'll give a consistent result. +type LevelZeroCompactionView struct { + triggerID int64 + label *CompactionGroupLabel + l0Segments []*SegmentView + latestDeletePos *msgpb.MsgPosition } -var _ CompactionView = (*LevelZeroSegmentsView)(nil) +var _ CompactionView = (*LevelZeroCompactionView)(nil) -func (v *LevelZeroSegmentsView) String() string { - l0strings := lo.Map(v.segments, func(v *SegmentView, _ int) string { +func (v *LevelZeroCompactionView) String() string { + l0strings := lo.Map(v.l0Segments, func(v *SegmentView, _ int) string { return v.LevelZeroString() }) - count := lo.SumBy(v.segments, func(v *SegmentView) int { + count := lo.SumBy(v.l0Segments, func(v *SegmentView) int { return v.DeltaRowCount }) return fmt.Sprintf("L0SegCount=%d, DeltaRowCount=%d, label=<%s>, posT=<%v>, L0 segments=%v", - len(v.segments), + len(v.l0Segments), count, v.label.String(), - v.earliestGrowingSegmentPos.GetTimestamp(), + v.latestDeletePos.GetTimestamp(), l0strings) } -func (v *LevelZeroSegmentsView) Append(segments ...*SegmentView) { - if v.segments == nil { - v.segments = segments +func (v *LevelZeroCompactionView) Append(segments ...*SegmentView) { + if v.l0Segments == nil { + v.l0Segments = segments return } - v.segments = append(v.segments, segments...) + v.l0Segments = append(v.l0Segments, segments...)
} -func (v *LevelZeroSegmentsView) GetGroupLabel() *CompactionGroupLabel { +func (v *LevelZeroCompactionView) GetGroupLabel() *CompactionGroupLabel { if v == nil { return &CompactionGroupLabel{} } return v.label } -func (v *LevelZeroSegmentsView) GetSegmentsView() []*SegmentView { +func (v *LevelZeroCompactionView) GetSegmentsView() []*SegmentView { if v == nil { return nil } - return v.segments -} - -func (v *LevelZeroSegmentsView) Equal(others []*SegmentView) bool { - if len(v.segments) != len(others) { - return false - } - - IDSelector := func(v *SegmentView, _ int) int64 { - return v.ID - } - - diffLeft, diffRight := lo.Difference(lo.Map(others, IDSelector), lo.Map(v.segments, IDSelector)) - - diffCount := len(diffLeft) + len(diffRight) - return diffCount == 0 + return v.l0Segments } // ForceTrigger triggers all qualified LevelZeroSegments according to views -func (v *LevelZeroSegmentsView) ForceTrigger() (CompactionView, string) { - // Only choose segments with position less than the earliest growing segment position - validSegments := lo.Filter(v.segments, func(view *SegmentView, _ int) bool { - return view.dmlPos.GetTimestamp() < v.earliestGrowingSegmentPos.GetTimestamp() +func (v *LevelZeroCompactionView) ForceTrigger() (CompactionView, string) { + sort.Slice(v.l0Segments, func(i, j int) bool { + return v.l0Segments[i].dmlPos.GetTimestamp() < v.l0Segments[j].dmlPos.GetTimestamp() }) - targetViews, reason := v.forceTrigger(validSegments) + targetViews, reason := v.forceTrigger(v.l0Segments) + + // Use the max dmlPos timestamp as the latestDeletePos + latestL0 := lo.MaxBy(targetViews, func(view1, view2 *SegmentView) bool { + return view1.dmlPos.GetTimestamp() > view2.dmlPos.GetTimestamp() + }) if len(targetViews) > 0 { - return &LevelZeroSegmentsView{ - label: v.label, - segments: targetViews, - earliestGrowingSegmentPos: v.earliestGrowingSegmentPos, - triggerID: v.triggerID, + return &LevelZeroCompactionView{ + label: v.label, + l0Segments: targetViews, + 
latestDeletePos: latestL0.dmlPos, + triggerID: v.triggerID, }, reason } return nil, "" } -func (v *LevelZeroSegmentsView) ForceTriggerAll() ([]CompactionView, string) { - // Only choose segments with position less than the earliest growing segment position - validSegments := lo.Filter(v.segments, func(view *SegmentView, _ int) bool { - return view.dmlPos.GetTimestamp() < v.earliestGrowingSegmentPos.GetTimestamp() +func (v *LevelZeroCompactionView) ForceTriggerAll() ([]CompactionView, string) { + sort.Slice(v.l0Segments, func(i, j int) bool { + return v.l0Segments[i].dmlPos.GetTimestamp() < v.l0Segments[j].dmlPos.GetTimestamp() }) - if len(validSegments) == 0 { - return nil, "" - } - var resultViews []CompactionView - var lastReason string - remainingSegments := validSegments + remainingSegments := v.l0Segments // Multi-round force trigger loop for len(remainingSegments) > 0 { - targetViews, reason := v.forceTrigger(remainingSegments) + targetViews, _ := v.forceTrigger(remainingSegments) if len(targetViews) == 0 { // No more segments can be force triggered, break the loop break } // Create a new LevelZeroSegmentsView for this round's target views - roundView := &LevelZeroSegmentsView{ - label: v.label, - segments: targetViews, - earliestGrowingSegmentPos: v.earliestGrowingSegmentPos, - triggerID: v.triggerID, + latestL0 := lo.MaxBy(targetViews, func(view1, view2 *SegmentView) bool { + return view1.dmlPos.GetTimestamp() > view2.dmlPos.GetTimestamp() + }) + roundView := &LevelZeroCompactionView{ + label: v.label, + l0Segments: targetViews, + latestDeletePos: latestL0.dmlPos, + triggerID: v.triggerID, } resultViews = append(resultViews, roundView) - lastReason = reason // Remove the target segments from remaining segments for next round targetSegmentIDs := lo.Map(targetViews, func(view *SegmentView, _ int) int64 { @@ -135,27 +126,26 @@ func (v *LevelZeroSegmentsView) ForceTriggerAll() ([]CompactionView, string) { }) } - return resultViews, lastReason + return 
resultViews, "force trigger all" } -func (v *LevelZeroSegmentsView) GetTriggerID() int64 { +func (v *LevelZeroCompactionView) GetTriggerID() int64 { return v.triggerID } // Trigger triggers all qualified LevelZeroSegments according to views -func (v *LevelZeroSegmentsView) Trigger() (CompactionView, string) { - // Only choose segments with position less than the earliest growing segment position - validSegments := lo.Filter(v.segments, func(view *SegmentView, _ int) bool { - return view.dmlPos.GetTimestamp() < v.earliestGrowingSegmentPos.GetTimestamp() +func (v *LevelZeroCompactionView) Trigger() (CompactionView, string) { + latestL0 := lo.MaxBy(v.l0Segments, func(view1, view2 *SegmentView) bool { + return view1.dmlPos.GetTimestamp() > view2.dmlPos.GetTimestamp() }) - targetViews, reason := v.minCountSizeTrigger(validSegments) + targetViews, reason := v.minCountSizeTrigger(v.l0Segments) if len(targetViews) > 0 { - return &LevelZeroSegmentsView{ - label: v.label, - segments: targetViews, - earliestGrowingSegmentPos: v.earliestGrowingSegmentPos, - triggerID: v.triggerID, + return &LevelZeroCompactionView{ + label: v.label, + l0Segments: targetViews, + latestDeletePos: latestL0.dmlPos, + triggerID: v.triggerID, }, reason } @@ -165,7 +155,7 @@ func (v *LevelZeroSegmentsView) Trigger() (CompactionView, string) { // minCountSizeTrigger tries to trigger LevelZeroCompaction when segmentViews reaches minimum trigger conditions: // 1. count >= minDeltaCount, OR // 2. 
size >= minDeltaSize -func (v *LevelZeroSegmentsView) minCountSizeTrigger(segments []*SegmentView) (picked []*SegmentView, reason string) { +func (v *LevelZeroCompactionView) minCountSizeTrigger(segments []*SegmentView) (picked []*SegmentView, reason string) { var ( minDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMinSize.GetAsFloat() maxDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMaxSize.GetAsFloat() @@ -195,7 +185,7 @@ func (v *LevelZeroSegmentsView) minCountSizeTrigger(segments []*SegmentView) (pi // forceTrigger tries to trigger LevelZeroCompaction even when segmentsViews don't meet the minimum condition, // the picked plan is still satisfied with the maximum condition -func (v *LevelZeroSegmentsView) forceTrigger(segments []*SegmentView) (picked []*SegmentView, reason string) { +func (v *LevelZeroCompactionView) forceTrigger(segments []*SegmentView) (picked []*SegmentView, reason string) { var ( maxDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMaxSize.GetAsFloat() maxDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMaxNum.GetAsInt() diff --git a/internal/datacoord/compaction_l0_view_test.go b/internal/datacoord/compaction_l0_view_test.go index 30e4e7549d..6a7c2affc4 100644 --- a/internal/datacoord/compaction_l0_view_test.go +++ b/internal/datacoord/compaction_l0_view_test.go @@ -19,7 +19,7 @@ func TestLevelZeroSegmentsViewSuite(t *testing.T) { type LevelZeroSegmentsViewSuite struct { suite.Suite - v *LevelZeroSegmentsView + v *LevelZeroCompactionView } func genTestL0SegmentView(ID UniqueID, label *CompactionGroupLabel, posTime Timestamp) *SegmentView { @@ -44,11 +44,11 @@ func (s *LevelZeroSegmentsViewSuite) SetupTest() { genTestL0SegmentView(102, label, 10000), } - targetView := &LevelZeroSegmentsView{ - label: label, - segments: segments, - earliestGrowingSegmentPos: &msgpb.MsgPosition{Timestamp: 10000}, - triggerID: 10000, + targetView := 
&LevelZeroCompactionView{ + label: label, + l0Segments: segments, + latestDeletePos: &msgpb.MsgPosition{Timestamp: 10000}, + triggerID: 10000, } s.True(label.Equal(targetView.GetGroupLabel())) @@ -57,32 +57,6 @@ func (s *LevelZeroSegmentsViewSuite) SetupTest() { s.v = targetView } -func (s *LevelZeroSegmentsViewSuite) TestEqual() { - label := s.v.GetGroupLabel() - - tests := []struct { - description string - - input []*SegmentView - output bool - }{ - {"Different segment numbers", []*SegmentView{genTestL0SegmentView(100, label, 10000)}, false}, - {"Same number, diff segmentIDs", []*SegmentView{ - genTestL0SegmentView(100, label, 10000), - genTestL0SegmentView(101, label, 10000), - genTestL0SegmentView(200, label, 10000), - }, false}, - {"Same", s.v.GetSegmentsView(), true}, - } - - for _, test := range tests { - s.Run(test.description, func() { - got := s.v.Equal(test.input) - s.Equal(test.output, got) - }) - } -} - func (s *LevelZeroSegmentsViewSuite) TestTrigger() { label := s.v.GetGroupLabel() views := []*SegmentView{ @@ -92,7 +66,7 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() { genTestL0SegmentView(103, label, 40000), } - s.v.segments = views + s.v.l0Segments = views tests := []struct { description string @@ -102,13 +76,6 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() { expectedSegs []UniqueID }{ - { - "No valid segments by earliest growing segment pos", - 64, - 20, - 10000, - nil, - }, { "Not qualified", 1, @@ -121,14 +88,14 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() { 8 * 1024 * 1024, 1, 30000, - []UniqueID{100, 101}, + []UniqueID{100, 101, 102, 103}, }, { "Trigger by > TriggerDeltaCount", 1, 10, 30000, - []UniqueID{100, 101}, + []UniqueID{100, 101, 102, 103}, }, { "Trigger by > maxDeltaSize", @@ -148,7 +115,7 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() { for _, test := range tests { s.Run(test.description, func() { - s.v.earliestGrowingSegmentPos.Timestamp = test.prepEarliestT + s.v.latestDeletePos.Timestamp = 
test.prepEarliestT for _, view := range s.v.GetSegmentsView() { if view.dmlPos.Timestamp < test.prepEarliestT { view.DeltalogCount = test.prepCountEach @@ -162,7 +129,7 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() { if len(test.expectedSegs) == 0 { s.Nil(gotView) } else { - levelZeroView, ok := gotView.(*LevelZeroSegmentsView) + levelZeroView, ok := gotView.(*LevelZeroCompactionView) s.True(ok) s.NotNil(levelZeroView) diff --git a/internal/datacoord/compaction_policy_l0.go b/internal/datacoord/compaction_policy_l0.go index 85fd471e79..f170130a49 100644 --- a/internal/datacoord/compaction_policy_l0.go +++ b/internal/datacoord/compaction_policy_l0.go @@ -17,6 +17,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) +// Chooses qualified L0 segments to do L0 compaction type l0CompactionPolicy struct { meta *meta @@ -110,8 +111,8 @@ func (policy *l0CompactionPolicy) Trigger(ctx context.Context) (events map[Compa } else { activeL0Views = append(activeL0Views, labelViews...) 
} - } + if len(activeL0Views) > 0 { events[TriggerTypeLevelZeroViewChange] = activeL0Views } @@ -122,27 +123,6 @@ func (policy *l0CompactionPolicy) Trigger(ctx context.Context) (events map[Compa return } -func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView, triggerID UniqueID) []CompactionView { - partChanView := make(map[string]*LevelZeroSegmentsView) // "part-chan" as key - for _, view := range levelZeroSegments { - key := view.label.Key() - if _, ok := partChanView[key]; !ok { - partChanView[key] = &LevelZeroSegmentsView{ - label: view.label, - segments: []*SegmentView{view}, - earliestGrowingSegmentPos: policy.meta.GetEarliestStartPositionOfGrowingSegments(view.label), - triggerID: triggerID, - } - } else { - partChanView[key].Append(view) - } - } - - return lo.Map(lo.Values(partChanView), func(view *LevelZeroSegmentsView, _ int) CompactionView { - return view - }) -} - func (policy *l0CompactionPolicy) triggerOneCollection(ctx context.Context, collectionID int64) ([]CompactionView, int64, error) { log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID)) log.Info("start trigger collection l0 compaction") @@ -170,6 +150,32 @@ func (policy *l0CompactionPolicy) triggerOneCollection(ctx context.Context, coll return views, newTriggerID, nil } +func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView, triggerID UniqueID) []CompactionView { + partChanView := make(map[string]*LevelZeroCompactionView) // "part-chan" as key + for _, segView := range levelZeroSegments { + key := segView.label.Key() + if _, ok := partChanView[key]; !ok { + earliestGrowingStartPos := policy.meta.GetEarliestStartPositionOfGrowingSegments(segView.label) + partChanView[key] = &LevelZeroCompactionView{ + label: segView.label, + l0Segments: []*SegmentView{}, + latestDeletePos: earliestGrowingStartPos, + triggerID: triggerID, + } + } + + l0View := partChanView[key] + // 
Only choose segments with position less than or equal to the earliest growing segment position + if segView.dmlPos.GetTimestamp() <= l0View.latestDeletePos.GetTimestamp() { + l0View.Append(segView) + } + } + + return lo.Map(lo.Values(partChanView), func(view *LevelZeroCompactionView, _ int) CompactionView { + return view + }) +} + type activeCollection struct { ID int64 lastRefresh time.Time diff --git a/internal/datacoord/compaction_policy_l0_test.go b/internal/datacoord/compaction_policy_l0_test.go index 8031a00b0e..cdfa1e6767 100644 --- a/internal/datacoord/compaction_policy_l0_test.go +++ b/internal/datacoord/compaction_policy_l0_test.go @@ -112,7 +112,7 @@ func (s *L0CompactionPolicySuite) TestTriggerIdle() { cView := gotViews[0] s.Equal(s.testLabel, cView.GetGroupLabel()) - s.Equal(4, len(cView.GetSegmentsView())) + s.Equal(3, len(cView.GetSegmentsView())) for _, view := range cView.GetSegmentsView() { s.Equal(datapb.SegmentLevel_L0, view.Level) } @@ -189,6 +189,213 @@ func (s *L0CompactionPolicySuite) TestManualTrigger() { s.l0_policy.triggerOneCollection(context.Background(), s.testLabel.CollectionID) } +func (s *L0CompactionPolicySuite) TestPositionFiltering() { + segArgs := []struct { + ID UniqueID + Level datapb.SegmentLevel + State commonpb.SegmentState + PosT Timestamp + + InsertLogSize int64 + InsertLogCount int + + DelatLogSize int64 + DeltaLogCount int + }{ + {100, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 0, 0, 4 * MB, 1}, + {101, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 20000, 0, 0, 4 * MB, 1}, + {102, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 30000, 0, 0, 4 * MB, 1}, + {103, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 40000, 0, 0, 4 * MB, 1}, + {104, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 50000, 0, 0, 4 * MB, 1}, + {200, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 35000, 10 * MB, 1, 0, 0}, + {201, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 60000, 10 * 
MB, 1, 0, 0}, + } + + segments := make(map[int64]*SegmentInfo) + for _, arg := range segArgs { + info := genTestSegmentInfo(s.testLabel, arg.ID, arg.Level, arg.State) + if info.Level == datapb.SegmentLevel_L0 || info.State == commonpb.SegmentState_Flushed { + info.Deltalogs = genTestBinlogs(arg.DeltaLogCount, arg.DelatLogSize) + info.DmlPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + info.Binlogs = genTestBinlogs(arg.InsertLogCount, arg.InsertLogSize) + if info.State == commonpb.SegmentState_Growing { + info.StartPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + segments[arg.ID] = info + } + + meta := &meta{segments: NewSegmentsInfo()} + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } + s.l0_policy.meta = meta + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil) + + events, err := s.l0_policy.Trigger(context.Background()) + s.NoError(err) + s.NotEmpty(events) + + gotViews, ok := events[TriggerTypeLevelZeroViewIDLE] + s.True(ok) + s.Equal(1, len(gotViews)) + + cView := gotViews[0] + s.Equal(s.testLabel, cView.GetGroupLabel()) + segViews := cView.GetSegmentsView() + s.Equal(3, len(segViews)) + + for _, view := range segViews { + s.Equal(datapb.SegmentLevel_L0, view.Level) + s.LessOrEqual(view.dmlPos.GetTimestamp(), uint64(35000)) + } + + includedIDs := []int64{100, 101, 102} + for _, id := range includedIDs { + found := false + for _, view := range segViews { + if view.ID == id { + found = true + break + } + } + s.True(found, "segment %d should be included", id) + } + + excludedIDs := []int64{103, 104} + for _, id := range excludedIDs { + for _, view := range segViews { + s.NotEqual(id, view.ID, "segment %d should be excluded due to position > earliest growing position", id) + } + } +} + +func (s *L0CompactionPolicySuite) TestPositionFilteringWithNoGrowingSegments() { + segArgs := []struct { + ID UniqueID + Level datapb.SegmentLevel + State commonpb.SegmentState + PosT Timestamp + + InsertLogSize int64 + 
InsertLogCount int + + DelatLogSize int64 + DeltaLogCount int + }{ + {100, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 0, 0, 4 * MB, 1}, + {101, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 20000, 0, 0, 4 * MB, 1}, + {102, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 30000, 0, 0, 4 * MB, 1}, + {300, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed, 10000, 10 * MB, 1, 0, 0}, + } + + segments := make(map[int64]*SegmentInfo) + for _, arg := range segArgs { + info := genTestSegmentInfo(s.testLabel, arg.ID, arg.Level, arg.State) + if info.Level == datapb.SegmentLevel_L0 || info.State == commonpb.SegmentState_Flushed { + info.Deltalogs = genTestBinlogs(arg.DeltaLogCount, arg.DelatLogSize) + info.DmlPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + info.Binlogs = genTestBinlogs(arg.InsertLogCount, arg.InsertLogSize) + if info.State == commonpb.SegmentState_Growing { + info.StartPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + segments[arg.ID] = info + } + + meta := &meta{segments: NewSegmentsInfo()} + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } + s.l0_policy.meta = meta + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil) + + events, err := s.l0_policy.Trigger(context.Background()) + s.NoError(err) + s.NotEmpty(events) + + gotViews, ok := events[TriggerTypeLevelZeroViewIDLE] + s.True(ok) + s.Equal(1, len(gotViews)) + + cView := gotViews[0] + segViews := cView.GetSegmentsView() + s.Equal(3, len(segViews)) + + for _, view := range segViews { + s.Equal(datapb.SegmentLevel_L0, view.Level) + } +} + +func (s *L0CompactionPolicySuite) TestPositionFilteringEdgeCase() { + segArgs := []struct { + ID UniqueID + Level datapb.SegmentLevel + State commonpb.SegmentState + PosT Timestamp + + InsertLogSize int64 + InsertLogCount int + + DelatLogSize int64 + DeltaLogCount int + }{ + {100, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 0, 0, 4 * MB, 1}, + {101, 
datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 30000, 0, 0, 4 * MB, 1}, + {102, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 30001, 0, 0, 4 * MB, 1}, + {200, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 30000, 10 * MB, 1, 0, 0}, + } + + segments := make(map[int64]*SegmentInfo) + for _, arg := range segArgs { + info := genTestSegmentInfo(s.testLabel, arg.ID, arg.Level, arg.State) + if info.Level == datapb.SegmentLevel_L0 || info.State == commonpb.SegmentState_Flushed { + info.Deltalogs = genTestBinlogs(arg.DeltaLogCount, arg.DelatLogSize) + info.DmlPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + info.Binlogs = genTestBinlogs(arg.InsertLogCount, arg.InsertLogSize) + if info.State == commonpb.SegmentState_Growing { + info.StartPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + segments[arg.ID] = info + } + + meta := &meta{segments: NewSegmentsInfo()} + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } + s.l0_policy.meta = meta + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil) + + events, err := s.l0_policy.Trigger(context.Background()) + s.NoError(err) + s.NotEmpty(events) + + gotViews, ok := events[TriggerTypeLevelZeroViewIDLE] + s.True(ok) + s.Equal(1, len(gotViews)) + + cView := gotViews[0] + segViews := cView.GetSegmentsView() + s.Equal(2, len(segViews)) + + includedIDs := []int64{100, 101} + for _, id := range includedIDs { + found := false + for _, view := range segViews { + if view.ID == id { + found = true + break + } + } + s.True(found, "segment %d with position <= 30000 should be included", id) + } + + for _, view := range segViews { + s.NotEqual(int64(102), view.ID, "segment 102 with position 30001 should be excluded") + } +} + func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo { segArgs := []struct { ID UniqueID @@ -258,3 +465,184 @@ func genTestBinlogs(logCount int, logSize int64) []*datapb.FieldBinlog { {Binlogs: binlogs}, } } + +func (s 
*L0CompactionPolicySuite) TestMultiChannelPositionFiltering() { + label1 := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch-1", + } + + label2 := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch-2", + } + + segArgs := []struct { + ID UniqueID + Label *CompactionGroupLabel + Level datapb.SegmentLevel + State commonpb.SegmentState + PosT Timestamp + + InsertLogSize int64 + InsertLogCount int + + DelatLogSize int64 + DeltaLogCount int + }{ + {100, label1, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 0, 0, 4 * MB, 1}, + {101, label1, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 20000, 0, 0, 4 * MB, 1}, + {102, label1, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 50000, 0, 0, 4 * MB, 1}, + {200, label1, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 30000, 10 * MB, 1, 0, 0}, + {300, label2, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 15000, 0, 0, 4 * MB, 1}, + {301, label2, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 25000, 0, 0, 4 * MB, 1}, + {302, label2, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 60000, 0, 0, 4 * MB, 1}, + {400, label2, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 40000, 10 * MB, 1, 0, 0}, + } + + segments := make(map[int64]*SegmentInfo) + for _, arg := range segArgs { + info := genTestSegmentInfo(arg.Label, arg.ID, arg.Level, arg.State) + if info.Level == datapb.SegmentLevel_L0 || info.State == commonpb.SegmentState_Flushed { + info.Deltalogs = genTestBinlogs(arg.DeltaLogCount, arg.DelatLogSize) + info.DmlPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + info.Binlogs = genTestBinlogs(arg.InsertLogCount, arg.InsertLogSize) + if info.State == commonpb.SegmentState_Growing { + info.StartPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + segments[arg.ID] = info + } + + meta := &meta{segments: NewSegmentsInfo()} + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } + 
s.l0_policy.meta = meta + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil) + + events, err := s.l0_policy.Trigger(context.Background()) + s.NoError(err) + s.NotEmpty(events) + + gotViews, ok := events[TriggerTypeLevelZeroViewIDLE] + s.True(ok) + s.Equal(2, len(gotViews)) + + for _, cView := range gotViews { + if cView.GetGroupLabel().Channel == "ch-1" { + segViews := cView.GetSegmentsView() + s.Equal(2, len(segViews)) + for _, view := range segViews { + s.LessOrEqual(view.dmlPos.GetTimestamp(), uint64(30000)) + } + } else if cView.GetGroupLabel().Channel == "ch-2" { + segViews := cView.GetSegmentsView() + s.Equal(2, len(segViews)) + for _, view := range segViews { + s.LessOrEqual(view.dmlPos.GetTimestamp(), uint64(40000)) + } + } + } +} + +func (s *L0CompactionPolicySuite) TestGroupL0ViewsByPartChan() { + label1 := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch-1", + } + + label2 := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 20, + Channel: "ch-1", + } + + segments := []*SegmentView{ + { + ID: 100, + label: label1, + Level: datapb.SegmentLevel_L0, + dmlPos: &msgpb.MsgPosition{Timestamp: 10000}, + }, + { + ID: 101, + label: label1, + Level: datapb.SegmentLevel_L0, + dmlPos: &msgpb.MsgPosition{Timestamp: 20000}, + }, + { + ID: 200, + label: label2, + Level: datapb.SegmentLevel_L0, + dmlPos: &msgpb.MsgPosition{Timestamp: 15000}, + }, + } + + meta := &meta{segments: NewSegmentsInfo()} + for _, segView := range segments { + info := genTestSegmentInfo(segView.label, segView.ID, segView.Level, commonpb.SegmentState_Flushed) + info.DmlPosition = segView.dmlPos + info.Deltalogs = genTestBinlogs(1, 4*MB) + meta.segments.SetSegment(segView.ID, info) + } + + s.l0_policy.meta = meta + views := s.l0_policy.groupL0ViewsByPartChan(1, segments, 999) + + s.Equal(2, len(views)) + + for _, view := range views { + if view.GetGroupLabel().PartitionID == 10 { + s.Equal(2, len(view.GetSegmentsView())) + } else if 
view.GetGroupLabel().PartitionID == 20 { + s.Equal(1, len(view.GetSegmentsView())) + } + } +} + +func (s *L0CompactionPolicySuite) TestLevelZeroCompactionViewString() { + label := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch-1", + } + + view := &LevelZeroCompactionView{ + triggerID: 123, + label: label, + l0Segments: []*SegmentView{ + {ID: 100, Level: datapb.SegmentLevel_L0, DeltaSize: 4 * MB, DeltalogCount: 1}, + {ID: 101, Level: datapb.SegmentLevel_L0, DeltaSize: 8 * MB, DeltalogCount: 2}, + }, + latestDeletePos: &msgpb.MsgPosition{Timestamp: 30000}, + } + + s.Contains(view.String(), "L0SegCount=2") + s.Contains(view.String(), "posT=<30000>") + s.Contains(view.String(), "label=") +} + +func (s *L0CompactionPolicySuite) TestLevelZeroCompactionViewAppend() { + view := &LevelZeroCompactionView{ + triggerID: 123, + label: s.testLabel, + l0Segments: nil, + latestDeletePos: &msgpb.MsgPosition{Timestamp: 30000}, + } + + s.Nil(view.l0Segments) + + seg1 := &SegmentView{ID: 100, Level: datapb.SegmentLevel_L0} + view.Append(seg1) + s.Equal(1, len(view.l0Segments)) + s.Equal(int64(100), view.l0Segments[0].ID) + + seg2 := &SegmentView{ID: 101, Level: datapb.SegmentLevel_L0} + seg3 := &SegmentView{ID: 102, Level: datapb.SegmentLevel_L0} + view.Append(seg2, seg3) + s.Equal(3, len(view.l0Segments)) +} diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index d3c4b5feb6..b601d68cf5 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -458,7 +458,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, Channel: view.GetGroupLabel().Channel, CollectionID: view.GetGroupLabel().CollectionID, PartitionID: view.GetGroupLabel().PartitionID, - Pos: view.(*LevelZeroSegmentsView).earliestGrowingSegmentPos, + Pos: view.(*LevelZeroCompactionView).latestDeletePos, Schema: collection.Schema, } diff --git 
a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index 3bd16ac175..8281e3ffda 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -89,7 +89,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { s.Require().Equal(1, len(latestL0Segments)) levelZeroViews := s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments, 10000) s.Require().Equal(1, len(levelZeroViews)) - cView, ok := levelZeroViews[0].(*LevelZeroSegmentsView) + cView, ok := levelZeroViews[0].(*LevelZeroCompactionView) s.True(ok) s.NotNil(cView) log.Info("view", zap.Any("cView", cView)) @@ -132,7 +132,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { s.Require().NotEmpty(latestL0Segments) levelZeroViews := s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments, 10000) s.Require().Equal(1, len(levelZeroViews)) - cView, ok := levelZeroViews[0].(*LevelZeroSegmentsView) + cView, ok := levelZeroViews[0].(*LevelZeroCompactionView) s.True(ok) s.NotNil(cView) log.Info("view", zap.Any("cView", cView)) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index f71b9e4e4c..c28d3ed23b 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -4956,7 +4956,7 @@ This configuration takes effect only when dataCoord.enableCompaction is set as t p.CompactionTaskPrioritizer = ParamItem{ Key: "dataCoord.compaction.taskPrioritizer", Version: "2.5.0", - DefaultValue: "default", + DefaultValue: "level", Doc: `compaction task prioritizer, options: [default, level, mix]. default is FIFO. level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions.