diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index b78b8c4f4d..4c288fb5cc 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -299,12 +299,13 @@ func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask { // expireCompaction set the compaction state to expired func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { + // Get executing tasks before GetCompactionState from DataNode to prevent false failure, + // for DC might add new task while GetCompactionState. + tasks := c.getExecutingCompactions() planStates := c.sessions.GetCompactionState() c.mu.Lock() defer c.mu.Unlock() - - tasks := c.getExecutingCompactions() for _, task := range tasks { stateResult, ok := planStates[task.plan.PlanID] state := stateResult.GetState() @@ -381,6 +382,8 @@ func (c *compactionPlanHandler) isFull() bool { } func (c *compactionPlanHandler) getExecutingCompactions() []*compactionTask { + c.mu.RLock() + defer c.mu.RUnlock() tasks := make([]*compactionTask, 0, len(c.plans)) for _, plan := range c.plans { if plan.state == executing {