From afaabc2a38fca9b7820633c62663eb89ce02a85d Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Tue, 24 Dec 2024 15:34:50 +0800 Subject: [PATCH] enhance: [2.4] clean compaction task in compactionHandler (#38170) (#38584) issue: #35711 master pr: #38170 Signed-off-by: wayblink Signed-off-by: Cai Zhang Co-authored-by: wayblink --- internal/datacoord/compaction.go | 47 ++++- internal/datacoord/compaction_task.go | 10 + .../datacoord/compaction_task_clustering.go | 187 +++++++++++------- .../compaction_task_clustering_test.go | 106 +++++++--- internal/datacoord/compaction_task_l0.go | 28 +-- internal/datacoord/compaction_task_l0_test.go | 28 +-- internal/datacoord/compaction_task_mix.go | 43 ++-- internal/datacoord/compaction_test.go | 182 +++++++++++++++++ internal/datacoord/meta.go | 6 + internal/datacoord/partition_stats_meta.go | 30 ++- .../datacoord/partition_stats_meta_test.go | 58 ++++++ pkg/util/merr/errors.go | 1 + pkg/util/merr/utils.go | 8 + 13 files changed, 582 insertions(+), 152 deletions(-) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index a25d45a501..4c5fa24698 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -83,6 +83,9 @@ type compactionPlanHandler struct { executingGuard lock.RWMutex executingTasks map[int64]CompactionTask // planID -> task + cleaningGuard lock.RWMutex + cleaningTasks map[int64]CompactionTask // planID -> task + meta CompactionMeta allocator allocator chManager ChannelManager @@ -191,6 +194,7 @@ func newCompactionPlanHandler(cluster Cluster, sessions SessionManager, cm Chann stopCh: make(chan struct{}), cluster: cluster, executingTasks: make(map[int64]CompactionTask), + cleaningTasks: make(map[int64]CompactionTask), analyzeScheduler: analyzeScheduler, handler: handler, } @@ -413,6 +417,7 @@ func (c *compactionPlanHandler) loopCheck() { if err != nil { log.Info("fail to update compaction", zap.Error(err)) } + c.cleanFailedTasks() } } } @@ -444,7 +449,7 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() { triggers := c.meta.GetCompactionTasks() for _, tasks := range triggers { for _, task := range tasks { - if task.State == datapb.CompactionTaskState_completed || task.State == datapb.CompactionTaskState_cleaned { + if task.State == datapb.CompactionTaskState_cleaned { duration := time.Since(time.Unix(task.StartTime, 0)).Seconds() if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) { // try best to delete meta @@ -670,6 +675,11 @@ func assignNodeID(slots map[int64]int64, t CompactionTask) int64 { return nodeID } +// checkCompaction retrieves executing tasks and calls each task's Process() method +// to evaluate its state and progress through the state machine. +// Completed tasks are removed from executingTasks. +// Tasks that fail or timeout are moved from executingTasks to cleaningTasks, +// where task-specific clean logic is performed asynchronously. func (c *compactionPlanHandler) checkCompaction() error { // Get executing executingTasks before GetCompactionState from DataNode to prevent false failure, // for DC might add new task while GetCompactionState. @@ -711,9 +721,44 @@ func (c *compactionPlanHandler) checkCompaction() error { metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Done).Inc() } c.executingGuard.Unlock() + + // insert task need to clean + c.cleaningGuard.Lock() + for _, t := range finishedTasks { + if t.GetState() == datapb.CompactionTaskState_failed || + t.GetState() == datapb.CompactionTaskState_timeout || + t.GetState() == datapb.CompactionTaskState_completed { + log.Ctx(context.TODO()).Info("task need to clean", + zap.Int64("collectionID", t.GetCollectionID()), + zap.Int64("planID", t.GetPlanID()), + zap.String("state", t.GetState().String())) + c.cleaningTasks[t.GetPlanID()] = t + } + } + c.cleaningGuard.Unlock() + return nil } +// cleanFailedTasks performs task define Clean logic +// while compactionPlanHandler.Clean is to do garbage collection for cleaned tasks +func (c *compactionPlanHandler) cleanFailedTasks() { + c.cleaningGuard.RLock() + cleanedTasks := make([]CompactionTask, 0) + for _, t := range c.cleaningTasks { + clean := t.Clean() + if clean { + cleanedTasks = append(cleanedTasks, t) + } + } + c.cleaningGuard.RUnlock() + c.cleaningGuard.Lock() + for _, t := range cleanedTasks { + delete(c.cleaningTasks, t.GetPlanID()) + } + c.cleaningGuard.Unlock() +} + func pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) { nodeID = NullNodeID var maxSlots int64 = -1 diff --git a/internal/datacoord/compaction_task.go b/internal/datacoord/compaction_task.go index 6cfdcb9af8..7e6f6ab2ba 100644 --- a/internal/datacoord/compaction_task.go +++ b/internal/datacoord/compaction_task.go @@ -24,7 +24,17 @@ import ( ) type CompactionTask interface { + // Process performs the task's state machine + // + // Returns: + // - : whether the task state machine ends. + // + // Notes: + // + // `end` doesn't mean the task completed, its state may be completed or failed or timeout. Process() bool + // Clean performs clean logic for a fail/timeout task + Clean() bool BuildCompactionRequest() (*datapb.CompactionPlan, error) GetTriggerID() UniqueID diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 62417fc826..a85a2a1d42 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -65,8 +65,11 @@ func newClusteringCompactionTask(t *datapb.CompactionTask, meta CompactionMeta, } } +// Note: return True means exit this state machine. +// ONLY return True for Completed, Failed or Timeout func (t *clusteringCompactionTask) Process() bool { - log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID())) + ctx := context.TODO() + log := log.Ctx(ctx).With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID())) lastState := t.GetState().String() err := t.retryableProcess() if err != nil { @@ -105,15 +108,22 @@ func (t *clusteringCompactionTask) Process() bool { if err != nil { log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err)) } + log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration)) } log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState)) - return t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned + return t.State == datapb.CompactionTaskState_completed || + t.State == datapb.CompactionTaskState_cleaned || + t.State == datapb.CompactionTaskState_failed || + t.State == datapb.CompactionTaskState_timeout } // retryableProcess process task's state transfer, return error if not work as expected // the outer Process will set state and retry times according to the error type(retryable or not-retryable) func (t *clusteringCompactionTask) retryableProcess() error { - if t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned { + if t.State == datapb.CompactionTaskState_completed || + t.State == datapb.CompactionTaskState_cleaned || + t.State == datapb.CompactionTaskState_failed || + t.State == datapb.CompactionTaskState_timeout { return nil } @@ -140,14 +150,14 @@ func (t *clusteringCompactionTask) retryableProcess() error { return t.processMetaSaved() case datapb.CompactionTaskState_indexing: return t.processIndexing() - case datapb.CompactionTaskState_timeout: - return t.processFailedOrTimeout() - case datapb.CompactionTaskState_failed: - return t.processFailedOrTimeout() } return nil } +func (t *clusteringCompactionTask) Clean() bool { + return t.doClean() == nil +} + func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { plan := &datapb.CompactionPlan{ PlanID: t.GetPlanID(), @@ -188,7 +198,9 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP } func (t *clusteringCompactionTask) processPipelining() error { - log := log.With(zap.Int64("triggerID", t.TriggerID), zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("planID", t.GetPlanID())) + log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.TriggerID), + zap.Int64("collectionID", t.GetCollectionID()), + zap.Int64("planID", t.GetPlanID())) if t.NeedReAssignNodeID() { log.Debug("wait for the node to be assigned before proceeding with the subsequent steps") return nil @@ -210,7 +222,9 @@ func (t *clusteringCompactionTask) processPipelining() error { } func (t *clusteringCompactionTask) processExecuting() error { - log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String())) + log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.TriggerID), + zap.Int64("collectionID", t.GetCollectionID()), + zap.Int64("planID", t.GetPlanID())) result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID()) if err != nil || result == nil { log.Info("processExecuting clustering compaction", zap.Bool("result nil", result == nil), zap.Error(err)) @@ -248,12 +262,7 @@ func (t *clusteringCompactionTask) processExecuting() error { return t.processMetaSaved() case datapb.CompactionTaskState_executing: if t.checkTimeout() { - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout)) - if err == nil { - return t.processFailedOrTimeout() - } else { - return err - } + return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout)) } return nil case datapb.CompactionTaskState_failed: @@ -265,6 +274,9 @@ func (t *clusteringCompactionTask) processExecuting() error { } func (t *clusteringCompactionTask) processMetaSaved() error { + log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.TriggerID), + zap.Int64("collectionID", t.GetCollectionID()), + zap.Int64("planID", t.GetPlanID())) if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetPlanID(), }); err != nil { @@ -274,6 +286,9 @@ func (t *clusteringCompactionTask) processMetaSaved() error { } func (t *clusteringCompactionTask) processIndexing() error { + log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.TriggerID), + zap.Int64("collectionID", t.GetCollectionID()), + zap.Int64("planID", t.GetPlanID())) // wait for segment indexed collectionIndexes := t.meta.GetIndexMeta().GetIndexesForCollection(t.GetCollectionID(), "") if len(collectionIndexes) == 0 { @@ -292,7 +307,8 @@ func (t *clusteringCompactionTask) processIndexing() error { } return true }() - log.Debug("check compaction result segments index states", zap.Bool("indexed", indexed), zap.Int64("planID", t.GetPlanID()), zap.Int64s("segments", t.ResultSegments)) + log.Debug("check compaction result segments index states", zap.Bool("indexed", indexed), + zap.Int64s("segments", t.ResultSegments)) if indexed { return t.completeTask() } @@ -316,6 +332,9 @@ func (t *clusteringCompactionTask) markInputSegmentsDropped() error { // indexed is the final state of a clustering compaction task // one task should only run this once func (t *clusteringCompactionTask) completeTask() error { + log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.TriggerID), + zap.Int64("collectionID", t.GetCollectionID()), + zap.Int64("planID", t.GetPlanID())) var err error // update current partition stats version // at this point, the segment view includes both the input segments and the result segments. @@ -335,17 +354,23 @@ func (t *clusteringCompactionTask) completeTask() error { return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err) } + if err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil { + log.Warn("completeTask update task state to completed failed", zap.Error(err)) + return err + } // mark input segments as dropped // now, the segment view only includes the result segments. if err = t.markInputSegmentsDropped(); err != nil { - log.Warn("mark input segments as Dropped failed, skip it and wait retry", - zap.Int64("planID", t.GetPlanID()), zap.Error(err)) + log.Warn("mark input segments as Dropped failed, skip it and wait retry") } - return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)) + return nil } func (t *clusteringCompactionTask) processAnalyzing() error { + log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.TriggerID), + zap.Int64("collectionID", t.GetCollectionID()), + zap.Int64("planID", t.GetPlanID())) analyzeTask := t.meta.GetAnalyzeMeta().GetTask(t.GetAnalyzeTaskID()) if analyzeTask == nil { log.Warn("analyzeTask not found", zap.Int64("id", t.GetAnalyzeTaskID())) @@ -373,74 +398,92 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() { t.meta.SetSegmentsCompacting(t.GetInputSegments(), false) } -func (t *clusteringCompactionTask) processFailedOrTimeout() error { - log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()), zap.String("state", t.GetState().String())) +func (t *clusteringCompactionTask) doClean() error { + log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.TriggerID), + zap.Int64("collectionID", t.GetCollectionID()), + zap.Int64("planID", t.GetPlanID())) + log.Info("clean task", zap.String("state", t.GetState().String())) if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetPlanID(), }); err != nil { log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) } - isInputDropped := false - for _, segID := range t.GetInputSegments() { - if t.meta.GetHealthySegment(segID) == nil { - isInputDropped = true - break - } - } - if isInputDropped { - log.Info("input segments dropped, doing for compatibility", - zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID())) - // this task must be generated by v2.4, just for compatibility - // revert segments meta - var operators []UpdateOperator - // revert level of input segments - // L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1 - // L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2 - for _, segID := range t.GetInputSegments() { - operators = append(operators, RevertSegmentLevelOperator(segID)) - } - // if result segments are generated but task fail in the other steps, mark them as L1 segments without partitions stats - for _, segID := range t.GetResultSegments() { - operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1)) - operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0)) - } - err := t.meta.UpdateSegmentsInfo(operators...) - if err != nil { - log.Warn("UpdateSegmentsInfo fail", zap.Error(err)) - return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err) + + if t.GetState() == datapb.CompactionTaskState_completed { + if err := t.markInputSegmentsDropped(); err != nil { + return err } } else { - // after v2.4.16, mark the results segment as dropped - var operators []UpdateOperator - for _, segID := range t.GetResultSegments() { - // Don't worry about them being loaded; they are all invisible. - operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped)) + isInputDropped := false + for _, segID := range t.GetInputSegments() { + if t.meta.GetHealthySegment(segID) == nil { + isInputDropped = true + break + } } + if isInputDropped { + log.Info("input segments dropped, doing for compatibility", + zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID())) + // this task must be generated by v2.4, just for compatibility + // revert segments meta + var operators []UpdateOperator + // revert level of input segments + // L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1 + // L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2 + for _, segID := range t.GetInputSegments() { + operators = append(operators, RevertSegmentLevelOperator(segID)) + } + // if result segments are generated but task fail in the other steps, mark them as L1 segments without partitions stats + for _, segID := range t.GetResultSegments() { + operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1)) + operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0)) + } + err := t.meta.UpdateSegmentsInfo(operators...) + if err != nil { + log.Warn("UpdateSegmentsInfo fail", zap.Error(err)) + return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err) + } + } else { + // after v2.4.16, mark the results segment as dropped + var operators []UpdateOperator + for _, segID := range t.GetResultSegments() { + // Don't worry about them being loaded; they are all invisible. + operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped)) + } - err := t.meta.UpdateSegmentsInfo(operators...) + err := t.meta.UpdateSegmentsInfo(operators...) + if err != nil { + log.Warn("UpdateSegmentsInfo fail", zap.Error(err)) + return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err) + } + } + // drop partition stats if uploaded + partitionStatsInfo := &datapb.PartitionStatsInfo{ + CollectionID: t.GetCollectionID(), + PartitionID: t.GetPartitionID(), + VChannel: t.GetChannel(), + Version: t.GetPlanID(), + SegmentIDs: t.GetResultSegments(), + } + err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo) if err != nil { - log.Warn("UpdateSegmentsInfo fail", zap.Error(err)) - return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err) + log.Warn("gcPartitionStatsInfo fail", zap.Error(err)) + return merr.WrapErrCleanPartitionStatsFail(fmt.Sprintf("%d-%d-%s-%d", t.GetCollectionID(), t.GetPartitionID(), t.GetChannel(), t.GetPlanID())) } } - t.resetSegmentCompacting() - - // drop partition stats if uploaded - partitionStatsInfo := &datapb.PartitionStatsInfo{ - CollectionID: t.GetCollectionID(), - PartitionID: t.GetPartitionID(), - VChannel: t.GetChannel(), - Version: t.GetPlanID(), - SegmentIDs: t.GetResultSegments(), - } - err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo) + err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) if err != nil { - log.Warn("gcPartitionStatsInfo fail", zap.Error(err)) + log.Warn("clusteringCompactionTask fail to updateAndSaveTaskMeta", zap.Error(err)) + return err } - return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) + // resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting only called once + // otherwise, it may unlock segments locked by other compaction tasks + t.resetSegmentCompacting() + log.Info("clusteringCompactionTask clean done") + return nil } func (t *clusteringCompactionTask) doAnalyze() error { @@ -535,8 +578,8 @@ func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskO log.Warn("Failed to saveTaskMeta", zap.Error(err)) return merr.WrapErrClusteringCompactionMetaError("updateAndSaveTaskMeta", err) // retryable } - t.CompactionTask = task - log.Info("updateAndSaveTaskMeta success", zap.String("task state", t.CompactionTask.State.String())) + t.SetTask(task) + log.Ctx(context.TODO()).Info("updateAndSaveTaskMeta success", zap.String("task state", t.GetState().String())) return nil } diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index a9a4f16ff5..207eb82097 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -109,7 +109,8 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang task := s.generateBasicTask(false) - task.processPipelining() + err := task.processPipelining() + s.NoError(err) seg11 := s.meta.GetSegment(101) s.Equal(datapb.SegmentLevel_L1, seg11.Level) @@ -117,13 +118,14 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang s.Equal(datapb.SegmentLevel_L2, seg21.Level) s.Equal(int64(10000), seg21.PartitionStatsVersion) - task.ResultSegments = []int64{103, 104} + task.updateAndSaveTaskMeta(setResultSegments([]int64{103, 104})) // fake some compaction result segment s.meta.AddSegment(context.TODO(), &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ ID: 103, State: commonpb.SegmentState_Flushed, Level: datapb.SegmentLevel_L2, + LastLevel: datapb.SegmentLevel_L1, CreatedByCompaction: true, PartitionStatsVersion: 10001, }, @@ -133,27 +135,80 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang ID: 104, State: commonpb.SegmentState_Flushed, Level: datapb.SegmentLevel_L2, + LastLevel: datapb.SegmentLevel_L1, CreatedByCompaction: true, PartitionStatsVersion: 10001, }, }) - task.processFailedOrTimeout() + s.session.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) - seg12 := s.meta.GetSegment(101) - s.Equal(datapb.SegmentLevel_L1, seg12.Level) - seg22 := s.meta.GetSegment(102) - s.Equal(datapb.SegmentLevel_L2, seg22.Level) - s.Equal(int64(10000), seg22.PartitionStatsVersion) + err = task.doClean() + s.NoError(err) - seg32 := s.meta.GetSegment(103) - s.Equal(datapb.SegmentLevel_L2, seg32.Level) - s.Equal(int64(10001), seg32.PartitionStatsVersion) - s.Equal(commonpb.SegmentState_Dropped, seg32.GetState()) - seg42 := s.meta.GetSegment(104) - s.Equal(datapb.SegmentLevel_L2, seg42.Level) - s.Equal(int64(10001), seg42.PartitionStatsVersion) - s.Equal(commonpb.SegmentState_Dropped, seg42.GetState()) + s.Run("v2.4.x", func() { + // fake some compaction result segment + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 101, + State: commonpb.SegmentState_Dropped, + LastLevel: datapb.SegmentLevel_L1, + Level: datapb.SegmentLevel_L2, + }, + }) + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 102, + State: commonpb.SegmentState_Dropped, + LastLevel: datapb.SegmentLevel_L2, + Level: datapb.SegmentLevel_L2, + PartitionStatsVersion: 10000, + }, + }) + + // fake some compaction result segment + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 103, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L2, + CreatedByCompaction: true, + PartitionStatsVersion: 10001, + }, + }) + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 104, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L2, + CreatedByCompaction: true, + PartitionStatsVersion: 10001, + }, + }) + + task := s.generateBasicTask(false) + task.sessions = s.session + s.session.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + task.InputSegments = []int64{101, 102} + task.ResultSegments = []int64{103, 104} + + task.Clean() + + seg12 := s.meta.GetSegment(101) + s.Equal(datapb.SegmentLevel_L1, seg12.Level) + seg22 := s.meta.GetSegment(102) + s.Equal(datapb.SegmentLevel_L2, seg22.Level) + s.Equal(int64(10000), seg22.PartitionStatsVersion) + + seg32 := s.meta.GetSegment(103) + s.Equal(datapb.SegmentLevel_L1, seg32.Level) + s.Equal(int64(0), seg32.PartitionStatsVersion) + s.Equal(commonpb.SegmentState_Flushed, seg32.GetState()) + seg42 := s.meta.GetSegment(104) + s.Equal(datapb.SegmentLevel_L1, seg42.Level) + s.Equal(int64(0), seg42.PartitionStatsVersion) + s.Equal(commonpb.SegmentState_Flushed, seg42.GetState()) + }) } func (s *ClusteringCompactionTaskSuite) generateBasicTask(vectorClusteringKey bool) *clusteringCompactionTask { @@ -208,7 +263,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() { s.Equal(false, task.Process()) s.Equal(int32(3), task.RetryTimes) s.Equal(datapb.CompactionTaskState_pipelining, task.GetState()) - s.Equal(false, task.Process()) + s.True(task.Process()) s.Equal(int32(0), task.RetryTimes) s.Equal(datapb.CompactionTaskState_failed, task.GetState()) } @@ -216,8 +271,8 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() { func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() { s.Run("process pipelining fail, segment not found", func() { task := s.generateBasicTask(false) - task.State = datapb.CompactionTaskState_pipelining - s.Equal(false, task.Process()) + task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining)) + s.True(task.Process()) s.Equal(datapb.CompactionTaskState_failed, task.GetState()) }) @@ -444,11 +499,10 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() { }, }, }, nil).Once() - s.session.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once() time.Sleep(time.Second * 1) - s.Equal(true, task.Process()) - s.Equal(datapb.CompactionTaskState_cleaned, task.GetState()) + s.True(task.Process()) + s.Equal(datapb.CompactionTaskState_timeout, task.GetState()) }) } @@ -546,8 +600,8 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() { func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() { s.Run("analyze task not found", func() { task := s.generateBasicTask(false) - task.State = datapb.CompactionTaskState_analyzing - s.False(task.Process()) + task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing)) + s.True(task.Process()) s.Equal(datapb.CompactionTaskState_failed, task.GetState()) }) @@ -564,7 +618,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() { State: indexpb.JobState_JobStateFailed, } s.meta.analyzeMeta.AddAnalyzeTask(t) - s.False(task.Process()) + s.True(task.Process()) s.Equal(datapb.CompactionTaskState_failed, task.GetState()) }) @@ -582,7 +636,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() { CentroidsFile: "", } s.meta.analyzeMeta.AddAnalyzeTask(t) - s.False(task.Process()) + s.True(task.Process()) s.Equal(datapb.CompactionTaskState_failed, task.GetState()) }) diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index 358b0e630c..50b10c6111 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -44,7 +44,7 @@ type l0CompactionTask struct { } // Note: return True means exit this state machine. -// ONLY return True for processCompleted or processFailed +// ONLY return True for Completed, Failed func (t *l0CompactionTask) Process() bool { switch t.GetState() { case datapb.CompactionTaskState_pipelining: @@ -55,8 +55,6 @@ func (t *l0CompactionTask) Process() bool { return t.processMetaSaved() case datapb.CompactionTaskState_completed: return t.processCompleted() - case datapb.CompactionTaskState_failed: - return t.processFailed() } return true } @@ -77,7 +75,7 @@ func (t *l0CompactionTask) processPipelining() bool { return false } - return t.processFailed() + return true } err = t.sessions.Compaction(context.TODO(), t.GetNodeID(), t.GetPlan()) @@ -119,7 +117,7 @@ func (t *l0CompactionTask) processExecuting() bool { log.Warn("l0CompactionTask failed to set task failed state", zap.Error(err)) return false } - return t.processFailed() + return true } return false } @@ -149,25 +147,33 @@ func (t *l0CompactionTask) processCompleted() bool { return true } -func (t *l0CompactionTask) processFailed() bool { +func (t *l0CompactionTask) doClean() error { + log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID())) if t.hasAssignedWorker() { err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetPlanID(), }) if err != nil { - log.Warn("l0CompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) + log.Warn("l0CompactionTask processFailed unable to drop compaction plan", zap.Error(err)) } } - t.resetSegmentCompacting() err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) if err != nil { log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - return false + return err } - log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID())) - return true + // resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting only called once + // otherwise, it may unlock segments locked by other compaction tasks + t.resetSegmentCompacting() + UpdateCompactionSegmentSizeMetrics(t.result.GetSegments()) + log.Info("l0CompactionTask clean done") + return nil +} + +func (t *l0CompactionTask) Clean() bool { + return t.doClean() == nil } func (t *l0CompactionTask) GetSpan() trace.Span { diff --git a/internal/datacoord/compaction_task_l0_test.go b/internal/datacoord/compaction_task_l0_test.go index c522798f1a..4cc134489c 100644 --- a/internal/datacoord/compaction_task_l0_test.go +++ b/internal/datacoord/compaction_task_l0_test.go @@ -363,14 +363,11 @@ func (s *L0CompactionTaskSuite) TestStateTrans() { PlanID: t.GetPlanID(), State: datapb.CompactionTaskState_failed, }, nil).Once() - s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil) - - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(2) - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return().Once() + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once() got := t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_cleaned, t.GetState()) + s.Equal(datapb.CompactionTaskState_failed, t.GetState()) }) s.Run("test executing with result failed save compaction meta failed", func() { t := s.generateTestL0Task(datapb.CompactionTaskState_executing) @@ -453,29 +450,10 @@ func (s *L0CompactionTaskSuite) TestStateTrans() { t := s.generateTestL0Task(datapb.CompactionTaskState_failed) t.NodeID = 100 s.Require().True(t.GetNodeID() > 0) - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1) - s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil).Once() - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { - s.ElementsMatch(segIDs, t.GetInputSegments()) - }).Once() got := t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_cleaned, t.GetState()) - }) - s.Run("test process failed failed", func() { - t := s.generateTestL0Task(datapb.CompactionTaskState_failed) - t.NodeID = 100 - s.Require().True(t.GetNodeID() > 0) - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once() - s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once() - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { - s.ElementsMatch(segIDs, t.GetInputSegments()) - }).Once() - - got := t.Process() - s.True(got) - s.Equal(datapb.CompactionTaskState_cleaned, t.GetState()) + s.Equal(datapb.CompactionTaskState_failed, t.GetState()) }) s.Run("test unkonwn task", func() { diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index 3085d4e7e8..a6bb59ef0a 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -42,7 +42,7 @@ func (t *mixCompactionTask) processPipelining() bool { log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) return false } - return t.processFailed() + return true } err = t.sessions.Compaction(context.TODO(), t.GetNodeID(), t.GetPlan()) @@ -50,15 +50,18 @@ func (t *mixCompactionTask) processPipelining() bool { // Compaction tasks may be refused by DataNode because of slot limit. In this case, the node id is reset // to enable a retry in compaction.checkCompaction(). // This is tricky, we should remove the reassignment here. - log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Error(err)) - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) + err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + if err != nil { + log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) + } return false } log.Info("mixCompactionTask notify compaction tasks to DataNode") err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) if err != nil { - log.Warn("mixCompactionTask update task state failed", zap.Error(err)) + log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) return false } return false @@ -96,7 +99,7 @@ func (t *mixCompactionTask) processExecuting() bool { log.Warn("mixCompactionTask failed to setState failed", zap.Error(err)) return false } - return t.processFailed() + return true } if err := t.saveSegmentMeta(); err != nil { log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err)) @@ -106,7 +109,7 @@ func (t *mixCompactionTask) processExecuting() bool { log.Warn("mixCompactionTask failed to setState failed", zap.Error(err)) return false } - return t.processFailed() + return true } return false } @@ -122,7 +125,7 @@ func (t *mixCompactionTask) processExecuting() bool { if err != nil { log.Warn("fail to updateAndSaveTaskMeta") } - return false + return true } return false } @@ -150,7 +153,7 @@ func (t *mixCompactionTask) saveSegmentMeta() error { } // Note: return True means exit this state machine. -// ONLY return True for processCompleted or processFailed +// ONLY return True for Completed, Failed or Timeout func (t *mixCompactionTask) Process() bool { log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID())) lastState := t.GetState().String() @@ -245,21 +248,33 @@ func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compa } func (t *mixCompactionTask) processFailed() bool { - log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID())) + return true +} + +func (t *mixCompactionTask) Clean() bool { + return t.doClean() == nil +} + +func (t *mixCompactionTask) doClean() error { + log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), + zap.Int64("collectionID", t.GetCollectionID())) if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetPlanID(), }); err != nil { log.Warn("mixCompactionTask processFailed unable to drop compaction plan", zap.Error(err)) + return err } - log.Info("mixCompactionTask processFailed done") - t.resetSegmentCompacting() err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) if err != nil { - log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - return false + log.Warn("mixCompactionTask fail to updateAndSaveTaskMeta", zap.Error(err)) + return err } - return true + // resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting only called once + // otherwise, it may unlock segments locked by other compaction tasks + t.resetSegmentCompacting() + log.Info("mixCompactionTask clean done") + return nil } func (t *mixCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error { diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 5dcd27e5d9..7ef96a16cf 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -46,6 +47,7 @@ type CompactionPlanHandlerSuite struct { mockCm *MockChannelManager mockSessMgr *MockSessionManager handler *compactionPlanHandler + mockHandler *NMockHandler cluster *MockCluster } @@ -57,6 +59,8 @@ func (s *CompactionPlanHandlerSuite) SetupTest() { s.mockSessMgr = NewMockSessionManager(s.T()) s.cluster = NewMockCluster(s.T()) s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil) + s.mockHandler = NewNMockHandler(s.T()) + s.mockHandler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil).Maybe() } func (s *CompactionPlanHandlerSuite) TestScheduleEmpty() { @@ -1045,6 +1049,184 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() { s.NoError(err) } +func (s *CompactionPlanHandlerSuite) TestCleanCompaction() { + s.SetupTest() + + tests := []struct { + task CompactionTask + }{ + { + &mixCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + PlanID: 1, + TriggerID: 1, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_failed, + NodeID: 1, + InputSegments: []UniqueID{1, 2}, + }, + meta: s.mockMeta, + sessions: s.mockSessMgr, + }, + }, + { + &l0CompactionTask{ + CompactionTask: &datapb.CompactionTask{ + PlanID: 1, + TriggerID: 1, + Type: datapb.CompactionType_Level0DeleteCompaction, + State: datapb.CompactionTaskState_failed, + NodeID: 1, + InputSegments: []UniqueID{1, 2}, + }, + meta: s.mockMeta, + sessions: s.mockSessMgr, + }, + }, + } + for _, test := range tests { + task := test.task + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once() + s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) + s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + + s.handler.executingTasks[1] = task + s.Equal(1, len(s.handler.executingTasks)) + err := s.handler.checkCompaction() + s.NoError(err) + s.Equal(0, len(s.handler.executingTasks)) + s.Equal(1, len(s.handler.cleaningTasks)) + s.handler.cleanFailedTasks() + s.Equal(0, len(s.handler.cleaningTasks)) + } +} + +func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompaction() { + s.SetupTest() + + task := newClusteringCompactionTask( + &datapb.CompactionTask{ + PlanID: 1, + TriggerID: 1, + CollectionID: 1001, + Type: datapb.CompactionType_ClusteringCompaction, + State: datapb.CompactionTaskState_failed, + NodeID: 1, + InputSegments: []UniqueID{1, 2}, + }, + s.mockMeta, s.mockSessMgr, s.mockHandler, nil) + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil) + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once() + s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil) + s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(nil) + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) + s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + + s.handler.executingTasks[1] = task + s.Equal(1, len(s.handler.executingTasks)) + s.handler.checkCompaction() + s.Equal(0, len(s.handler.executingTasks)) + s.Equal(1, len(s.handler.cleaningTasks)) + s.handler.cleanFailedTasks() + s.Equal(0, len(s.handler.cleaningTasks)) +} + +func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompactionCommitFail() { + s.SetupTest() + + task := newClusteringCompactionTask(&datapb.CompactionTask{ + PlanID: 1, + TriggerID: 1, + CollectionID: 1001, + Channel: "ch-1", + Type: datapb.CompactionType_ClusteringCompaction, + State: datapb.CompactionTaskState_executing, + NodeID: 1, + InputSegments: []UniqueID{1, 2}, + ClusteringKeyField: &schemapb.FieldSchema{ + FieldID: 100, + Name: Int64Field, + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + AutoID: true, + IsClusteringKey: true, + }, + }, + s.mockMeta, s.mockSessMgr, s.mockHandler, nil) + + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil) + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) + s.mockSessMgr.EXPECT().GetCompactionPlanResult(UniqueID(1), int64(1)).Return( + &datapb.CompactionPlanResult{ + PlanID: 1, + State: datapb.CompactionTaskState_completed, + Segments: []*datapb.CompactionSegment{ + { + PlanID: 1, + SegmentID: 101, + }, + }, + }, nil).Once() + s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return(nil, nil, errors.New("mock error")) + s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + s.handler.executingTasks[1] = task + s.Equal(1, len(s.handler.executingTasks)) + s.handler.checkCompaction() + s.Equal(0, len(task.GetResultSegments())) + + s.Equal(datapb.CompactionTaskState_failed, task.GetState()) + s.Equal(0, len(s.handler.executingTasks)) + s.Equal(1, len(s.handler.cleaningTasks)) + + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once() + s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(nil) + s.handler.cleanFailedTasks() + s.Equal(0, len(s.handler.cleaningTasks)) +} + +// test compactionHandler should keep clean the failed task until it become cleaned +func (s *CompactionPlanHandlerSuite) TestKeepClean() { + s.SetupTest() + + tests := []struct { + task CompactionTask + }{ + { + newClusteringCompactionTask(&datapb.CompactionTask{ + PlanID: 1, + TriggerID: 1, + Type: datapb.CompactionType_ClusteringCompaction, + State: datapb.CompactionTaskState_failed, + NodeID: 1, + InputSegments: []UniqueID{1, 2}, + }, + s.mockMeta, s.mockSessMgr, s.mockHandler, nil), + }, + } + for _, test := range tests { + task := test.task + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil) + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return() + s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil) + s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(errors.New("mock error")).Once() + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) + s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + + s.handler.executingTasks[1] = task + s.Equal(1, len(s.handler.executingTasks)) + s.handler.checkCompaction() + s.Equal(0, len(s.handler.executingTasks)) + s.Equal(1, len(s.handler.cleaningTasks)) + s.handler.cleanFailedTasks() + s.Equal(1, len(s.handler.cleaningTasks)) + s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(nil).Once() + s.handler.cleanFailedTasks() + s.Equal(0, len(s.handler.cleaningTasks)) + } +} + func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog { l := &datapb.FieldBinlog{ FieldID: fieldID, diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index ba6f8bafe1..1f1fe11656 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -774,6 +774,12 @@ func UpdateStatusOperator(segmentID int64, status commonpb.SegmentState) UpdateO return false } + if segment.GetState() == status { + log.Ctx(context.TODO()).Info("meta update: segment stats already is target state", + zap.Int64("segmentID", segmentID), zap.String("status", status.String())) + return false + } + updateSegStateAndPrepareMetrics(segment, status, modPack.metricMutation) if status == commonpb.SegmentState_Dropped { segment.DroppedAt = uint64(time.Now().UnixNano()) diff --git a/internal/datacoord/partition_stats_meta.go b/internal/datacoord/partition_stats_meta.go index f379afe67f..574b85b0d8 100644 --- a/internal/datacoord/partition_stats_meta.go +++ b/internal/datacoord/partition_stats_meta.go @@ -107,8 +107,8 @@ func (psm *partitionStatsMeta) ListPartitionStatsInfos(collectionID int64, parti func (psm *partitionStatsMeta) SavePartitionStatsInfo(info *datapb.PartitionStatsInfo) error { psm.Lock() defer psm.Unlock() - if err := psm.catalog.SavePartitionStatsInfo(psm.ctx, info); err != nil { - log.Error("meta update: update PartitionStatsInfo info fail", zap.Error(err)) + if err := psm.catalog.SavePartitionStatsInfo(context.TODO(), info); err != nil { + log.Ctx(context.TODO()).Error("meta update: update PartitionStatsInfo info fail", zap.Error(err)) return err } if _, ok := psm.partitionStatsInfos[info.GetVChannel()]; !ok { @@ -127,6 +127,24 @@ func (psm *partitionStatsMeta) SavePartitionStatsInfo(info *datapb.PartitionStat func (psm *partitionStatsMeta) DropPartitionStatsInfo(info *datapb.PartitionStatsInfo) error { psm.Lock() defer psm.Unlock() + // if the dropping partitionStats is the current version, should update currentPartitionStats + currentVersion := psm.innerGetCurrentPartitionStatsVersion(info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel()) + if currentVersion == info.GetVersion() && currentVersion != emptyPartitionStatsVersion { + infos := psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].infos + if len(infos) > 0 { + var maxVersion int64 = 0 + for version := range infos { + if version > maxVersion && version < currentVersion { + maxVersion = version + } + } + err := psm.innerSaveCurrentPartitionStatsVersion(info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel(), maxVersion) + if err != nil { + return err + } + } + } + if err := psm.catalog.DropPartitionStatsInfo(psm.ctx, info); err != nil { log.Error("meta update: drop PartitionStatsInfo info fail", zap.Int64("collectionID", info.GetCollectionID()), @@ -155,8 +173,11 @@ func (psm *partitionStatsMeta) DropPartitionStatsInfo(info *datapb.PartitionStat func (psm *partitionStatsMeta) SaveCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string, currentPartitionStatsVersion int64) error { psm.Lock() defer psm.Unlock() + return psm.innerSaveCurrentPartitionStatsVersion(collectionID, partitionID, vChannel, currentPartitionStatsVersion) +} - log.Info("update current partition stats version", zap.Int64("collectionID", collectionID), +func (psm *partitionStatsMeta) innerSaveCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string, currentPartitionStatsVersion int64) error { + log.Ctx(context.TODO()).Info("update current partition stats version", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.String("vChannel", vChannel), zap.Int64("currentPartitionStatsVersion", currentPartitionStatsVersion)) @@ -180,7 +201,10 @@ func (psm *partitionStatsMeta) SaveCurrentPartitionStatsVersion(collectionID, pa func (psm *partitionStatsMeta) GetCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string) int64 { psm.RLock() defer psm.RUnlock() + return psm.innerGetCurrentPartitionStatsVersion(collectionID, partitionID, vChannel) +} +func (psm *partitionStatsMeta) innerGetCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string) int64 { if _, ok := psm.partitionStatsInfos[vChannel]; !ok { return emptyPartitionStatsVersion } diff --git a/internal/datacoord/partition_stats_meta_test.go b/internal/datacoord/partition_stats_meta_test.go index 0c67f2d442..90aca1b922 100644 --- a/internal/datacoord/partition_stats_meta_test.go +++ b/internal/datacoord/partition_stats_meta_test.go @@ -91,3 +91,61 @@ func (s *PartitionStatsMetaSuite) TestGetPartitionStats() { currentVersion4 := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1") s.Equal(int64(100), currentVersion4) } + +func (s *PartitionStatsMetaSuite) TestDropPartitionStats() { + ctx := context.Background() + partitionStatsMeta, err := newPartitionStatsMeta(ctx, s.catalog) + s.NoError(err) + collectionID := int64(1) + partitionID := int64(2) + channel := "ch-1" + s.catalog.EXPECT().DropPartitionStatsInfo(mock.Anything, mock.Anything).Return(nil) + s.catalog.EXPECT().SaveCurrentPartitionStatsVersion(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + + partitionStats := []*datapb.PartitionStatsInfo{ + { + CollectionID: collectionID, + PartitionID: partitionID, + VChannel: channel, + SegmentIDs: []int64{100000}, + Version: 100, + }, + { + CollectionID: collectionID, + PartitionID: partitionID, + VChannel: channel, + SegmentIDs: []int64{100000}, + Version: 101, + }, + { + CollectionID: collectionID, + PartitionID: partitionID, + VChannel: channel, + SegmentIDs: []int64{100000}, + Version: 102, + }, + } + for _, partitionStats := range partitionStats { + partitionStatsMeta.SavePartitionStatsInfo(partitionStats) + } + partitionStatsMeta.SaveCurrentPartitionStatsVersion(collectionID, partitionID, channel, 102) + version := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel) + s.Equal(int64(102), version) + + err = partitionStatsMeta.DropPartitionStatsInfo(partitionStats[2]) + s.NoError(err) + s.Equal(2, len(partitionStatsMeta.partitionStatsInfos[channel][partitionID].infos)) + version2 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel) + s.Equal(int64(101), version2) + + err = partitionStatsMeta.DropPartitionStatsInfo(partitionStats[1]) + s.Equal(1, len(partitionStatsMeta.partitionStatsInfos[channel][partitionID].infos)) + version3 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel) + s.Equal(int64(100), version3) + + err = partitionStatsMeta.DropPartitionStatsInfo(partitionStats[0]) + s.NoError(err) + s.Nil(partitionStatsMeta.partitionStatsInfos[channel][partitionID]) + version4 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel) + s.Equal(emptyPartitionStatsVersion, version4) +} diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index d47fe7399a..65395c1816 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -210,6 +210,7 @@ var ( ErrGetCompactionPlanResultFail = newMilvusError("fail to get compaction plan", 2313, true) ErrCompactionResult = newMilvusError("illegal compaction results", 2314, false) ErrDuplicatedCompactionTask = newMilvusError("duplicated compaction task", 2315, false) + ErrCleanPartitionStatsFail = newMilvusError("fail to clean partition Stats", 2316, true) ErrDataNodeSlotExhausted = newMilvusError("datanode slot exhausted", 2401, false) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 4bf2f4a729..f7954d0d85 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -1179,6 +1179,14 @@ func WrapErrClusteringCompactionMetaError(operation string, err error) error { return wrapFieldsWithDesc(ErrClusteringCompactionMetaError, err.Error(), value("operation", operation)) } +func WrapErrCleanPartitionStatsFail(msg ...string) error { + err := error(ErrCleanPartitionStatsFail) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err +} + func WrapErrAnalyzeTaskNotFound(id int64) error { return wrapFields(ErrAnalyzeTaskNotFound, value("analyzeId", id)) }