From 09437134815da5939e49347c6cd9dccf8a967b96 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Tue, 23 Dec 2025 18:09:18 +0800 Subject: [PATCH] fix: Skip Finished tasks when recovery with compatibility (#46515) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### **User description** issue: #46466 ___ ### **PR Type** Bug fix ___ ### **Description** - Extract finished task state check into reusable helper function - Skip finished tasks during compaction recovery to prevent reprocessing - Add backward compatibility check for pre-allocated segment IDs ___ ### Diagram Walkthrough ```mermaid flowchart LR A["Compaction Task States"] -->|"Check with helper"| B["isCompactionTaskFinished()"] B -->|"Used in"| C["compactionInspector.loadMeta()"] B -->|"Used in"| D["compactionTaskMeta.reloadFromKV()"] C -->|"Skip finished tasks"| E["Recovery Process"] D -->|"Backward compatibility"| E ```

File Walkthrough

Relevant files
Enhancement
compaction_util.go
Add isCompactionTaskFinished helper function                         

internal/datacoord/compaction_util.go
  • Added new helper function isCompactionTaskFinished() to check if a
    compaction task is in a terminal state
  • Function checks for failed, timeout, completed, cleaned, or unknown
    states
  • Centralizes task state validation logic for reuse across multiple
    components
+8/-0     
Bug fix
compaction_inspector.go
Refactor to use finished task helper function                       

internal/datacoord/compaction_inspector.go
  • Replaced inline state checks with call to isCompactionTaskFinished()
    helper
  • Simplifies code by removing repetitive state comparison logic
  • Maintains same behavior of skipping finished tasks during recovery
+1/-5     
compaction_task_meta.go
Add finished task check for backward compatibility             

internal/datacoord/compaction_task_meta.go
  • Added check to skip finished tasks before processing pre-allocated
    segment IDs
  • Ensures backward compatibility for tasks without pre-allocated segment
    IDs
  • Prevents marking already-finished tasks as failed during reload
+1/-1     
___ ## Summary by CodeRabbit * **Bug Fixes** * Improved detection of finished compaction tasks to reduce false failures. * Prevented finished tasks with missing pre-allocations from being incorrectly marked as failed. * Simplified abandonment logic for completed/timeout/cleaned tasks to reduce erroneous retries and noisy logs. ✏️ Tip: You can customize this high-level summary in your review settings. --------- Signed-off-by: Cai Zhang --- internal/datacoord/compaction_inspector.go | 6 +----- internal/datacoord/compaction_task_meta.go | 2 +- internal/datacoord/compaction_util.go | 14 ++++++++++++++ 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/internal/datacoord/compaction_inspector.go b/internal/datacoord/compaction_inspector.go index 711b478248..448df201b7 100644 --- a/internal/datacoord/compaction_inspector.go +++ b/internal/datacoord/compaction_inspector.go @@ -309,11 +309,7 @@ func (c *compactionInspector) loadMeta() { triggers := c.meta.GetCompactionTasks(context.TODO()) for _, tasks := range triggers { for _, task := range tasks { - state := task.GetState() - if state == datapb.CompactionTaskState_completed || - state == datapb.CompactionTaskState_cleaned || - state == datapb.CompactionTaskState_timeout || - state == datapb.CompactionTaskState_unknown { + if isCompactionTaskFinished(task) { log.Info("compactionInspector loadMeta abandon compactionTask", zap.Int64("planID", task.GetPlanID()), zap.String("type", task.GetType().String()), diff --git a/internal/datacoord/compaction_task_meta.go b/internal/datacoord/compaction_task_meta.go index 708990a2b7..18808ad4a6 100644 --- a/internal/datacoord/compaction_task_meta.go +++ b/internal/datacoord/compaction_task_meta.go @@ -89,7 +89,7 @@ func (csm *compactionTaskMeta) reloadFromKV() error { for _, task := range compactionTasks { // Compatibility handling: for milvus ≤v2.4, since compaction task has no PreAllocatedSegmentIDs field, // here we just mark the task as failed and wait for the compaction trigger to generate a new one. - if task.PreAllocatedSegmentIDs == nil { + if !isCompactionTaskFinished(task) && task.PreAllocatedSegmentIDs == nil { log.Warn("PreAllocatedSegmentIDs is nil, mark the task as failed", zap.Int64("taskID", task.GetPlanID()), zap.String("type", task.GetType().String()), diff --git a/internal/datacoord/compaction_util.go b/internal/datacoord/compaction_util.go index ec1f4e808b..f0ceadf415 100644 --- a/internal/datacoord/compaction_util.go +++ b/internal/datacoord/compaction_util.go @@ -110,3 +110,17 @@ func WrapPluginContext(collectionID int64, properties []*commonpb.KeyValuePair, return } } + +// isCompactionTaskFinished returns true if the task has reached a terminal state +// (timeout, completed, cleaned, or unknown) and requires no further processing. +func isCompactionTaskFinished(t *datapb.CompactionTask) bool { + switch t.GetState() { + case datapb.CompactionTaskState_timeout, + datapb.CompactionTaskState_completed, + datapb.CompactionTaskState_cleaned, + datapb.CompactionTaskState_unknown: + return true + default: + return false + } +}