From a82c235599874a488f7cd0db0254fa7c09fd3026 Mon Sep 17 00:00:00 2001 From: yah01 Date: Fri, 11 Nov 2022 17:55:05 +0800 Subject: [PATCH] Scheduler should returns err if the queue is full (#20465) Signed-off-by: yah01 Signed-off-by: yah01 --- internal/querycoordv2/checkers/controller.go | 8 -------- internal/querycoordv2/task/scheduler.go | 14 ++++++-------- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go index e82cb754d2..96cdb9a067 100644 --- a/internal/querycoordv2/checkers/controller.go +++ b/internal/querycoordv2/checkers/controller.go @@ -27,7 +27,6 @@ import ( . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" - "go.uber.org/zap" ) var ( @@ -111,18 +110,11 @@ func (controller *CheckerController) check(ctx context.Context) { tasks = append(tasks, checker.Check(ctx)...) } - added := 0 for _, task := range tasks { err := controller.scheduler.Add(task) if err != nil { task.Cancel() continue } - added++ - if added >= checkRoundTaskNumLimit { - log.Info("checkers have added too many tasks, truncate the subsequent tasks", - zap.Int("taskNum", len(tasks)), - zap.Int("taskNumLimit", checkRoundTaskNumLimit)) - } } } diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 8c1547835f..8e3f28cf02 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -218,6 +218,11 @@ func (scheduler *taskScheduler) Add(task Task) error { return err } + if !scheduler.waitQueue.Add(task) { + log.Warn("failed to add task", zap.String("task", task.String())) + return ErrTaskQueueFull + } + task.SetID(scheduler.idAllocator()) scheduler.tasks.Insert(task.ID()) switch task := task.(type) { @@ -229,10 +234,7 @@ func (scheduler *taskScheduler) Add(task Task) error { index := replicaChannelIndex{task.ReplicaID(), task.Channel()} scheduler.channelTasks[index] = task } - if !scheduler.waitQueue.Add(task) { - log.Warn("failed to add task", zap.String("task", task.String())) - return nil - } + metrics.QueryCoordTaskNum.WithLabelValues().Set(float64(scheduler.tasks.Len())) log.Info("task added", zap.String("task", task.String())) return nil @@ -241,10 +243,6 @@ func (scheduler *taskScheduler) Add(task Task) error { // check checks whether the task is valid to add, // must hold lock func (scheduler *taskScheduler) preAdd(task Task) error { - if scheduler.waitQueue.Len() >= scheduler.waitQueue.Cap() { - return ErrTaskQueueFull - } - switch task := task.(type) { case *SegmentTask: index := NewReplicaSegmentIndex(task)