enhance: Fix unbalanced task scheduling (#43581)

Make scheduler always pick the node with the most available slots.

issue: https://github.com/milvus-io/milvus/issues/43580

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2025-07-28 12:58:55 +08:00 committed by GitHub
parent 34d3f0c0f8
commit 192521c6bd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 23 additions and 24 deletions

View File

@ -138,23 +138,19 @@ func (s *globalTaskScheduler) Stop() {
}
func (s *globalTaskScheduler) pickNode(workerSlots map[int64]*session.WorkerSlots, taskSlot int64) int64 {
var fallbackNodeID int64 = NullNodeID
var maxAvailable int64 = -1
var nodeID int64 = NullNodeID
for nodeID, ws := range workerSlots {
if ws.AvailableSlots >= taskSlot {
ws.AvailableSlots -= taskSlot
return nodeID
}
for id, ws := range workerSlots {
if ws.AvailableSlots > maxAvailable && ws.AvailableSlots > 0 {
maxAvailable = ws.AvailableSlots
fallbackNodeID = nodeID
nodeID = id
}
}
if fallbackNodeID != NullNodeID {
workerSlots[fallbackNodeID].AvailableSlots = 0
return fallbackNodeID
if nodeID != NullNodeID {
workerSlots[nodeID].AvailableSlots = 0
return nodeID
}
return NullNodeID
}

View File

@ -127,17 +127,20 @@ func TestGlobalScheduler_pickNode(t *testing.T) {
}
func TestGlobalScheduler_TestSchedule(t *testing.T) {
cluster := session.NewMockCluster(t)
cluster.EXPECT().QuerySlot().Return(map[int64]*session.WorkerSlots{
1: {
NodeID: 1,
AvailableSlots: 100,
},
2: {
NodeID: 2,
AvailableSlots: 100,
},
}).Maybe()
newCluster := func() session.Cluster {
cluster := session.NewMockCluster(t)
cluster.EXPECT().QuerySlot().Return(map[int64]*session.WorkerSlots{
1: {
NodeID: 1,
AvailableSlots: 100,
},
2: {
NodeID: 2,
AvailableSlots: 100,
},
}).Maybe()
return cluster
}
newTask := func() *MockTask {
task := NewMockTask(t)
@ -149,7 +152,7 @@ func TestGlobalScheduler_TestSchedule(t *testing.T) {
}
t.Run("task retry when CreateTaskOnWorker", func(t *testing.T) {
scheduler := NewGlobalTaskScheduler(context.TODO(), cluster)
scheduler := NewGlobalTaskScheduler(context.TODO(), newCluster())
scheduler.Start()
defer scheduler.Stop()
@ -180,7 +183,7 @@ func TestGlobalScheduler_TestSchedule(t *testing.T) {
})
t.Run("task retry when QueryTaskOnWorker", func(t *testing.T) {
scheduler := NewGlobalTaskScheduler(context.TODO(), cluster)
scheduler := NewGlobalTaskScheduler(context.TODO(), newCluster())
scheduler.Start()
defer scheduler.Stop()
@ -218,7 +221,7 @@ func TestGlobalScheduler_TestSchedule(t *testing.T) {
})
t.Run("normal case", func(t *testing.T) {
scheduler := NewGlobalTaskScheduler(context.TODO(), cluster)
scheduler := NewGlobalTaskScheduler(context.TODO(), newCluster())
scheduler.Start()
defer scheduler.Stop()