From ddd29ea6ab7e6cafc1d41adbf23e1c423444e99d Mon Sep 17 00:00:00 2001 From: yah01 Date: Thu, 1 Dec 2022 16:25:16 +0800 Subject: [PATCH] Optimize scheduler, increase merge tasks probability (#20922) Signed-off-by: yah01 Signed-off-by: yah01 --- internal/querycoordv2/task/scheduler.go | 65 +++++++++++++++++++------ 1 file changed, 51 insertions(+), 14 deletions(-) diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 1e06d8588e..81a4d4222a 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -20,10 +20,9 @@ import ( "context" "errors" "fmt" + "runtime" "sync" - "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -31,7 +30,10 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/internal/util/funcutil" . "github.com/milvus-io/milvus/internal/util/typeutil" + "go.uber.org/atomic" + "go.uber.org/zap" ) const ( @@ -479,10 +481,11 @@ func (scheduler *taskScheduler) schedule(node int64) { ) // Process tasks + toProcess := make([]Task, 0) toRemove := make([]Task, 0) scheduler.processQueue.Range(func(task Task) bool { - if scheduler.isRelated(task, node) { - scheduler.process(task) + if scheduler.isRelated(task, node) && scheduler.preProcess(task) { + toProcess = append(toProcess, task) } if task.Status() != TaskStatusStarted { toRemove = append(toRemove, task) @@ -491,12 +494,25 @@ func (scheduler *taskScheduler) schedule(node int64) { return true }) + // The scheduler doesn't limit the number of tasks, + // to commit tasks to executors as soon as possible, to reach higher merge possibility + failCount := atomic.NewInt32(0) + funcutil.ProcessFuncParallel(len(toProcess), runtime.GOMAXPROCS(0), func(idx int) error { + if !scheduler.process(toProcess[idx]) { + failCount.Inc() + } + return nil + }, "process") + for _, task := range toRemove { scheduler.remove(task) } log.Info("processed tasks", - zap.Int("toRemoveNum", len(toRemove))) + zap.Int("toProcessNum", len(toProcess)), + zap.Int32("failCount", failCount.Load()), + zap.Int("toRemoveNum", len(toRemove)), + ) log.Debug("process tasks related to node done", zap.Int("processingTaskNum", scheduler.processQueue.Len()), @@ -538,9 +554,11 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool { return false } -// process processes the given task, -// return true if the task is started and succeeds to commit the current action -func (scheduler *taskScheduler) process(task Task) bool { +// preProcess checks the finished actions of task, +// and converts the task's status, +// return true if the task should be executed, +// false otherwise +func (scheduler *taskScheduler) preProcess(task Task) bool { log := log.With( zap.Int64("taskID", task.ID()), zap.Int32("type", GetTaskType(task)), @@ -566,10 +584,10 @@ func (scheduler *taskScheduler) process(task Task) bool { } if task.IsFinished(scheduler.distMgr) { - if executor.Exist(task.ID()) { - return false + if !executor.Exist(task.ID()) { + task.SetStatus(TaskStatusSucceeded) } - task.SetStatus(TaskStatusSucceeded) + return false } else if scheduler.checkCanceled(task) { task.SetStatus(TaskStatusCanceled) if task.Err() == nil { @@ -580,12 +598,31 @@ func (scheduler *taskScheduler) process(task Task) bool { task.SetErr(ErrTaskStale) } + return task.Status() == TaskStatusStarted +} + +// process processes the given task, +// return true if the task is started and succeeds to commit the current action +func (scheduler *taskScheduler) process(task Task) bool { + log := log.With( + zap.Int64("taskID", task.ID()), + zap.Int32("type", GetTaskType(task)), + zap.Int64("source", task.SourceID()), + ) + + actions, step := task.Actions(), task.Step() + executor, ok := scheduler.executors[actions[step].Node()] + if !ok { + log.Warn("no executor for QueryNode", + zap.Int("step", step), + zap.Int64("nodeID", actions[step].Node())) + return false + } + log = log.With(zap.Int("step", step)) switch task.Status() { case TaskStatusStarted: - if executor.Execute(task, step) { - return true - } + return executor.Execute(task, step) case TaskStatusSucceeded: log.Info("task succeeded")