diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index b58e7a6a25..35bf82a78a 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -293,6 +293,11 @@ func (t *clusteringCompactionTask) processExecuting() error { } func (t *clusteringCompactionTask) processMetaSaved() error { + if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ + PlanID: t.GetPlanID(), + }); err != nil { + log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) + } return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_statistic)) } @@ -474,6 +479,13 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() { func (t *clusteringCompactionTask) processFailedOrTimeout() error { log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()), zap.String("state", t.GetState().String())) + + if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ + PlanID: t.GetPlanID(), + }); err != nil { + log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) + } + // revert segments meta var operators []UpdateOperator // revert level of input segments diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index f3cfdfaa57..7e215ef362 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -107,6 +107,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang }, }) s.mockSessionMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil) + s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) task := s.generateBasicTask(false) @@ -372,6 +373,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() { }, }, }, nil).Once() + s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once() s.Equal(false, task.Process()) s.Equal(datapb.CompactionTaskState_statistic, task.GetState()) }) @@ -405,6 +407,8 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() { }, }, }, nil).Once() + // DropCompactionPlan fail + s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(merr.WrapErrNodeNotFound(1)).Once() s.Equal(false, task.Process()) s.Equal(datapb.CompactionTaskState_statistic, task.GetState()) }) @@ -440,6 +444,8 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() { }, }, }, nil).Once() + s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once() + time.Sleep(time.Second * 1) s.Equal(true, task.Process()) s.Equal(datapb.CompactionTaskState_cleaned, task.GetState()) diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 830800809b..da89aa261d 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -297,6 +297,8 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.plan.GetType().String()). Observe(float64(t.tr.ElapseSpan().Milliseconds())) log.Info("Clustering compaction finished", zap.Duration("elapse", t.tr.ElapseSpan()), zap.Int64("flushTimes", t.flushCount.Load())) + // clear the buffer cache + t.keyToBufferFunc = nil return planResult, nil }