From cc371d68016fcc1be432dc791b95f0245f22e60d Mon Sep 17 00:00:00 2001 From: yah01 Date: Thu, 17 Nov 2022 17:55:09 +0800 Subject: [PATCH] Task/Action won't finish util the RPC returned (#20669) Signed-off-by: yah01 Signed-off-by: yah01 --- internal/querycoordv2/task/executor.go | 26 +++++++++---------------- internal/querycoordv2/task/scheduler.go | 2 +- internal/querycoordv2/task/task_test.go | 2 +- 3 files changed, 11 insertions(+), 19 deletions(-) diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index a0dd2fffbb..3346a0b573 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -32,11 +32,6 @@ import ( "go.uber.org/zap" ) -type actionIndex struct { - Task int64 - Step int -} - type Executor struct { doneCh chan struct{} wg sync.WaitGroup @@ -50,7 +45,7 @@ type Executor struct { // Merge load segment requests merger *Merger[segmentIndex, *querypb.LoadSegmentsRequest] - executingActions sync.Map + executingTasks sync.Map } func NewExecutor(meta *meta.Meta, @@ -69,7 +64,7 @@ func NewExecutor(meta *meta.Meta, nodeMgr: nodeMgr, merger: NewMerger[segmentIndex, *querypb.LoadSegmentsRequest](), - executingActions: sync.Map{}, + executingTasks: sync.Map{}, } } @@ -87,11 +82,7 @@ func (ex *Executor) Stop() { // does nothing and returns false if the action is already committed, // returns true otherwise. func (ex *Executor) Execute(task Task, step int) bool { - index := actionIndex{ - Task: task.ID(), - Step: step, - } - _, exist := ex.executingActions.LoadOrStore(index, struct{}{}) + _, exist := ex.executingTasks.LoadOrStore(task.ID(), struct{}{}) if exist { return false } @@ -116,6 +107,11 @@ func (ex *Executor) Execute(task Task, step int) bool { return true } +func (ex *Executor) Exist(taskID int64) bool { + _, ok := ex.executingTasks.Load(taskID) + return ok +} + func (ex *Executor) scheduleRequests() { ex.wg.Add(1) go func() { @@ -192,11 +188,7 @@ func (ex *Executor) removeAction(task Task, step int) { zap.Error(task.Err())) } - index := actionIndex{ - Task: task.ID(), - Step: step, - } - ex.executingActions.Delete(index) + ex.executingTasks.Delete(task.ID()) } func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) { diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 9d8c15fb8d..5265eacfd0 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -527,7 +527,7 @@ func (scheduler *taskScheduler) process(task Task) bool { zap.Int64("source", task.SourceID()), ) - if task.IsFinished(scheduler.distMgr) { + if !scheduler.executor.Exist(task.ID()) && task.IsFinished(scheduler.distMgr) { task.SetStatus(TaskStatusSucceeded) } else if scheduler.checkCanceled(task) { task.SetStatus(TaskStatusCanceled) diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 26f17580cd..94629e6285 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -1148,7 +1148,7 @@ func (suite *TaskSuite) dispatchAndWait(node int64) { for start := time.Now(); time.Since(start) < timeout; { count = 0 keys = make([]any, 0) - suite.scheduler.executor.executingActions.Range(func(key, value any) bool { + suite.scheduler.executor.executingTasks.Range(func(key, value any) bool { keys = append(keys, key) count++ return true