diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 4f7c9b5cc5..935acd45a6 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -75,12 +75,12 @@ func (t *clusteringCompactionTask) Process() bool { currentState := t.State.String() if currentState != lastState { ts := time.Now().UnixMilli() - t.updateAndSaveTaskMeta(setRetryTimes(0), setLastStateStartTime(ts)) lastStateDuration := ts - t.GetLastStateStartTime() log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse", lastStateDuration)) metrics.DataCoordCompactionLatency. WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), datapb.CompactionType_ClusteringCompaction.String(), lastState). Observe(float64(lastStateDuration)) + t.updateAndSaveTaskMeta(setRetryTimes(0), setLastStateStartTime(ts)) if t.State == datapb.CompactionTaskState_completed { t.updateAndSaveTaskMeta(setEndTime(ts)) @@ -367,7 +367,6 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error { err = t.meta.CleanPartitionStatsInfo(partitionStatsInfo) if err != nil { log.Warn("gcPartitionStatsInfo fail", zap.Error(err)) - return merr.WrapErrClusteringCompactionMetaError("CleanPartitionStatsInfo", err) } t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) @@ -469,6 +468,7 @@ func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datap PreferSegmentRows: t.GetPreferSegmentRows(), AnalyzeTaskID: t.GetAnalyzeTaskID(), AnalyzeVersion: t.GetAnalyzeVersion(), + LastStateStartTime: t.GetLastStateStartTime(), } for _, opt := range opts { opt(taskClone) diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 92fe48c5ed..358ab2b596 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -171,11 +171,6 @@ func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collection func (m *CompactionTriggerManager) notify(ctx context.Context, eventType CompactionTriggerType, views []CompactionView) { for _, view := range views { - if m.compactionHandler.isFull() { - log.RatedInfo(10, "Skip trigger compaction for scheduler is full") - return - } - switch eventType { case TriggerTypeLevelZeroViewChange: log.Debug("Start to trigger a level zero compaction by TriggerTypeLevelZeroViewChange") diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index a679b859ff..dabd84b621 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -48,32 +48,6 @@ func (s *CompactionTriggerManagerSuite) SetupTest() { s.triggerManager = NewCompactionTriggerManager(s.mockAlloc, s.handler, s.mockPlanContext, s.meta) } -func (s *CompactionTriggerManagerSuite) TestNotifyToFullScheduler() { - s.mockPlanContext.EXPECT().isFull().Return(true) - collSegs := s.meta.GetCompactableSegmentGroupByCollection() - segments, found := collSegs[1] - s.Require().True(found) - - levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool { - return info.GetLevel() == datapb.SegmentLevel_L0 - }) - - latestL0Segments := GetViewsByInfo(levelZeroSegments...) - s.Require().NotEmpty(latestL0Segments) - needRefresh, levelZeroView := s.triggerManager.l0Policy.getChangedLevelZeroViews(1, latestL0Segments) - s.Require().True(needRefresh) - s.Require().Equal(1, len(levelZeroView)) - cView, ok := levelZeroView[0].(*LevelZeroSegmentsView) - s.True(ok) - s.NotNil(cView) - log.Info("view", zap.Any("cView", cView)) - - // s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil) - s.mockPlanContext.EXPECT().isFull().Return(false) - s.mockAlloc.EXPECT().allocID(mock.Anything).Return(19530, nil).Maybe() - s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroView) -} - func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { handler := NewNMockHandler(s.T()) handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil) @@ -104,7 +78,6 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { log.Info("view", zap.Any("cView", cView)) s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil) - s.mockPlanContext.EXPECT().isFull().Return(false) s.mockPlanContext.EXPECT().enqueueCompaction(mock.Anything). RunAndReturn(func(task *datapb.CompactionTask) error { s.EqualValues(19530, task.GetTriggerID()) @@ -149,7 +122,6 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { log.Info("view", zap.Any("cView", cView)) s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil) - s.mockPlanContext.EXPECT().isFull().Return(false) s.mockPlanContext.EXPECT().enqueueCompaction(mock.Anything). RunAndReturn(func(task *datapb.CompactionTask) error { s.EqualValues(19530, task.GetTriggerID())