From 192521c6bd4a240b027640ccfea118968c1740db Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Mon, 28 Jul 2025 12:58:55 +0800
Subject: [PATCH] enhance: Fix unbalanced task scheduling (#43581)

Make scheduler always pick the node with the most available slots.

issue: https://github.com/milvus-io/milvus/issues/43580

---------

Signed-off-by: bigsheeper
---
 internal/datacoord/task/global_scheduler.go      | 16 ++++------
 .../datacoord/task/global_scheduler_test.go      | 31 ++++++++++---------
 2 files changed, 23 insertions(+), 24 deletions(-)

diff --git a/internal/datacoord/task/global_scheduler.go b/internal/datacoord/task/global_scheduler.go
index 97ae9caf09..708204418b 100644
--- a/internal/datacoord/task/global_scheduler.go
+++ b/internal/datacoord/task/global_scheduler.go
@@ -138,23 +138,19 @@ func (s *globalTaskScheduler) Stop() {
 }

 func (s *globalTaskScheduler) pickNode(workerSlots map[int64]*session.WorkerSlots, taskSlot int64) int64 {
-	var fallbackNodeID int64 = NullNodeID
 	var maxAvailable int64 = -1
+	var nodeID int64 = NullNodeID

-	for nodeID, ws := range workerSlots {
-		if ws.AvailableSlots >= taskSlot {
-			ws.AvailableSlots -= taskSlot
-			return nodeID
-		}
+	for id, ws := range workerSlots {
 		if ws.AvailableSlots > maxAvailable && ws.AvailableSlots > 0 {
 			maxAvailable = ws.AvailableSlots
-			fallbackNodeID = nodeID
+			nodeID = id
 		}
 	}

-	if fallbackNodeID != NullNodeID {
-		workerSlots[fallbackNodeID].AvailableSlots = 0
-		return fallbackNodeID
+	if nodeID != NullNodeID {
+		workerSlots[nodeID].AvailableSlots = 0
+		return nodeID
 	}
 	return NullNodeID
 }
diff --git a/internal/datacoord/task/global_scheduler_test.go b/internal/datacoord/task/global_scheduler_test.go
index 3b0804e649..30ff7de383 100644
--- a/internal/datacoord/task/global_scheduler_test.go
+++ b/internal/datacoord/task/global_scheduler_test.go
@@ -127,17 +127,20 @@ func TestGlobalScheduler_pickNode(t *testing.T) {
 }

 func TestGlobalScheduler_TestSchedule(t *testing.T) {
-	cluster := session.NewMockCluster(t)
-	cluster.EXPECT().QuerySlot().Return(map[int64]*session.WorkerSlots{
-		1: {
-			NodeID:         1,
-			AvailableSlots: 100,
-		},
-		2: {
-			NodeID:         2,
-			AvailableSlots: 100,
-		},
-	}).Maybe()
+	newCluster := func() session.Cluster {
+		cluster := session.NewMockCluster(t)
+		cluster.EXPECT().QuerySlot().Return(map[int64]*session.WorkerSlots{
+			1: {
+				NodeID:         1,
+				AvailableSlots: 100,
+			},
+			2: {
+				NodeID:         2,
+				AvailableSlots: 100,
+			},
+		}).Maybe()
+		return cluster
+	}

 	newTask := func() *MockTask {
 		task := NewMockTask(t)
@@ -149,7 +152,7 @@ func TestGlobalScheduler_TestSchedule(t *testing.T) {
 	}

 	t.Run("task retry when CreateTaskOnWorker", func(t *testing.T) {
-		scheduler := NewGlobalTaskScheduler(context.TODO(), cluster)
+		scheduler := NewGlobalTaskScheduler(context.TODO(), newCluster())
 		scheduler.Start()
 		defer scheduler.Stop()

@@ -180,7 +183,7 @@ func TestGlobalScheduler_TestSchedule(t *testing.T) {
 	})

 	t.Run("task retry when QueryTaskOnWorker", func(t *testing.T) {
-		scheduler := NewGlobalTaskScheduler(context.TODO(), cluster)
+		scheduler := NewGlobalTaskScheduler(context.TODO(), newCluster())
 		scheduler.Start()
 		defer scheduler.Stop()

@@ -218,7 +221,7 @@ func TestGlobalScheduler_TestSchedule(t *testing.T) {
 	})

 	t.Run("normal case", func(t *testing.T) {
-		scheduler := NewGlobalTaskScheduler(context.TODO(), cluster)
+		scheduler := NewGlobalTaskScheduler(context.TODO(), newCluster())
 		scheduler.Start()
 		defer scheduler.Stop()
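
Note: the following is a minimal, self-contained sketch of the selection policy the patch introduces, so the diff above can be read in isolation. It is not the Milvus code itself: WorkerSlots and NullNodeID are simplified stand-ins for the datacoord types (the value of NullNodeID is an assumption for this sketch), and the real pickNode operates on map[int64]*session.WorkerSlots inside the global task scheduler.

package main

import "fmt"

// NullNodeID and WorkerSlots are simplified stand-ins for the datacoord
// types referenced in the diff; the value of NullNodeID is assumed here.
const NullNodeID int64 = -1

type WorkerSlots struct {
	NodeID         int64
	AvailableSlots int64
}

// pickNode mirrors the patched selection logic: choose the worker with the
// most available slots (taskSlot no longer influences the comparison), then
// zero that worker's slots so it is not chosen again until slots are re-queried.
func pickNode(workerSlots map[int64]*WorkerSlots, taskSlot int64) int64 {
	var maxAvailable int64 = -1
	var nodeID int64 = NullNodeID

	for id, ws := range workerSlots {
		if ws.AvailableSlots > maxAvailable && ws.AvailableSlots > 0 {
			maxAvailable = ws.AvailableSlots
			nodeID = id
		}
	}

	if nodeID != NullNodeID {
		workerSlots[nodeID].AvailableSlots = 0
		return nodeID
	}
	return NullNodeID
}

func main() {
	slots := map[int64]*WorkerSlots{
		1: {NodeID: 1, AvailableSlots: 30},
		2: {NodeID: 2, AvailableSlots: 100},
	}
	fmt.Println(pickNode(slots, 10)) // 2: node 2 has the most available slots
	fmt.Println(pickNode(slots, 10)) // 1: node 2 was zeroed, so node 1 is next
	fmt.Println(pickNode(slots, 10)) // -1 (NullNodeID): no slots left anywhere
}

Because the picked worker's AvailableSlots is set to 0 in the shared map, repeated picks over the same slot snapshot presumably spread across different workers instead of piling onto one node, which is the unbalanced behavior the PR title refers to.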