diff --git a/internal/datacoord/compaction_inspector.go b/internal/datacoord/compaction_inspector.go index 711b478248..448df201b7 100644 --- a/internal/datacoord/compaction_inspector.go +++ b/internal/datacoord/compaction_inspector.go @@ -309,11 +309,7 @@ func (c *compactionInspector) loadMeta() { triggers := c.meta.GetCompactionTasks(context.TODO()) for _, tasks := range triggers { for _, task := range tasks { - state := task.GetState() - if state == datapb.CompactionTaskState_completed || - state == datapb.CompactionTaskState_cleaned || - state == datapb.CompactionTaskState_timeout || - state == datapb.CompactionTaskState_unknown { + if isCompactionTaskFinished(task) { log.Info("compactionInspector loadMeta abandon compactionTask", zap.Int64("planID", task.GetPlanID()), zap.String("type", task.GetType().String()), diff --git a/internal/datacoord/compaction_task_meta.go b/internal/datacoord/compaction_task_meta.go index 708990a2b7..18808ad4a6 100644 --- a/internal/datacoord/compaction_task_meta.go +++ b/internal/datacoord/compaction_task_meta.go @@ -89,7 +89,7 @@ func (csm *compactionTaskMeta) reloadFromKV() error { for _, task := range compactionTasks { // Compatibility handling: for milvus ≤v2.4, since compaction task has no PreAllocatedSegmentIDs field, // here we just mark the task as failed and wait for the compaction trigger to generate a new one. - if task.PreAllocatedSegmentIDs == nil { + if !isCompactionTaskFinished(task) && task.PreAllocatedSegmentIDs == nil { log.Warn("PreAllocatedSegmentIDs is nil, mark the task as failed", zap.Int64("taskID", task.GetPlanID()), zap.String("type", task.GetType().String()), diff --git a/internal/datacoord/compaction_util.go b/internal/datacoord/compaction_util.go index ec1f4e808b..f0ceadf415 100644 --- a/internal/datacoord/compaction_util.go +++ b/internal/datacoord/compaction_util.go @@ -110,3 +110,17 @@ func WrapPluginContext(collectionID int64, properties []*commonpb.KeyValuePair, return } } + +// isCompactionTaskFinished returns true if the task has reached a terminal state +// (timeout, completed, cleaned, or unknown) and requires no further processing. +func isCompactionTaskFinished(t *datapb.CompactionTask) bool { + switch t.GetState() { + case datapb.CompactionTaskState_timeout, + datapb.CompactionTaskState_completed, + datapb.CompactionTaskState_cleaned, + datapb.CompactionTaskState_unknown: + return true + default: + return false + } +}