From a1386bae7f2d41ef647957dfbe41d2fcc8a4bc7f Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Fri, 15 Mar 2024 10:21:12 +0800 Subject: [PATCH] fix: Skip to submit l0 tasks when scheduler full (#31270) See also: #31242 Signed-off-by: yangxuan --- internal/datacoord/compaction_trigger_v2.go | 5 ++++ .../datacoord/compaction_trigger_v2_test.go | 29 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index daae103916..1ba9c1d9ef 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -51,6 +51,11 @@ func NewCompactionTriggerManager(alloc allocator, handler compactionPlanContext) func (m *CompactionTriggerManager) Notify(taskID UniqueID, eventType CompactionTriggerType, views []CompactionView) { log := log.With(zap.Int64("taskID", taskID)) for _, view := range views { + if m.handler.isFull() { + log.RatedInfo(1.0, "Skip trigger compaction for scheduler is full") + return + } + switch eventType { case TriggerTypeLevelZeroViewChange: log.Debug("Start to trigger a level zero compaction by TriggerTypeLevelZeroViewChange") diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index 1d3ad32ed3..7282c2c3f0 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -43,6 +43,33 @@ func (s *CompactionTriggerManagerSuite) SetupTest() { s.m = NewCompactionTriggerManager(s.mockAlloc, s.mockPlanContext) } +func (s *CompactionTriggerManagerSuite) TestNotifyToFullScheduler() { + s.mockPlanContext.EXPECT().isFull().Return(true) + viewManager := NewCompactionViewManager(s.meta, s.m, s.m.allocator) + collSegs := s.meta.GetCompactableSegmentGroupByCollection() + + segments, found := collSegs[1] + s.Require().True(found) + + levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool { + return info.GetLevel() == datapb.SegmentLevel_L0 + }) + + latestL0Segments := GetViewsByInfo(levelZeroSegments...) + s.Require().NotEmpty(latestL0Segments) + needRefresh, levelZeroView := viewManager.getChangedLevelZeroViews(1, latestL0Segments) + s.Require().True(needRefresh) + s.Require().Equal(1, len(levelZeroView)) + cView, ok := levelZeroView[0].(*LevelZeroSegmentsView) + s.True(ok) + s.NotNil(cView) + log.Info("view", zap.Any("cView", cView)) + + // s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil) + s.mockPlanContext.EXPECT().isFull().Return(false) + s.m.Notify(19530, TriggerTypeLevelZeroViewChange, levelZeroView) +} + func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { viewManager := NewCompactionViewManager(s.meta, s.m, s.m.allocator) collSegs := s.meta.GetCompactableSegmentGroupByCollection() @@ -70,6 +97,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { log.Info("view", zap.Any("cView", cView)) s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil) + s.mockPlanContext.EXPECT().isFull().Return(false) s.mockPlanContext.EXPECT().execCompactionPlan(mock.Anything, mock.Anything). Run(func(signal *compactionSignal, plan *datapb.CompactionPlan) { s.EqualValues(19530, signal.id) @@ -117,6 +145,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { log.Info("view", zap.Any("cView", cView)) s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil) + s.mockPlanContext.EXPECT().isFull().Return(false) s.mockPlanContext.EXPECT().execCompactionPlan(mock.Anything, mock.Anything). Run(func(signal *compactionSignal, plan *datapb.CompactionPlan) { s.EqualValues(19530, signal.id)