From 8d12bfb4366985638f471026683a9ab599ad41a1 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Fri, 26 Dec 2025 18:57:19 +0800 Subject: [PATCH] fix: Restore the compaction task correctly to ensure it can be properly cleaned up (#46577) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit issue: #46576 - Core invariant: During meta load, only tasks that are truly terminal-cleaned (states cleaned or unknown) should be dropped; all other non-terminal tasks (including timeout and completed) must be restored so the inspector can reattach them to executing/cleaning queues and finish their cleanup lifecycle. - Removed/simplified logic: loadMeta no longer uses the broad isCompactionTaskFinished predicate (which treated timeout, completed, cleaned, unknown as terminal). It now uses the new isCompactionTaskCleaned predicate that only treats cleaned/unknown as terminal. This removes the redundant exclusion of timeout/completed tasks and simplifies the guard to drop only cleaned/unknown tasks. - Bug fix (root cause & exact change): Fixes issue #46576 — the previous isCompactionTaskFinished caused timeout/completed tasks to be skipped during meta load and thus not passed into restoreTask(). The PR adds isCompactionTaskCleaned and replaces the finished check so timeout and completed tasks are included in restoreTask() and re-attached to the inspector’s existing executing/cleaning queues. - No data loss or regression: Tasks in cleaned/unknown remain dropped (isCompactionTaskCleaned still returns true for cleaned/unknown). Non-terminal timeout/completed tasks now follow the same restoreTask() control path used previously for restored tasks — they are enqueued into the inspector’s queue/executing/cleaning flows rather than being discarded. No exported signatures changed and all restored tasks flow into existing handlers, avoiding behavior regression or data loss. Signed-off-by: Cai Zhang --- internal/datacoord/compaction_inspector.go | 2 +- internal/datacoord/compaction_util.go | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/internal/datacoord/compaction_inspector.go b/internal/datacoord/compaction_inspector.go index 448df201b7..9e217e23ed 100644 --- a/internal/datacoord/compaction_inspector.go +++ b/internal/datacoord/compaction_inspector.go @@ -309,7 +309,7 @@ func (c *compactionInspector) loadMeta() { triggers := c.meta.GetCompactionTasks(context.TODO()) for _, tasks := range triggers { for _, task := range tasks { - if isCompactionTaskFinished(task) { + if isCompactionTaskCleaned(task) { log.Info("compactionInspector loadMeta abandon compactionTask", zap.Int64("planID", task.GetPlanID()), zap.String("type", task.GetType().String()), diff --git a/internal/datacoord/compaction_util.go b/internal/datacoord/compaction_util.go index f0ceadf415..da9517d075 100644 --- a/internal/datacoord/compaction_util.go +++ b/internal/datacoord/compaction_util.go @@ -124,3 +124,15 @@ func isCompactionTaskFinished(t *datapb.CompactionTask) bool { return false } } + +// isCompactionTaskCleaned returns true if the task has been cleaned +// (cleaned, or unknown) and requires no further processing. +func isCompactionTaskCleaned(t *datapb.CompactionTask) bool { + switch t.GetState() { + case datapb.CompactionTaskState_cleaned, + datapb.CompactionTaskState_unknown: + return true + default: + return false + } +}