diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index d8ce57046a..e6287ae4d0 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -349,9 +349,11 @@ func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *data func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) { for _, task := range tasks { + // avoid closure capture iteration variable + innerTask := task getOrCreateIOPool().Submit(func() (any, error) { - plan := task.plan - log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", task.dataNodeID)) + plan := innerTask.plan + log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", innerTask.dataNodeID)) log.Info("Notify compaction task to DataNode") ts, err := c.allocator.allocTimestamp(context.TODO()) if err != nil { @@ -360,9 +362,9 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) { c.updateTask(plan.PlanID, setState(executing), setStartTime(tsTimeout)) return nil, err } - c.updateTask(task.plan.PlanID, setStartTime(ts)) - err = c.sessions.Compaction(task.dataNodeID, task.plan) - c.updateTask(task.plan.PlanID, setState(executing)) + c.updateTask(plan.PlanID, setStartTime(ts)) + err = c.sessions.Compaction(innerTask.dataNodeID, plan) + c.updateTask(plan.PlanID, setState(executing)) if err != nil { log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err)) return nil, err