diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 08a845c11d..8a6d9f4505 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -666,7 +666,7 @@ dataCoord: # default is FIFO. # level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions. # mix is prioritized by level: mix compactions first, then L0 compactions, then clustering compactions. - taskPrioritizer: default + taskPrioritizer: level taskQueueCapacity: 100000 # compaction task queue size rpcTimeout: 10 maxParallelTaskNum: -1 # Deprecated, see datanode.slot.slotCap diff --git a/internal/datacoord/compaction_inspector.go b/internal/datacoord/compaction_inspector.go index d15d2c3287..711b478248 100644 --- a/internal/datacoord/compaction_inspector.go +++ b/internal/datacoord/compaction_inspector.go @@ -40,8 +40,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) -// TODO: we just warn about the long executing/queuing tasks -// need to get rid of long queuing tasks because the compaction tasks are local optimum. var maxCompactionTaskExecutionDuration = map[datapb.CompactionType]time.Duration{ datapb.CompactionType_MixCompaction: 30 * time.Minute, datapb.CompactionType_Level0DeleteCompaction: 30 * time.Minute, @@ -63,11 +61,6 @@ type CompactionInspector interface { getCompactionTasksNum(filters ...compactionTaskFilter) int } -var ( - errChannelNotWatched = errors.New("channel is not watched") - errChannelInBuffer = errors.New("channel is in buffer") -) - var _ CompactionInspector = (*compactionInspector)(nil) type compactionInfo struct { @@ -187,8 +180,6 @@ func (c *compactionInspector) getCompactionTasksNumBySignalID(triggerID int64) i func newCompactionInspector(meta CompactionMeta, allocator allocator.Allocator, handler Handler, scheduler task.GlobalScheduler, ievm IndexEngineVersionManager, ) *compactionInspector { - // Higher capacity will have better ordering in priority, but consumes more memory. - // TODO[GOOSE]: Higher capacity makes tasks waiting longer, which need to be get rid of. 
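Before the inspector wiring continues below, a quick illustration of what the new `taskPrioritizer: level` default (set in the config hunk above) means for queue ordering. This is a hypothetical sketch only — none of these names come from the datacoord code — it just demonstrates the documented order: L0 compactions first, then mix, then clustering, FIFO within a level.

```go
// Hypothetical sketch of level-based compaction prioritization.
package main

import (
	"fmt"
	"sort"
)

type compactionKind int

const (
	kindL0 compactionKind = iota // highest priority
	kindMix
	kindClustering // lowest priority
)

type queuedTask struct {
	planID int64
	kind   compactionKind
}

func main() {
	queue := []queuedTask{
		{planID: 14, kind: kindMix},
		{planID: 13, kind: kindL0},
		{planID: 15, kind: kindClustering},
	}
	// Stable sort keeps FIFO order within the same level.
	sort.SliceStable(queue, func(i, j int) bool {
		return queue[i].kind < queue[j].kind
	})
	fmt.Println(queue) // [{13 0} {14 1} {15 2}] — the L0 plan is scheduled first
}
```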
capacity := paramtable.Get().DataCoordCfg.CompactionTaskQueueCapacity.GetAsInt() return &compactionInspector{ queueTasks: NewCompactionQueue(capacity, getPrioritizer()), diff --git a/internal/datacoord/compaction_inspector_test.go b/internal/datacoord/compaction_inspector_test.go index 5011cafedc..e13f9e3815 100644 --- a/internal/datacoord/compaction_inspector_test.go +++ b/internal/datacoord/compaction_inspector_test.go @@ -199,7 +199,7 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() { {PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MixCompaction}, {PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}, }, - []UniqueID{13, 14}, + []UniqueID{14, 13}, }, { "empty tasks", @@ -359,9 +359,10 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() { s.handler.submitTask(t) } gotTasks := s.handler.schedule() - s.Equal(test.expectedOut, lo.Map(gotTasks, func(t CompactionTask, _ int) int64 { + gotPlanIDs := lo.Map(gotTasks, func(t CompactionTask, _ int) int64 { return t.GetTaskProto().GetPlanID() - })) + }) + s.ElementsMatch(test.expectedOut, gotPlanIDs) }) } } diff --git a/internal/datacoord/compaction_l0_view.go b/internal/datacoord/compaction_l0_view.go index 72695870e5..850438780c 100644 --- a/internal/datacoord/compaction_l0_view.go +++ b/internal/datacoord/compaction_l0_view.go @@ -2,6 +2,7 @@ package datacoord import ( "fmt" + "sort" "github.com/samber/lo" @@ -9,122 +10,112 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) -// The LevelZeroSegments keeps the min group -type LevelZeroSegmentsView struct { - label *CompactionGroupLabel - segments []*SegmentView - earliestGrowingSegmentPos *msgpb.MsgPosition - triggerID int64 +// LevelZeroCompactionView holds all compactable L0 segments in a compaction group. +// Trigger uses a static algorithm: it selects l0Segments according to the min and max thresholds +// to limit the memory and IO usage per L0 compaction. +// +// Given the same l0Segments, Trigger is idempotent: it gives a consistent result. +type LevelZeroCompactionView struct { + triggerID int64 + label *CompactionGroupLabel + l0Segments []*SegmentView + latestDeletePos *msgpb.MsgPosition } -var _ CompactionView = (*LevelZeroSegmentsView)(nil) +var _ CompactionView = (*LevelZeroCompactionView)(nil) -func (v *LevelZeroSegmentsView) String() string { - l0strings := lo.Map(v.segments, func(v *SegmentView, _ int) string { +func (v *LevelZeroCompactionView) String() string { + l0strings := lo.Map(v.l0Segments, func(v *SegmentView, _ int) string { return v.LevelZeroString() }) - count := lo.SumBy(v.segments, func(v *SegmentView) int { + count := lo.SumBy(v.l0Segments, func(v *SegmentView) int { return v.DeltaRowCount }) return fmt.Sprintf("L0SegCount=%d, DeltaRowCount=%d, label=<%s>, posT=<%v>, L0 segments=%v", - len(v.segments), + len(v.l0Segments), count, v.label.String(), - v.earliestGrowingSegmentPos.GetTimestamp(), + v.latestDeletePos.GetTimestamp(), l0strings) } -func (v *LevelZeroSegmentsView) Append(segments ...*SegmentView) { - if v.segments == nil { - v.segments = segments +func (v *LevelZeroCompactionView) Append(segments ...*SegmentView) { - if v.segments == nil { + if v.l0Segments == nil { + v.l0Segments = segments return } - v.segments = append(v.segments, segments...) + v.l0Segments = append(v.l0Segments, segments...)
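As a rough illustration of the min/max threshold selection described in the LevelZeroCompactionView doc comment above: the trigger only fires once the candidate L0 segments reach a minimum deltalog count or size, and it stops adding segments before exceeding the maximum, bounding memory and IO per plan. The names and threshold values below are hypothetical, not the paramtable values or the actual trigger code.

```go
// Hypothetical sketch of min/max-bounded L0 segment selection.
package main

import "fmt"

type l0Candidate struct {
	id         int64
	deltaSize  float64 // MB
	deltaCount int
}

func pick(candidates []l0Candidate, minSize, maxSize float64, minCount, maxCount int) []l0Candidate {
	var picked []l0Candidate
	var size float64
	var count int
	for _, c := range candidates {
		if size+c.deltaSize > maxSize || count+c.deltaCount > maxCount {
			break // respect the maximum condition
		}
		picked = append(picked, c)
		size += c.deltaSize
		count += c.deltaCount
	}
	if size < minSize && count < minCount {
		return nil // minimum condition not met, do not trigger
	}
	return picked
}

func main() {
	segs := []l0Candidate{{100, 4, 1}, {101, 4, 1}, {102, 4, 1}, {103, 4, 1}}
	fmt.Println(pick(segs, 8, 12, 10, 30)) // picks 100, 101, 102; adding 103 would exceed maxSize
}
```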
} -func (v *LevelZeroSegmentsView) GetGroupLabel() *CompactionGroupLabel { +func (v *LevelZeroCompactionView) GetGroupLabel() *CompactionGroupLabel { if v == nil { return &CompactionGroupLabel{} } return v.label } -func (v *LevelZeroSegmentsView) GetSegmentsView() []*SegmentView { +func (v *LevelZeroCompactionView) GetSegmentsView() []*SegmentView { if v == nil { return nil } - return v.segments -} - -func (v *LevelZeroSegmentsView) Equal(others []*SegmentView) bool { - if len(v.segments) != len(others) { - return false - } - - IDSelector := func(v *SegmentView, _ int) int64 { - return v.ID - } - - diffLeft, diffRight := lo.Difference(lo.Map(others, IDSelector), lo.Map(v.segments, IDSelector)) - - diffCount := len(diffLeft) + len(diffRight) - return diffCount == 0 + return v.l0Segments } // ForceTrigger triggers all qualified LevelZeroSegments according to views -func (v *LevelZeroSegmentsView) ForceTrigger() (CompactionView, string) { - // Only choose segments with position less than the earliest growing segment position - validSegments := lo.Filter(v.segments, func(view *SegmentView, _ int) bool { - return view.dmlPos.GetTimestamp() < v.earliestGrowingSegmentPos.GetTimestamp() +func (v *LevelZeroCompactionView) ForceTrigger() (CompactionView, string) { + sort.Slice(v.l0Segments, func(i, j int) bool { + return v.l0Segments[i].dmlPos.GetTimestamp() < v.l0Segments[j].dmlPos.GetTimestamp() }) - targetViews, reason := v.forceTrigger(validSegments) + targetViews, reason := v.forceTrigger(v.l0Segments) + + // Use the max dmlPos timestamp as the latestDeletePos + latestL0 := lo.MaxBy(targetViews, func(view1, view2 *SegmentView) bool { + return view1.dmlPos.GetTimestamp() > view2.dmlPos.GetTimestamp() + }) if len(targetViews) > 0 { - return &LevelZeroSegmentsView{ - label: v.label, - segments: targetViews, - earliestGrowingSegmentPos: v.earliestGrowingSegmentPos, - triggerID: v.triggerID, + return &LevelZeroCompactionView{ + label: v.label, + l0Segments: targetViews, + latestDeletePos: latestL0.dmlPos, + triggerID: v.triggerID, }, reason } return nil, "" } -func (v *LevelZeroSegmentsView) ForceTriggerAll() ([]CompactionView, string) { - // Only choose segments with position less than the earliest growing segment position - validSegments := lo.Filter(v.segments, func(view *SegmentView, _ int) bool { - return view.dmlPos.GetTimestamp() < v.earliestGrowingSegmentPos.GetTimestamp() +func (v *LevelZeroCompactionView) ForceTriggerAll() ([]CompactionView, string) { + sort.Slice(v.l0Segments, func(i, j int) bool { + return v.l0Segments[i].dmlPos.GetTimestamp() < v.l0Segments[j].dmlPos.GetTimestamp() }) - if len(validSegments) == 0 { - return nil, "" - } - var resultViews []CompactionView - var lastReason string - remainingSegments := validSegments + remainingSegments := v.l0Segments // Multi-round force trigger loop for len(remainingSegments) > 0 { - targetViews, reason := v.forceTrigger(remainingSegments) + targetViews, _ := v.forceTrigger(remainingSegments) if len(targetViews) == 0 { // No more segments can be force triggered, break the loop break } // Create a new LevelZeroSegmentsView for this round's target views - roundView := &LevelZeroSegmentsView{ - label: v.label, - segments: targetViews, - earliestGrowingSegmentPos: v.earliestGrowingSegmentPos, - triggerID: v.triggerID, + latestL0 := lo.MaxBy(targetViews, func(view1, view2 *SegmentView) bool { + return view1.dmlPos.GetTimestamp() > view2.dmlPos.GetTimestamp() + }) + roundView := &LevelZeroCompactionView{ + label: v.label, + l0Segments: 
targetViews, + latestDeletePos: latestL0.dmlPos, + triggerID: v.triggerID, } resultViews = append(resultViews, roundView) - lastReason = reason // Remove the target segments from remaining segments for next round targetSegmentIDs := lo.Map(targetViews, func(view *SegmentView, _ int) int64 { @@ -135,27 +126,26 @@ func (v *LevelZeroSegmentsView) ForceTriggerAll() ([]CompactionView, string) { }) } - return resultViews, lastReason + return resultViews, "force trigger all" } -func (v *LevelZeroSegmentsView) GetTriggerID() int64 { +func (v *LevelZeroCompactionView) GetTriggerID() int64 { return v.triggerID } // Trigger triggers all qualified LevelZeroSegments according to views -func (v *LevelZeroSegmentsView) Trigger() (CompactionView, string) { - // Only choose segments with position less than the earliest growing segment position - validSegments := lo.Filter(v.segments, func(view *SegmentView, _ int) bool { - return view.dmlPos.GetTimestamp() < v.earliestGrowingSegmentPos.GetTimestamp() +func (v *LevelZeroCompactionView) Trigger() (CompactionView, string) { + latestL0 := lo.MaxBy(v.l0Segments, func(view1, view2 *SegmentView) bool { + return view1.dmlPos.GetTimestamp() > view2.dmlPos.GetTimestamp() }) - targetViews, reason := v.minCountSizeTrigger(validSegments) + targetViews, reason := v.minCountSizeTrigger(v.l0Segments) if len(targetViews) > 0 { - return &LevelZeroSegmentsView{ - label: v.label, - segments: targetViews, - earliestGrowingSegmentPos: v.earliestGrowingSegmentPos, - triggerID: v.triggerID, + return &LevelZeroCompactionView{ + label: v.label, + l0Segments: targetViews, + latestDeletePos: latestL0.dmlPos, + triggerID: v.triggerID, }, reason } @@ -165,7 +155,7 @@ func (v *LevelZeroSegmentsView) Trigger() (CompactionView, string) { // minCountSizeTrigger tries to trigger LevelZeroCompaction when segmentViews reaches minimum trigger conditions: // 1. count >= minDeltaCount, OR // 2. 
size >= minDeltaSize -func (v *LevelZeroSegmentsView) minCountSizeTrigger(segments []*SegmentView) (picked []*SegmentView, reason string) { +func (v *LevelZeroCompactionView) minCountSizeTrigger(segments []*SegmentView) (picked []*SegmentView, reason string) { var ( minDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMinSize.GetAsFloat() maxDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMaxSize.GetAsFloat() @@ -195,7 +185,7 @@ func (v *LevelZeroSegmentsView) minCountSizeTrigger(segments []*SegmentView) (pi // forceTrigger tries to trigger LevelZeroCompaction even when segmentsViews don't meet the minimum condition, // the picked plan is still satisfied with the maximum condition -func (v *LevelZeroSegmentsView) forceTrigger(segments []*SegmentView) (picked []*SegmentView, reason string) { +func (v *LevelZeroCompactionView) forceTrigger(segments []*SegmentView) (picked []*SegmentView, reason string) { var ( maxDeltaSize = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerMaxSize.GetAsFloat() maxDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMaxNum.GetAsInt() diff --git a/internal/datacoord/compaction_l0_view_test.go b/internal/datacoord/compaction_l0_view_test.go index 30e4e7549d..6a7c2affc4 100644 --- a/internal/datacoord/compaction_l0_view_test.go +++ b/internal/datacoord/compaction_l0_view_test.go @@ -19,7 +19,7 @@ func TestLevelZeroSegmentsViewSuite(t *testing.T) { type LevelZeroSegmentsViewSuite struct { suite.Suite - v *LevelZeroSegmentsView + v *LevelZeroCompactionView } func genTestL0SegmentView(ID UniqueID, label *CompactionGroupLabel, posTime Timestamp) *SegmentView { @@ -44,11 +44,11 @@ func (s *LevelZeroSegmentsViewSuite) SetupTest() { genTestL0SegmentView(102, label, 10000), } - targetView := &LevelZeroSegmentsView{ - label: label, - segments: segments, - earliestGrowingSegmentPos: &msgpb.MsgPosition{Timestamp: 10000}, - triggerID: 10000, + targetView := &LevelZeroCompactionView{ + label: label, + l0Segments: segments, + latestDeletePos: &msgpb.MsgPosition{Timestamp: 10000}, + triggerID: 10000, } s.True(label.Equal(targetView.GetGroupLabel())) @@ -57,32 +57,6 @@ func (s *LevelZeroSegmentsViewSuite) SetupTest() { s.v = targetView } -func (s *LevelZeroSegmentsViewSuite) TestEqual() { - label := s.v.GetGroupLabel() - - tests := []struct { - description string - - input []*SegmentView - output bool - }{ - {"Different segment numbers", []*SegmentView{genTestL0SegmentView(100, label, 10000)}, false}, - {"Same number, diff segmentIDs", []*SegmentView{ - genTestL0SegmentView(100, label, 10000), - genTestL0SegmentView(101, label, 10000), - genTestL0SegmentView(200, label, 10000), - }, false}, - {"Same", s.v.GetSegmentsView(), true}, - } - - for _, test := range tests { - s.Run(test.description, func() { - got := s.v.Equal(test.input) - s.Equal(test.output, got) - }) - } -} - func (s *LevelZeroSegmentsViewSuite) TestTrigger() { label := s.v.GetGroupLabel() views := []*SegmentView{ @@ -92,7 +66,7 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() { genTestL0SegmentView(103, label, 40000), } - s.v.segments = views + s.v.l0Segments = views tests := []struct { description string @@ -102,13 +76,6 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() { expectedSegs []UniqueID }{ - { - "No valid segments by earliest growing segment pos", - 64, - 20, - 10000, - nil, - }, { "Not qualified", 1, @@ -121,14 +88,14 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() { 8 * 1024 * 1024, 1, 30000, - []UniqueID{100, 101}, + 
[]UniqueID{100, 101, 102, 103}, }, { "Trigger by > TriggerDeltaCount", 1, 10, 30000, - []UniqueID{100, 101}, + []UniqueID{100, 101, 102, 103}, }, { "Trigger by > maxDeltaSize", @@ -148,7 +115,7 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() { for _, test := range tests { s.Run(test.description, func() { - s.v.earliestGrowingSegmentPos.Timestamp = test.prepEarliestT + s.v.latestDeletePos.Timestamp = test.prepEarliestT for _, view := range s.v.GetSegmentsView() { if view.dmlPos.Timestamp < test.prepEarliestT { view.DeltalogCount = test.prepCountEach @@ -162,7 +129,7 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() { if len(test.expectedSegs) == 0 { s.Nil(gotView) } else { - levelZeroView, ok := gotView.(*LevelZeroSegmentsView) + levelZeroView, ok := gotView.(*LevelZeroCompactionView) s.True(ok) s.NotNil(levelZeroView) diff --git a/internal/datacoord/compaction_policy_l0.go b/internal/datacoord/compaction_policy_l0.go index 85fd471e79..f170130a49 100644 --- a/internal/datacoord/compaction_policy_l0.go +++ b/internal/datacoord/compaction_policy_l0.go @@ -17,6 +17,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) +// Chooses qualified L0 segments to do L0 compaction type l0CompactionPolicy struct { meta *meta @@ -110,8 +111,8 @@ func (policy *l0CompactionPolicy) Trigger(ctx context.Context) (events map[Compa } else { activeL0Views = append(activeL0Views, labelViews...) } - } + if len(activeL0Views) > 0 { events[TriggerTypeLevelZeroViewChange] = activeL0Views } @@ -122,27 +123,6 @@ func (policy *l0CompactionPolicy) Trigger(ctx context.Context) (events map[Compa return } -func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView, triggerID UniqueID) []CompactionView { - partChanView := make(map[string]*LevelZeroSegmentsView) // "part-chan" as key - for _, view := range levelZeroSegments { - key := view.label.Key() - if _, ok := partChanView[key]; !ok { - partChanView[key] = &LevelZeroSegmentsView{ - label: view.label, - segments: []*SegmentView{view}, - earliestGrowingSegmentPos: policy.meta.GetEarliestStartPositionOfGrowingSegments(view.label), - triggerID: triggerID, - } - } else { - partChanView[key].Append(view) - } - } - - return lo.Map(lo.Values(partChanView), func(view *LevelZeroSegmentsView, _ int) CompactionView { - return view - }) -} - func (policy *l0CompactionPolicy) triggerOneCollection(ctx context.Context, collectionID int64) ([]CompactionView, int64, error) { log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID)) log.Info("start trigger collection l0 compaction") @@ -170,6 +150,32 @@ func (policy *l0CompactionPolicy) triggerOneCollection(ctx context.Context, coll return views, newTriggerID, nil } +func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView, triggerID UniqueID) []CompactionView { + partChanView := make(map[string]*LevelZeroCompactionView) // "part-chan" as key + for _, segView := range levelZeroSegments { + key := segView.label.Key() + if _, ok := partChanView[key]; !ok { + earliestGrowingStartPos := policy.meta.GetEarliestStartPositionOfGrowingSegments(segView.label) + partChanView[key] = &LevelZeroCompactionView{ + label: segView.label, + l0Segments: []*SegmentView{}, + latestDeletePos: earliestGrowingStartPos, + triggerID: triggerID, + } + } + + l0View := partChanView[key] + // Only choose segments with position less than or equal to the earliest growing segment position + if segView.dmlPos.GetTimestamp() <= 
l0View.latestDeletePos.GetTimestamp() { + l0View.Append(segView) + } + } + + return lo.Map(lo.Values(partChanView), func(view *LevelZeroCompactionView, _ int) CompactionView { + return view + }) +} + type activeCollection struct { ID int64 lastRefresh time.Time diff --git a/internal/datacoord/compaction_policy_l0_test.go b/internal/datacoord/compaction_policy_l0_test.go index 8031a00b0e..cdfa1e6767 100644 --- a/internal/datacoord/compaction_policy_l0_test.go +++ b/internal/datacoord/compaction_policy_l0_test.go @@ -112,7 +112,7 @@ func (s *L0CompactionPolicySuite) TestTriggerIdle() { cView := gotViews[0] s.Equal(s.testLabel, cView.GetGroupLabel()) - s.Equal(4, len(cView.GetSegmentsView())) + s.Equal(3, len(cView.GetSegmentsView())) for _, view := range cView.GetSegmentsView() { s.Equal(datapb.SegmentLevel_L0, view.Level) } @@ -189,6 +189,213 @@ func (s *L0CompactionPolicySuite) TestManualTrigger() { s.l0_policy.triggerOneCollection(context.Background(), s.testLabel.CollectionID) } +func (s *L0CompactionPolicySuite) TestPositionFiltering() { + segArgs := []struct { + ID UniqueID + Level datapb.SegmentLevel + State commonpb.SegmentState + PosT Timestamp + + InsertLogSize int64 + InsertLogCount int + + DelatLogSize int64 + DeltaLogCount int + }{ + {100, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 0, 0, 4 * MB, 1}, + {101, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 20000, 0, 0, 4 * MB, 1}, + {102, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 30000, 0, 0, 4 * MB, 1}, + {103, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 40000, 0, 0, 4 * MB, 1}, + {104, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 50000, 0, 0, 4 * MB, 1}, + {200, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 35000, 10 * MB, 1, 0, 0}, + {201, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 60000, 10 * MB, 1, 0, 0}, + } + + segments := make(map[int64]*SegmentInfo) + for _, arg := range segArgs { + info := genTestSegmentInfo(s.testLabel, arg.ID, arg.Level, arg.State) + if info.Level == datapb.SegmentLevel_L0 || info.State == commonpb.SegmentState_Flushed { + info.Deltalogs = genTestBinlogs(arg.DeltaLogCount, arg.DelatLogSize) + info.DmlPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + info.Binlogs = genTestBinlogs(arg.InsertLogCount, arg.InsertLogSize) + if info.State == commonpb.SegmentState_Growing { + info.StartPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + segments[arg.ID] = info + } + + meta := &meta{segments: NewSegmentsInfo()} + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } + s.l0_policy.meta = meta + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil) + + events, err := s.l0_policy.Trigger(context.Background()) + s.NoError(err) + s.NotEmpty(events) + + gotViews, ok := events[TriggerTypeLevelZeroViewIDLE] + s.True(ok) + s.Equal(1, len(gotViews)) + + cView := gotViews[0] + s.Equal(s.testLabel, cView.GetGroupLabel()) + segViews := cView.GetSegmentsView() + s.Equal(3, len(segViews)) + + for _, view := range segViews { + s.Equal(datapb.SegmentLevel_L0, view.Level) + s.LessOrEqual(view.dmlPos.GetTimestamp(), uint64(35000)) + } + + includedIDs := []int64{100, 101, 102} + for _, id := range includedIDs { + found := false + for _, view := range segViews { + if view.ID == id { + found = true + break + } + } + s.True(found, "segment %d should be included", id) + } + + excludedIDs := []int64{103, 104} + for _, id := range excludedIDs { + for _, view := range segViews { + s.NotEqual(id, view.ID, "segment 
%d should be excluded due to position > earliest growing position", id) + } + } +} + +func (s *L0CompactionPolicySuite) TestPositionFilteringWithNoGrowingSegments() { + segArgs := []struct { + ID UniqueID + Level datapb.SegmentLevel + State commonpb.SegmentState + PosT Timestamp + + InsertLogSize int64 + InsertLogCount int + + DelatLogSize int64 + DeltaLogCount int + }{ + {100, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 0, 0, 4 * MB, 1}, + {101, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 20000, 0, 0, 4 * MB, 1}, + {102, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 30000, 0, 0, 4 * MB, 1}, + {300, datapb.SegmentLevel_L1, commonpb.SegmentState_Flushed, 10000, 10 * MB, 1, 0, 0}, + } + + segments := make(map[int64]*SegmentInfo) + for _, arg := range segArgs { + info := genTestSegmentInfo(s.testLabel, arg.ID, arg.Level, arg.State) + if info.Level == datapb.SegmentLevel_L0 || info.State == commonpb.SegmentState_Flushed { + info.Deltalogs = genTestBinlogs(arg.DeltaLogCount, arg.DelatLogSize) + info.DmlPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + info.Binlogs = genTestBinlogs(arg.InsertLogCount, arg.InsertLogSize) + if info.State == commonpb.SegmentState_Growing { + info.StartPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + segments[arg.ID] = info + } + + meta := &meta{segments: NewSegmentsInfo()} + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } + s.l0_policy.meta = meta + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil) + + events, err := s.l0_policy.Trigger(context.Background()) + s.NoError(err) + s.NotEmpty(events) + + gotViews, ok := events[TriggerTypeLevelZeroViewIDLE] + s.True(ok) + s.Equal(1, len(gotViews)) + + cView := gotViews[0] + segViews := cView.GetSegmentsView() + s.Equal(3, len(segViews)) + + for _, view := range segViews { + s.Equal(datapb.SegmentLevel_L0, view.Level) + } +} + +func (s *L0CompactionPolicySuite) TestPositionFilteringEdgeCase() { + segArgs := []struct { + ID UniqueID + Level datapb.SegmentLevel + State commonpb.SegmentState + PosT Timestamp + + InsertLogSize int64 + InsertLogCount int + + DelatLogSize int64 + DeltaLogCount int + }{ + {100, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 0, 0, 4 * MB, 1}, + {101, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 30000, 0, 0, 4 * MB, 1}, + {102, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 30001, 0, 0, 4 * MB, 1}, + {200, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 30000, 10 * MB, 1, 0, 0}, + } + + segments := make(map[int64]*SegmentInfo) + for _, arg := range segArgs { + info := genTestSegmentInfo(s.testLabel, arg.ID, arg.Level, arg.State) + if info.Level == datapb.SegmentLevel_L0 || info.State == commonpb.SegmentState_Flushed { + info.Deltalogs = genTestBinlogs(arg.DeltaLogCount, arg.DelatLogSize) + info.DmlPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + info.Binlogs = genTestBinlogs(arg.InsertLogCount, arg.InsertLogSize) + if info.State == commonpb.SegmentState_Growing { + info.StartPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + segments[arg.ID] = info + } + + meta := &meta{segments: NewSegmentsInfo()} + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } + s.l0_policy.meta = meta + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil) + + events, err := s.l0_policy.Trigger(context.Background()) + s.NoError(err) + s.NotEmpty(events) + + gotViews, ok := events[TriggerTypeLevelZeroViewIDLE] + s.True(ok) + s.Equal(1, 
len(gotViews)) + + cView := gotViews[0] + segViews := cView.GetSegmentsView() + s.Equal(2, len(segViews)) + + includedIDs := []int64{100, 101} + for _, id := range includedIDs { + found := false + for _, view := range segViews { + if view.ID == id { + found = true + break + } + } + s.True(found, "segment %d with position <= 30000 should be included", id) + } + + for _, view := range segViews { + s.NotEqual(int64(102), view.ID, "segment 102 with position 30001 should be excluded") + } +} + func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo { segArgs := []struct { ID UniqueID @@ -258,3 +465,184 @@ func genTestBinlogs(logCount int, logSize int64) []*datapb.FieldBinlog { {Binlogs: binlogs}, } } + +func (s *L0CompactionPolicySuite) TestMultiChannelPositionFiltering() { + label1 := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch-1", + } + + label2 := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch-2", + } + + segArgs := []struct { + ID UniqueID + Label *CompactionGroupLabel + Level datapb.SegmentLevel + State commonpb.SegmentState + PosT Timestamp + + InsertLogSize int64 + InsertLogCount int + + DelatLogSize int64 + DeltaLogCount int + }{ + {100, label1, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 10000, 0, 0, 4 * MB, 1}, + {101, label1, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 20000, 0, 0, 4 * MB, 1}, + {102, label1, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 50000, 0, 0, 4 * MB, 1}, + {200, label1, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 30000, 10 * MB, 1, 0, 0}, + {300, label2, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 15000, 0, 0, 4 * MB, 1}, + {301, label2, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 25000, 0, 0, 4 * MB, 1}, + {302, label2, datapb.SegmentLevel_L0, commonpb.SegmentState_Flushed, 60000, 0, 0, 4 * MB, 1}, + {400, label2, datapb.SegmentLevel_L1, commonpb.SegmentState_Growing, 40000, 10 * MB, 1, 0, 0}, + } + + segments := make(map[int64]*SegmentInfo) + for _, arg := range segArgs { + info := genTestSegmentInfo(arg.Label, arg.ID, arg.Level, arg.State) + if info.Level == datapb.SegmentLevel_L0 || info.State == commonpb.SegmentState_Flushed { + info.Deltalogs = genTestBinlogs(arg.DeltaLogCount, arg.DelatLogSize) + info.DmlPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + info.Binlogs = genTestBinlogs(arg.InsertLogCount, arg.InsertLogSize) + if info.State == commonpb.SegmentState_Growing { + info.StartPosition = &msgpb.MsgPosition{Timestamp: arg.PosT} + } + segments[arg.ID] = info + } + + meta := &meta{segments: NewSegmentsInfo()} + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } + s.l0_policy.meta = meta + s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(1, nil) + + events, err := s.l0_policy.Trigger(context.Background()) + s.NoError(err) + s.NotEmpty(events) + + gotViews, ok := events[TriggerTypeLevelZeroViewIDLE] + s.True(ok) + s.Equal(2, len(gotViews)) + + for _, cView := range gotViews { + if cView.GetGroupLabel().Channel == "ch-1" { + segViews := cView.GetSegmentsView() + s.Equal(2, len(segViews)) + for _, view := range segViews { + s.LessOrEqual(view.dmlPos.GetTimestamp(), uint64(30000)) + } + } else if cView.GetGroupLabel().Channel == "ch-2" { + segViews := cView.GetSegmentsView() + s.Equal(2, len(segViews)) + for _, view := range segViews { + s.LessOrEqual(view.dmlPos.GetTimestamp(), uint64(40000)) + } + } + } +} + +func (s *L0CompactionPolicySuite) TestGroupL0ViewsByPartChan() { 
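The TestGroupL0ViewsByPartChan body that follows, together with the position-filtering tests above, exercises the new grouping behavior: L0 segments are bucketed by their partition/channel label, and a segment only joins a bucket when its delete (dml) position is not newer than the earliest growing segment's start position for that label. A hypothetical sketch of that idea (the no-growing-segment case is not modeled here, and none of these types are from the codebase):

```go
// Hypothetical sketch of group-by-part-chan with position filtering.
package main

import "fmt"

type l0Seg struct {
	id   int64
	key  string // partition/channel label key
	posT uint64 // dml position timestamp
}

func groupByPartChan(segs []l0Seg, earliestGrowing map[string]uint64) map[string][]int64 {
	grouped := make(map[string][]int64)
	for _, s := range segs {
		if s.posT <= earliestGrowing[s.key] { // position filter
			grouped[s.key] = append(grouped[s.key], s.id)
		}
	}
	return grouped
}

func main() {
	segs := []l0Seg{
		{100, "10/ch-1", 10000},
		{101, "10/ch-1", 20000},
		{102, "10/ch-1", 50000}, // newer than the growing segment's start position, filtered out
	}
	fmt.Println(groupByPartChan(segs, map[string]uint64{"10/ch-1": 30000})) // map[10/ch-1:[100 101]]
}
```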
+ label1 := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch-1", + } + + label2 := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 20, + Channel: "ch-1", + } + + segments := []*SegmentView{ + { + ID: 100, + label: label1, + Level: datapb.SegmentLevel_L0, + dmlPos: &msgpb.MsgPosition{Timestamp: 10000}, + }, + { + ID: 101, + label: label1, + Level: datapb.SegmentLevel_L0, + dmlPos: &msgpb.MsgPosition{Timestamp: 20000}, + }, + { + ID: 200, + label: label2, + Level: datapb.SegmentLevel_L0, + dmlPos: &msgpb.MsgPosition{Timestamp: 15000}, + }, + } + + meta := &meta{segments: NewSegmentsInfo()} + for _, segView := range segments { + info := genTestSegmentInfo(segView.label, segView.ID, segView.Level, commonpb.SegmentState_Flushed) + info.DmlPosition = segView.dmlPos + info.Deltalogs = genTestBinlogs(1, 4*MB) + meta.segments.SetSegment(segView.ID, info) + } + + s.l0_policy.meta = meta + views := s.l0_policy.groupL0ViewsByPartChan(1, segments, 999) + + s.Equal(2, len(views)) + + for _, view := range views { + if view.GetGroupLabel().PartitionID == 10 { + s.Equal(2, len(view.GetSegmentsView())) + } else if view.GetGroupLabel().PartitionID == 20 { + s.Equal(1, len(view.GetSegmentsView())) + } + } +} + +func (s *L0CompactionPolicySuite) TestLevelZeroCompactionViewString() { + label := &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch-1", + } + + view := &LevelZeroCompactionView{ + triggerID: 123, + label: label, + l0Segments: []*SegmentView{ + {ID: 100, Level: datapb.SegmentLevel_L0, DeltaSize: 4 * MB, DeltalogCount: 1}, + {ID: 101, Level: datapb.SegmentLevel_L0, DeltaSize: 8 * MB, DeltalogCount: 2}, + }, + latestDeletePos: &msgpb.MsgPosition{Timestamp: 30000}, + } + + s.Contains(view.String(), "L0SegCount=2") + s.Contains(view.String(), "posT=<30000>") + s.Contains(view.String(), "label=") +} + +func (s *L0CompactionPolicySuite) TestLevelZeroCompactionViewAppend() { + view := &LevelZeroCompactionView{ + triggerID: 123, + label: s.testLabel, + l0Segments: nil, + latestDeletePos: &msgpb.MsgPosition{Timestamp: 30000}, + } + + s.Nil(view.l0Segments) + + seg1 := &SegmentView{ID: 100, Level: datapb.SegmentLevel_L0} + view.Append(seg1) + s.Equal(1, len(view.l0Segments)) + s.Equal(int64(100), view.l0Segments[0].ID) + + seg2 := &SegmentView{ID: 101, Level: datapb.SegmentLevel_L0} + seg3 := &SegmentView{ID: 102, Level: datapb.SegmentLevel_L0} + view.Append(seg2, seg3) + s.Equal(3, len(view.l0Segments)) +} diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index d3c4b5feb6..b601d68cf5 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -458,7 +458,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, Channel: view.GetGroupLabel().Channel, CollectionID: view.GetGroupLabel().CollectionID, PartitionID: view.GetGroupLabel().PartitionID, - Pos: view.(*LevelZeroSegmentsView).earliestGrowingSegmentPos, + Pos: view.(*LevelZeroCompactionView).latestDeletePos, Schema: collection.Schema, } diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index 3bd16ac175..8281e3ffda 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -89,7 +89,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { s.Require().Equal(1, len(latestL0Segments)) levelZeroViews := 
s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments, 10000) s.Require().Equal(1, len(levelZeroViews)) - cView, ok := levelZeroViews[0].(*LevelZeroSegmentsView) + cView, ok := levelZeroViews[0].(*LevelZeroCompactionView) s.True(ok) s.NotNil(cView) log.Info("view", zap.Any("cView", cView)) @@ -132,7 +132,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { s.Require().NotEmpty(latestL0Segments) levelZeroViews := s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments, 10000) s.Require().Equal(1, len(levelZeroViews)) - cView, ok := levelZeroViews[0].(*LevelZeroSegmentsView) + cView, ok := levelZeroViews[0].(*LevelZeroCompactionView) s.True(ok) s.NotNil(cView) log.Info("view", zap.Any("cView", cView)) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index f71b9e4e4c..c28d3ed23b 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -4956,7 +4956,7 @@ This configuration takes effect only when dataCoord.enableCompaction is set as t p.CompactionTaskPrioritizer = ParamItem{ Key: "dataCoord.compaction.taskPrioritizer", Version: "2.5.0", - DefaultValue: "default", + DefaultValue: "level", Doc: `compaction task prioritizer, options: [default, level, mix]. default is FIFO. level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions.
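For reference, the latestDeletePos that now feeds the scheduler's Pos field is simply the maximum dmlPos timestamp among the L0 segments picked for the plan. A minimal standalone illustration of the lo.MaxBy pattern used in the view code above (the seg type is hypothetical):

```go
// Standalone example of deriving the latest delete position with lo.MaxBy.
package main

import (
	"fmt"

	"github.com/samber/lo"
)

type seg struct {
	id   int64
	posT uint64 // dmlPos timestamp
}

func main() {
	picked := []seg{{100, 10000}, {102, 30000}, {101, 20000}}
	latest := lo.MaxBy(picked, func(a, b seg) bool {
		return a.posT > b.posT
	})
	fmt.Println(latest.posT) // 30000 — used as the view's latestDeletePos
}
```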