From a774f05ea79a7e681843e7e69680c752c750900e Mon Sep 17 00:00:00 2001 From: congqixia Date: Sat, 22 Feb 2025 16:37:54 +0800 Subject: [PATCH] fix: Add sub task pool for multi-stage tasks (#40079) Related to #40078 Add a subTaskPool that executes sub tasks separately, to avoid the logic deadlock described in the issue. Signed-off-by: Congqi Xia --- internal/proxy/task.go | 5 +++++ internal/proxy/task_query.go | 4 ++++ internal/proxy/task_scheduler.go | 12 ++++++++++-- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/internal/proxy/task.go b/internal/proxy/task.go index a6bb600ada..6a96be3cbc 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -140,6 +140,7 @@ type task interface { CanSkipAllocTimestamp() bool SetOnEnqueueTime() GetDurationInQueue() time.Duration + IsSubTask() bool } type baseTask struct { @@ -158,6 +159,10 @@ func (bt *baseTask) GetDurationInQueue() time.Duration { return time.Since(bt.onEnqueueTime) } +func (bt *baseTask) IsSubTask() bool { + return false +} + type dmlTask interface { task setChannels() error diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index 6f3d3e83b6..ecf1c7424b 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -564,6 +564,10 @@ func (t *queryTask) PostExecute(ctx context.Context) error { return nil } +func (t *queryTask) IsSubTask() bool { + return t.reQuery +} + func (t *queryTask) queryShard(ctx context.Context, nodeID int64, qn types.QueryNodeClient, channel string) error { needOverrideMvcc := false mvccTs := t.MvccTimestamp diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index eab9f2a599..4e3b66e3ff 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -554,7 +554,10 @@ func (sched *taskScheduler) manipulationLoop() { func (sched *taskScheduler) queryLoop() { defer sched.wg.Done() - pool := conc.NewPool[struct{}](paramtable.Get().ProxyCfg.MaxTaskNum.GetAsInt(), 
conc.WithExpiryDuration(time.Minute)) + poolSize := paramtable.Get().ProxyCfg.MaxTaskNum.GetAsInt() + pool := conc.NewPool[struct{}](poolSize, conc.WithExpiryDuration(time.Minute)) + subTaskPool := conc.NewPool[struct{}](poolSize, conc.WithExpiryDuration(time.Minute)) + for { select { case <-sched.ctx.Done(): @@ -562,7 +565,12 @@ func (sched *taskScheduler) queryLoop() { case <-sched.dqQueue.utChan(): if !sched.dqQueue.utEmpty() { t := sched.scheduleDqTask() - pool.Submit(func() (struct{}, error) { + p := pool + // if task is sub task spawned by another, use sub task pool in case of deadlock + if t.IsSubTask() { + p = subTaskPool + } + p.Submit(func() (struct{}, error) { sched.processTask(t, sched.dqQueue) return struct{}{}, nil })