mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: Add sub task pool for multi-stage tasks (#40079)
Related to #40078 Add a subTaskPool to execute sub task in case of logic deadlock described in issue. Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
e1b5b37195
commit
a774f05ea7
@ -140,6 +140,7 @@ type task interface {
|
||||
CanSkipAllocTimestamp() bool
|
||||
SetOnEnqueueTime()
|
||||
GetDurationInQueue() time.Duration
|
||||
IsSubTask() bool
|
||||
}
|
||||
|
||||
type baseTask struct {
|
||||
@ -158,6 +159,10 @@ func (bt *baseTask) GetDurationInQueue() time.Duration {
|
||||
return time.Since(bt.onEnqueueTime)
|
||||
}
|
||||
|
||||
func (bt *baseTask) IsSubTask() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
type dmlTask interface {
|
||||
task
|
||||
setChannels() error
|
||||
|
||||
@ -564,6 +564,10 @@ func (t *queryTask) PostExecute(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *queryTask) IsSubTask() bool {
|
||||
return t.reQuery
|
||||
}
|
||||
|
||||
func (t *queryTask) queryShard(ctx context.Context, nodeID int64, qn types.QueryNodeClient, channel string) error {
|
||||
needOverrideMvcc := false
|
||||
mvccTs := t.MvccTimestamp
|
||||
|
||||
@ -554,7 +554,10 @@ func (sched *taskScheduler) manipulationLoop() {
|
||||
func (sched *taskScheduler) queryLoop() {
|
||||
defer sched.wg.Done()
|
||||
|
||||
pool := conc.NewPool[struct{}](paramtable.Get().ProxyCfg.MaxTaskNum.GetAsInt(), conc.WithExpiryDuration(time.Minute))
|
||||
poolSize := paramtable.Get().ProxyCfg.MaxTaskNum.GetAsInt()
|
||||
pool := conc.NewPool[struct{}](poolSize, conc.WithExpiryDuration(time.Minute))
|
||||
subTaskPool := conc.NewPool[struct{}](poolSize, conc.WithExpiryDuration(time.Minute))
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-sched.ctx.Done():
|
||||
@ -562,7 +565,12 @@ func (sched *taskScheduler) queryLoop() {
|
||||
case <-sched.dqQueue.utChan():
|
||||
if !sched.dqQueue.utEmpty() {
|
||||
t := sched.scheduleDqTask()
|
||||
pool.Submit(func() (struct{}, error) {
|
||||
p := pool
|
||||
// if task is sub task spawned by another, use sub task pool in case of deadlock
|
||||
if t.IsSubTask() {
|
||||
p = subTaskPool
|
||||
}
|
||||
p.Submit(func() (struct{}, error) {
|
||||
sched.processTask(t, sched.dqQueue)
|
||||
return struct{}{}, nil
|
||||
})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user