From 6c8146326364db55c4bfe0c24ef235a96fb2cf69 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Sun, 23 Feb 2025 18:29:55 +0800 Subject: [PATCH] fix: [2.5] Fix task scheduler dead lock (#40121) issue: #39101 master pr: #39084 Signed-off-by: cai.zhang --- internal/datacoord/task_scheduler.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/datacoord/task_scheduler.go b/internal/datacoord/task_scheduler.go index 86f6b6c4eb..d2ed08e2e1 100644 --- a/internal/datacoord/task_scheduler.go +++ b/internal/datacoord/task_scheduler.go @@ -343,6 +343,9 @@ func (s *taskScheduler) checkProcessingTasks() { taskID := taskID go func(taskID int64) { defer wg.Done() + defer func() { + <-sem + }() task := s.getRunningTask(taskID) s.taskLock.Lock(taskID) suc := s.checkProcessingTask(task) @@ -350,7 +353,6 @@ func (s *taskScheduler) checkProcessingTasks() { if suc { s.removeRunningTask(taskID) } - <-sem }(taskID) } wg.Wait() @@ -398,6 +400,9 @@ func (s *taskScheduler) run() { sem <- struct{}{} go func(task Task, nodeID int64) { defer wg.Done() + defer func() { + <-sem + }() s.taskLock.Lock(task.GetTaskID()) s.process(task, nodeID) @@ -413,7 +418,6 @@ func (s *taskScheduler) run() { s.runningTasks[task.GetTaskID()] = task s.runningQueueLock.Unlock() } - <-sem }(task, nodeID) } wg.Wait()