diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 83f25869a7..69c1384e3e 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -303,6 +303,17 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) { plan := task.plan log := log.With(zap.Int64("taskID", task.triggerInfo.id), zap.Int64("planID", plan.GetPlanID())) if plan.GetType() == datapb.CompactionType_Level0DeleteCompaction { + // Fill in deltalogs for L0 segments + lo.ForEach(plan.SegmentBinlogs, func(seg *datapb.CompactionSegmentBinlogs, _ int) { + if seg.GetLevel() == datapb.SegmentLevel_L0 { + segInfo := c.meta.GetHealthySegment(seg.GetSegmentID()) + seg.Deltalogs = segInfo.GetDeltalogs() + } + }) + + // Select sealed L1 segments for LevelZero compaction that meets the condition: + // dmlPos < triggerInfo.pos + // TODO: select L2 segments too sealedSegments := c.meta.SelectSegments(func(info *SegmentInfo) bool { return info.GetCollectionID() == task.triggerInfo.collectionID && (task.triggerInfo.partitionID == -1 || info.GetPartitionID() == task.triggerInfo.partitionID) && @@ -324,7 +335,9 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) { }) plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...) - log.Info("Compaction handler refreshed level zero compaction plan", zap.Any("target segments count", len(sealedSegBinlogs))) + log.Info("Compaction handler refreshed level zero compaction plan", + zap.Any("target position", task.triggerInfo.pos), + zap.Any("target segments count", len(sealedSegBinlogs))) return } diff --git a/internal/datacoord/compaction_l0_view.go b/internal/datacoord/compaction_l0_view.go index 71326586d1..d59df36c4b 100644 --- a/internal/datacoord/compaction_l0_view.go +++ b/internal/datacoord/compaction_l0_view.go @@ -67,6 +67,25 @@ func (v *LevelZeroSegmentsView) Equal(others []*SegmentView) bool { return diffCount == 0 } +// ForceTrigger triggers all qualified LevelZeroSegments according to views +func (v *LevelZeroSegmentsView) ForceTrigger() (CompactionView, string) { + // Only choose segments with position less than the earliest growing segment position + validSegments := lo.Filter(v.segments, func(view *SegmentView, _ int) bool { + return view.dmlPos.GetTimestamp() < v.earliestGrowingSegmentPos.GetTimestamp() + }) + + targetViews, reason := v.forceTrigger(validSegments) + if len(targetViews) > 0 { + return &LevelZeroSegmentsView{ + label: v.label, + segments: targetViews, + earliestGrowingSegmentPos: v.earliestGrowingSegmentPos, + }, reason + } + + return nil, "" +} + // Trigger triggers all qualified LevelZeroSegments according to views func (v *LevelZeroSegmentsView) Trigger() (CompactionView, string) { // Only choose segments with position less than the earliest growing segment position diff --git a/internal/datacoord/compaction_scheduler.go b/internal/datacoord/compaction_scheduler.go index 0cd2c93fa3..e2ab976ba9 100644 --- a/internal/datacoord/compaction_scheduler.go +++ b/internal/datacoord/compaction_scheduler.go @@ -28,10 +28,11 @@ type Scheduler interface { } type CompactionScheduler struct { - taskNumber *atomic.Int32 + taskNumber *atomic.Int32 + queuingTasks []*compactionTask parallelTasks map[int64][]*compactionTask // parallel by nodeID - mu sync.RWMutex + taskGuard sync.RWMutex planHandler *compactionPlanHandler } @@ -47,9 +48,9 @@ func NewCompactionScheduler() *CompactionScheduler { } func (s *CompactionScheduler) Submit(tasks ...*compactionTask) { - s.mu.Lock() + s.taskGuard.Lock() s.queuingTasks = append(s.queuingTasks, tasks...) - s.mu.Unlock() + s.taskGuard.Unlock() s.taskNumber.Add(int32(len(tasks))) lo.ForEach(tasks, func(t *compactionTask, _ int) { @@ -63,8 +64,8 @@ func (s *CompactionScheduler) Submit(tasks ...*compactionTask) { func (s *CompactionScheduler) Schedule() []*compactionTask { nodeTasks := make(map[int64][]*compactionTask) // nodeID - s.mu.Lock() - defer s.mu.Unlock() + s.taskGuard.Lock() + defer s.taskGuard.Unlock() for _, task := range s.queuingTasks { if _, ok := nodeTasks[task.dataNodeID]; !ok { nodeTasks[task.dataNodeID] = make([]*compactionTask, 0) @@ -156,7 +157,7 @@ func (s *CompactionScheduler) Finish(nodeID UniqueID, plan *datapb.CompactionPla planID := plan.GetPlanID() log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID)) - s.mu.Lock() + s.taskGuard.Lock() if parallel, ok := s.parallelTasks[nodeID]; ok { tasks := lo.Filter(parallel, func(t *compactionTask, _ int) bool { return t.plan.PlanID != planID @@ -183,25 +184,26 @@ func (s *CompactionScheduler) Finish(nodeID UniqueID, plan *datapb.CompactionPla log.Info("Compaction scheduler remove task from queue") } - s.mu.Unlock() + s.taskGuard.Unlock() s.LogStatus() } func (s *CompactionScheduler) LogStatus() { - s.mu.RLock() - defer s.mu.RUnlock() - waiting := lo.Map(s.queuingTasks, func(t *compactionTask, _ int) int64 { - return t.plan.PlanID - }) + s.taskGuard.RLock() + defer s.taskGuard.RUnlock() - var executing []int64 - for _, tasks := range s.parallelTasks { - executing = append(executing, lo.Map(tasks, func(t *compactionTask, _ int) int64 { + if s.GetTaskCount() > 0 { + waiting := lo.Map(s.queuingTasks, func(t *compactionTask, _ int) int64 { return t.plan.PlanID - })...) - } + }) + + var executing []int64 + for _, tasks := range s.parallelTasks { + executing = append(executing, lo.Map(tasks, func(t *compactionTask, _ int) int64 { + return t.plan.PlanID + })...) + } - if len(waiting) > 0 || len(executing) > 0 { log.Info("Compaction scheduler status", zap.Int64s("waiting", waiting), zap.Int64s("executing", executing)) } } diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 88db0f2173..cbb3dd6dc8 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -201,6 +201,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleL0CompactionResults() { func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() { channel := "Ch-1" + deltalogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} s.mockMeta.EXPECT().SelectSegments(mock.Anything).Return( []*SegmentInfo{ {SegmentInfo: &datapb.SegmentInfo{ @@ -220,21 +221,27 @@ func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() { }}, }, ) + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { + return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + Level: datapb.SegmentLevel_L0, + InsertChannel: channel, + State: commonpb.SegmentState_Flushed, + Deltalogs: deltalogs, + }} + }) - deltalogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} // 2 l0 segments plan := &datapb.CompactionPlan{ PlanID: 1, SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ { SegmentID: 100, - Deltalogs: deltalogs, Level: datapb.SegmentLevel_L0, InsertChannel: channel, }, { SegmentID: 101, - Deltalogs: deltalogs, Level: datapb.SegmentLevel_L0, InsertChannel: channel, }, diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 8af689ea14..daae103916 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -13,8 +13,9 @@ import ( type CompactionTriggerType int8 const ( - TriggerTypeLevelZeroView CompactionTriggerType = iota + 1 - TriggerTypeSegmentSizeView + TriggerTypeLevelZeroViewChange CompactionTriggerType = iota + 1 + TriggerTypeLevelZeroViewIDLE + TriggerTypeSegmentSizeViewChange ) type TriggerManager interface { @@ -32,16 +33,14 @@ type TriggerManager interface { // 2. SystemIDLE & schedulerIDLE // 3. Manual Compaction type CompactionTriggerManager struct { - meta *meta scheduler Scheduler handler compactionPlanContext // TODO replace with scheduler allocator allocator } -func NewCompactionTriggerManager(meta *meta, alloc allocator, handler compactionPlanContext) *CompactionTriggerManager { +func NewCompactionTriggerManager(alloc allocator, handler compactionPlanContext) *CompactionTriggerManager { m := &CompactionTriggerManager{ - meta: meta, allocator: alloc, handler: handler, } @@ -53,50 +52,69 @@ func (m *CompactionTriggerManager) Notify(taskID UniqueID, eventType CompactionT log := log.With(zap.Int64("taskID", taskID)) for _, view := range views { switch eventType { - case TriggerTypeLevelZeroView: - log.Debug("Start to trigger a level zero compaction") + case TriggerTypeLevelZeroViewChange: + log.Debug("Start to trigger a level zero compaction by TriggerTypeLevelZeroViewChange") + outView, reason := view.Trigger() + if outView != nil { + log.Info("Success to trigger a LevelZeroCompaction output view, try to sumit", + zap.String("reason", reason), + zap.String("output view", outView.String())) + m.SubmitL0ViewToScheduler(taskID, outView) + } + + case TriggerTypeLevelZeroViewIDLE: + log.Debug("Start to trigger a level zero compaction by TriggerTypLevelZeroViewIDLE") outView, reason := view.Trigger() if outView == nil { - continue + log.Info("Start to force trigger a level zero compaction by TriggerTypLevelZeroViewIDLE") + outView, reason = view.ForceTrigger() } - plan := m.BuildLevelZeroCompactionPlan(outView) - if plan == nil { - continue + if outView != nil { + log.Info("Success to trigger a LevelZeroCompaction output view, try to submit", + zap.String("reason", reason), + zap.String("output view", outView.String())) + m.SubmitL0ViewToScheduler(taskID, outView) } - - label := outView.GetGroupLabel() - signal := &compactionSignal{ - id: taskID, - isForce: false, - isGlobal: true, - collectionID: label.CollectionID, - partitionID: label.PartitionID, - pos: outView.(*LevelZeroSegmentsView).earliestGrowingSegmentPos, - } - - // TODO, remove handler, use scheduler - // m.scheduler.Submit(plan) - m.handler.execCompactionPlan(signal, plan) - log.Info("Finish to trigger a LevelZeroCompaction plan", - zap.Int64("planID", plan.GetPlanID()), - zap.String("type", plan.GetType().String()), - zap.String("reason", reason), - zap.String("output view", outView.String())) } } } -func (m *CompactionTriggerManager) BuildLevelZeroCompactionPlan(view CompactionView) *datapb.CompactionPlan { +func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(taskID int64, outView CompactionView) { + plan := m.buildL0CompactionPlan(outView) + if plan == nil { + return + } + + label := outView.GetGroupLabel() + signal := &compactionSignal{ + id: taskID, + isForce: false, + isGlobal: true, + collectionID: label.CollectionID, + partitionID: label.PartitionID, + pos: outView.(*LevelZeroSegmentsView).earliestGrowingSegmentPos, + } + + // TODO, remove handler, use scheduler + // m.scheduler.Submit(plan) + m.handler.execCompactionPlan(signal, plan) + log.Info("Finish to submit a LevelZeroCompaction plan", + zap.Int64("taskID", taskID), + zap.Int64("planID", plan.GetPlanID()), + zap.String("type", plan.GetType().String()), + ) +} + +func (m *CompactionTriggerManager) buildL0CompactionPlan(view CompactionView) *datapb.CompactionPlan { var segmentBinlogs []*datapb.CompactionSegmentBinlogs levelZeroSegs := lo.Map(view.GetSegmentsView(), func(segView *SegmentView, _ int) *datapb.CompactionSegmentBinlogs { - s := m.meta.GetSegment(segView.ID) return &datapb.CompactionSegmentBinlogs{ SegmentID: segView.ID, - Deltalogs: s.GetDeltalogs(), Level: datapb.SegmentLevel_L0, CollectionID: view.GetGroupLabel().CollectionID, PartitionID: view.GetGroupLabel().PartitionID, + // Deltalogs: deltalogs are filled before executing the plan } }) segmentBinlogs = append(segmentBinlogs, levelZeroSegs...) diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index 34fc8c9efc..fadfdf9af7 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -22,6 +22,7 @@ type CompactionTriggerManagerSuite struct { mockAlloc *NMockAllocator mockPlanContext *MockCompactionPlanContext testLabel *CompactionGroupLabel + meta *meta m *CompactionTriggerManager } @@ -35,16 +36,67 @@ func (s *CompactionTriggerManagerSuite) SetupTest() { PartitionID: 10, Channel: "ch-1", } - meta := &meta{segments: &SegmentsInfo{ + s.meta = &meta{segments: &SegmentsInfo{ segments: genSegmentsForMeta(s.testLabel), }} - s.m = NewCompactionTriggerManager(meta, s.mockAlloc, s.mockPlanContext) + s.m = NewCompactionTriggerManager(s.mockAlloc, s.mockPlanContext) } -func (s *CompactionTriggerManagerSuite) TestNotify() { - viewManager := NewCompactionViewManager(s.m.meta, s.m, s.m.allocator) - collSegs := s.m.meta.GetCompactableSegmentGroupByCollection() +func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { + viewManager := NewCompactionViewManager(s.meta, s.m, s.m.allocator) + collSegs := s.meta.GetCompactableSegmentGroupByCollection() + + segments, found := collSegs[1] + s.Require().True(found) + + seg1, found := lo.Find(segments, func(info *SegmentInfo) bool { + return info.ID == int64(100) && info.GetLevel() == datapb.SegmentLevel_L0 + }) + s.Require().True(found) + + // Prepare only 1 l0 segment that doesn't meet the Trigger minimum condition + // but ViewIDLE Trigger will still forceTrigger the plan + latestL0Segments := GetViewsByInfo(seg1) + expectedSegID := seg1.ID + + s.Require().Equal(1, len(latestL0Segments)) + levelZeroView := viewManager.getChangedLevelZeroViews(1, latestL0Segments) + s.Require().Equal(1, len(levelZeroView)) + cView, ok := levelZeroView[0].(*LevelZeroSegmentsView) + s.True(ok) + s.NotNil(cView) + log.Info("view", zap.Any("cView", cView)) + + s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil) + s.mockPlanContext.EXPECT().execCompactionPlan(mock.Anything, mock.Anything). + Run(func(signal *compactionSignal, plan *datapb.CompactionPlan) { + s.EqualValues(19530, signal.id) + s.True(signal.isGlobal) + s.False(signal.isForce) + s.EqualValues(30000, signal.pos.GetTimestamp()) + s.Equal(s.testLabel.CollectionID, signal.collectionID) + s.Equal(s.testLabel.PartitionID, signal.partitionID) + + s.NotNil(plan) + s.Equal(s.testLabel.Channel, plan.GetChannel()) + s.Equal(datapb.CompactionType_Level0DeleteCompaction, plan.GetType()) + + expectedSegs := []int64{expectedSegID} + gotSegs := lo.Map(plan.GetSegmentBinlogs(), func(b *datapb.CompactionSegmentBinlogs, _ int) int64 { + return b.GetSegmentID() + }) + + s.ElementsMatch(expectedSegs, gotSegs) + log.Info("generated plan", zap.Any("plan", plan)) + }).Return(nil).Once() + + s.m.Notify(19530, TriggerTypeLevelZeroViewIDLE, levelZeroView) +} + +func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { + viewManager := NewCompactionViewManager(s.meta, s.m, s.m.allocator) + collSegs := s.meta.GetCompactableSegmentGroupByCollection() segments, found := collSegs[1] s.Require().True(found) @@ -85,5 +137,5 @@ func (s *CompactionTriggerManagerSuite) TestNotify() { log.Info("generated plan", zap.Any("plan", plan)) }).Return(nil).Once() - s.m.Notify(19530, TriggerTypeLevelZeroView, levelZeroView) + s.m.Notify(19530, TriggerTypeLevelZeroViewChange, levelZeroView) } diff --git a/internal/datacoord/compaction_view.go b/internal/datacoord/compaction_view.go index ad969e2c74..a523cb007b 100644 --- a/internal/datacoord/compaction_view.go +++ b/internal/datacoord/compaction_view.go @@ -18,6 +18,7 @@ type CompactionView interface { Append(segments ...*SegmentView) String() string Trigger() (CompactionView, string) + ForceTrigger() (CompactionView, string) } type FullViews struct { diff --git a/internal/datacoord/compaction_view_manager.go b/internal/datacoord/compaction_view_manager.go index 31934ff172..3b5d70d8de 100644 --- a/internal/datacoord/compaction_view_manager.go +++ b/internal/datacoord/compaction_view_manager.go @@ -6,11 +6,13 @@ import ( "time" "github.com/samber/lo" + "go.opentelemetry.io/otel" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/logutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type CompactionViewManager struct { @@ -54,45 +56,98 @@ func (m *CompactionViewManager) checkLoop() { if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() { return } + + // TODO: Only process L0 compaction now, so just return if its not enabled + if !Params.DataCoordCfg.EnableLevelZeroSegment.GetAsBool() { + return + } + interval := Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second) - ticker := time.NewTicker(interval) - defer ticker.Stop() + checkTicker := time.NewTicker(interval) + defer checkTicker.Stop() - log.Info("Compaction view manager start") + idleTicker := time.NewTicker(interval * 3) + defer idleTicker.Stop() + // each time when triggers a compaction, the idleTicker would reset + refreshViewsAndTrigger := func(ctx context.Context) bool { + events := m.Check(ctx) + if len(events) != 0 { + m.notifyTrigger(ctx, events) + idleTicker.Reset(interval * 3) + return true + } + + return false + } + + log.Info("Compaction view manager start", zap.Duration("check interval", interval), zap.Duration("idle check interval", interval*3)) for { select { case <-m.closeSig: log.Info("Compaction View checkLoop quit") return - case <-ticker.C: - m.Check() + case <-checkTicker.C: + refreshViewsAndTrigger(context.Background()) + + case <-idleTicker.C: + // idelTicker will be reset everytime when Check's able to + // generates compaction events + + // if no views are freshed, try to get cached views and trigger a + // TriggerTypeViewIDLE event + if !refreshViewsAndTrigger(context.Background()) { + m.triggerEventForIDLEView() + } } } } -// Global check could take some time, we need to record the time. -func (m *CompactionViewManager) Check() { - // Only process L0 compaction now, so just return if its not enabled - if !Params.DataCoordCfg.EnableLevelZeroSegment.GetAsBool() { - return +func (m *CompactionViewManager) triggerEventForIDLEView() { + log.Info("Views idle for a long time, try to trigger a TriggerTypeLevelZeroViewIDLE compaction event") + events := make(map[CompactionTriggerType][]CompactionView) + for collID := range m.view.collections { + cachedViews := m.view.GetSegmentViewBy(collID, func(v *SegmentView) bool { + return v.Level == datapb.SegmentLevel_L0 + }) + if len(cachedViews) > 0 { + grouped := m.groupL0ViewsByPartChan(collID, cachedViews) + events[TriggerTypeLevelZeroViewIDLE] = lo.Map(lo.Values(grouped), + func(l0View *LevelZeroSegmentsView, _ int) CompactionView { + return l0View + }) + log.Info("Generate TriggerTypeLevelZeroViewIDLE compaction event", zap.Int64("collectionID", collID)) + break + } } - ctx := context.TODO() + if len(events) > 0 { + m.notifyTrigger(context.Background(), events) + } +} + +func (m *CompactionViewManager) notifyTrigger(ctx context.Context, events map[CompactionTriggerType][]CompactionView) { taskID, err := m.allocator.allocID(ctx) if err != nil { - log.Warn("CompactionViewManager check failed, unable to allocate taskID", + log.Warn("CompactionViewManager notify trigger failed, unable to allocate taskID", zap.Error(err)) return } - log := log.With(zap.Int64("taskID", taskID)) + for eType, views := range events { + m.trigger.Notify(taskID, eType, views) + } +} + +// Global check could take some time, we need to record the time. +func (m *CompactionViewManager) Check(ctx context.Context) (events map[CompactionTriggerType][]CompactionView) { + _, span := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "CompactionView-Check") + defer span.End() m.viewGuard.Lock() defer m.viewGuard.Unlock() - events := make(map[CompactionTriggerType][]CompactionView) - + span.AddEvent("CompactionView GetCompactableSegment") latestCollSegs := m.meta.GetCompactableSegmentGroupByCollection() latestCollIDs := lo.Keys(latestCollSegs) viewCollIDs := lo.Keys(m.view.collections) @@ -103,6 +158,18 @@ func (m *CompactionViewManager) Check() { } // TODO: update all segments views. For now, just update Level Zero Segments + span.AddEvent("CompactionView Refresh L0 views") + refreshedL0Views := m.RefreshLevelZeroViews(latestCollSegs) + if len(refreshedL0Views) > 0 { + events = make(map[CompactionTriggerType][]CompactionView) + events[TriggerTypeLevelZeroViewChange] = refreshedL0Views + } + + return events +} + +func (m *CompactionViewManager) RefreshLevelZeroViews(latestCollSegs map[int64][]*SegmentInfo) []CompactionView { + var refreshedL0Views []CompactionView for collID, segments := range latestCollSegs { levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool { return info.GetLevel() == datapb.SegmentLevel_L0 @@ -121,12 +188,10 @@ func (m *CompactionViewManager) Check() { }))) m.view.collections[collID] = latestL0Segments - events[TriggerTypeLevelZeroView] = changedL0Views + refreshedL0Views = append(refreshedL0Views, changedL0Views...) } - for eType, views := range events { - m.trigger.Notify(taskID, eType, views) - } + return refreshedL0Views } func (m *CompactionViewManager) getChangedLevelZeroViews(collID UniqueID, LevelZeroViews []*SegmentView) []CompactionView { diff --git a/internal/datacoord/compaction_view_manager_test.go b/internal/datacoord/compaction_view_manager_test.go index 696dcab351..f3fac3d1fc 100644 --- a/internal/datacoord/compaction_view_manager_test.go +++ b/internal/datacoord/compaction_view_manager_test.go @@ -1,7 +1,9 @@ package datacoord import ( + "context" "testing" + "time" "github.com/samber/lo" "github.com/stretchr/testify/mock" @@ -98,17 +100,91 @@ func (s *CompactionViewManagerSuite) TestCheckLoop() { s.m.Start() s.m.closeWg.Wait() }) + + s.Run("Test not enable levelZero segment", func() { + paramtable.Get().Save(Params.DataCoordCfg.EnableLevelZeroSegment.Key, "false") + defer paramtable.Get().Reset(Params.DataCoordCfg.EnableLevelZeroSegment.Key) + + s.m.Start() + s.m.closeWg.Wait() + }) } -func (s *CompactionViewManagerSuite) TestCheck() { +func (s *CompactionViewManagerSuite) TestCheckLoopIDLETicker() { + paramtable.Get().Save(Params.DataCoordCfg.GlobalCompactionInterval.Key, "0.1") + defer paramtable.Get().Reset(Params.DataCoordCfg.GlobalCompactionInterval.Key) paramtable.Get().Save(Params.DataCoordCfg.EnableLevelZeroSegment.Key, "true") defer paramtable.Get().Reset(Params.DataCoordCfg.EnableLevelZeroSegment.Key) - s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Times(2) + events := s.m.Check(context.Background()) + s.NotEmpty(events) + s.Require().NotEmpty(s.m.view.collections) + + notified := make(chan struct{}) + s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Once() + s.mockTriggerManager.EXPECT().Notify(mock.Anything, mock.Anything, mock.Anything). + Run(func(taskID UniqueID, tType CompactionTriggerType, views []CompactionView) { + s.Equal(TriggerTypeLevelZeroViewIDLE, tType) + v, ok := views[0].(*LevelZeroSegmentsView) + s.True(ok) + s.NotNil(v) + log.Info("All views", zap.String("l0 view", v.String())) + + notified <- struct{}{} + }).Once() + + s.m.Start() + <-notified + s.m.Close() +} + +func (s *CompactionViewManagerSuite) TestCheckLoopRefreshViews() { + paramtable.Get().Save(Params.DataCoordCfg.GlobalCompactionInterval.Key, "0.1") + defer paramtable.Get().Reset(Params.DataCoordCfg.GlobalCompactionInterval.Key) + paramtable.Get().Save(Params.DataCoordCfg.EnableLevelZeroSegment.Key, "true") + defer paramtable.Get().Reset(Params.DataCoordCfg.EnableLevelZeroSegment.Key) + + s.Require().Empty(s.m.view.collections) + + notified := make(chan struct{}) + s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Once() + s.mockTriggerManager.EXPECT().Notify(mock.Anything, mock.Anything, mock.Anything). + Run(func(taskID UniqueID, tType CompactionTriggerType, views []CompactionView) { + s.Equal(TriggerTypeLevelZeroViewChange, tType) + v, ok := views[0].(*LevelZeroSegmentsView) + s.True(ok) + s.NotNil(v) + log.Info("All views", zap.String("l0 view", v.String())) + + notified <- struct{}{} + }).Once() + + s.m.Start() + <-notified + + // clear view + s.m.viewGuard.Lock() + s.m.view.collections = make(map[int64][]*SegmentView) + s.m.viewGuard.Unlock() + + // clear meta + s.m.meta.Lock() + s.m.meta.segments.segments = make(map[int64]*SegmentInfo) + s.m.meta.Unlock() + + <-time.After(time.Second) + s.m.Close() +} + +func (s *CompactionViewManagerSuite) TestTriggerEventForIDLEView() { + s.Require().Empty(s.m.view.collections) + s.m.triggerEventForIDLEView() + + s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Once() s.mockTriggerManager.EXPECT().Notify(mock.Anything, mock.Anything, mock.Anything). Run(func(taskID UniqueID, tType CompactionTriggerType, views []CompactionView) { s.EqualValues(1, taskID) - s.Equal(TriggerTypeLevelZeroView, tType) + s.Equal(TriggerTypeLevelZeroViewIDLE, tType) s.Equal(1, len(views)) v, ok := views[0].(*LevelZeroSegmentsView) s.True(ok) @@ -122,9 +198,43 @@ func (s *CompactionViewManagerSuite) TestCheck() { log.Info("All views", zap.String("l0 view", v.String())) }).Once() + events := s.m.Check(context.Background()) + s.NotEmpty(events) + s.Require().NotEmpty(s.m.view.collections) + s.m.triggerEventForIDLEView() +} + +func (s *CompactionViewManagerSuite) TestNotifyTrigger() { + s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil).Once() + s.mockTriggerManager.EXPECT().Notify(mock.Anything, mock.Anything, mock.Anything). + Run(func(taskID UniqueID, tType CompactionTriggerType, views []CompactionView) { + s.EqualValues(1, taskID) + s.Equal(TriggerTypeLevelZeroViewChange, tType) + s.Equal(1, len(views)) + v, ok := views[0].(*LevelZeroSegmentsView) + s.True(ok) + s.NotNil(v) + + expectedSegs := []int64{100, 101, 102, 103} + gotSegs := lo.Map(v.segments, func(s *SegmentView, _ int) int64 { return s.ID }) + s.ElementsMatch(expectedSegs, gotSegs) + + s.EqualValues(30000, v.earliestGrowingSegmentPos.GetTimestamp()) + log.Info("All views", zap.String("l0 view", v.String())) + }).Once() + + ctx := context.Background() + s.Require().Empty(s.m.view.collections) + events := s.m.Check(ctx) + + s.m.notifyTrigger(ctx, events) +} + +func (s *CompactionViewManagerSuite) TestCheck() { // nothing in the view before the test + ctx := context.Background() s.Empty(s.m.view.collections) - s.m.Check() + events := s.m.Check(ctx) s.m.viewGuard.Lock() views := s.m.view.GetSegmentViewBy(s.testLabel.CollectionID, nil) @@ -138,11 +248,22 @@ func (s *CompactionViewManagerSuite) TestCheck() { log.Info("LevelZeroString", zap.String("segment", view.LevelZeroString())) } + s.NotEmpty(events) + s.Equal(1, len(events)) + refreshed, ok := events[TriggerTypeLevelZeroViewChange] + s.Require().True(ok) + s.Equal(1, len(refreshed)) + + // same meta + emptyEvents := s.m.Check(ctx) + s.Empty(emptyEvents) + // clear meta s.m.meta.Lock() s.m.meta.segments.segments = make(map[int64]*SegmentInfo) s.m.meta.Unlock() - s.m.Check() + emptyEvents = s.m.Check(ctx) + s.Empty(emptyEvents) s.Empty(s.m.view.collections) } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index b1f6687334..99b4c76ada 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -499,7 +499,7 @@ func (s *Server) SetIndexNodeCreator(f func(context.Context, string, int64) (typ func (s *Server) createCompactionHandler() { s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator) - triggerv2 := NewCompactionTriggerManager(s.meta, s.allocator, s.compactionHandler) + triggerv2 := NewCompactionTriggerManager(s.allocator, s.compactionHandler) s.compactionViewManager = NewCompactionViewManager(s.meta, triggerv2, s.allocator) }