fix: [2.5] Ensure task execution order by using a priority queue (#43272)

issue: #43260 
master pr: #43271

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2025-07-23 10:16:54 +08:00 committed by GitHub
parent fe39128021
commit 3ed3bf92e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 99 additions and 91 deletions

View File

@ -112,7 +112,7 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() {
mt: mt,
scheduler: &taskScheduler{
allocator: alloc,
pendingTasks: newFairQueuePolicy(),
pendingTasks: newPriorityQueuePolicy(),
runningTasks: typeutil.NewConcurrentMap[UniqueID, Task](),
meta: mt,
taskStats: expirable.NewLRU[UniqueID, Task](512, nil, time.Minute*5),

View File

@ -17,6 +17,7 @@
package datacoord
import (
	"container/heap"
	"sort"
	"sync"
)
@ -25,7 +26,6 @@ type schedulePolicy interface {
Push(task Task)
// Pop returns the next task that is ready to run.
Pop() Task
BatchPop(batch int) []Task
Get(taskID UniqueID) Task
Keys() []UniqueID
TaskCount() int
@ -33,111 +33,119 @@ type schedulePolicy interface {
Remove(taskID UniqueID)
}
var _ schedulePolicy = &fairQueuePolicy{}
var (
_ schedulePolicy = &priorityQueuePolicy{}
)
type fairQueuePolicy struct {
tasks map[UniqueID]Task
taskIDs []UniqueID
lock sync.RWMutex
// priorityQueuePolicy implements a priority queue that sorts tasks by taskID (smaller taskID has higher priority)
type priorityQueuePolicy struct {
tasks map[UniqueID]Task
heap *taskHeap
lock sync.RWMutex
}
func newFairQueuePolicy() *fairQueuePolicy {
return &fairQueuePolicy{
tasks: make(map[UniqueID]Task, 0),
taskIDs: make([]UniqueID, 0),
lock: sync.RWMutex{},
// taskHeap implements a min-heap for Task objects, sorted by taskID
// taskHeap is a min-heap of Task values ordered by task ID. It implements
// heap.Interface and is meant to be driven through the container/heap
// functions (heap.Init, heap.Push, heap.Pop, heap.Remove) rather than by
// calling its Push/Pop methods directly.
type taskHeap []Task

// Len reports how many tasks are currently stored.
func (h taskHeap) Len() int { return len(h) }

// Less orders tasks by ascending task ID, so the smallest ID is at the root.
func (h taskHeap) Less(i, j int) bool { return h[i].GetTaskID() < h[j].GetTaskID() }

// Swap exchanges the tasks at positions i and j.
func (h taskHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Push appends x to the end of the underlying slice. x must be a Task; any
// other dynamic type makes the type assertion panic.
func (h *taskHeap) Push(x any) {
	*h = append(*h, x.(Task))
}

// Pop removes and returns the last element of the underlying slice.
// container/heap swaps the minimum element into that position before
// calling this method.
func (h *taskHeap) Pop() any {
	old := *h
	last := len(old) - 1
	item := old[last]

	// Zero the vacated slot so the backing array does not keep the popped
	// task reachable after it has left the queue.
	var zero Task
	old[last] = zero

	*h = old[:last]
	return item
}
// newPriorityQueuePolicy creates a new priority queue policy
func newPriorityQueuePolicy() *priorityQueuePolicy {
h := &taskHeap{}
heap.Init(h)
return &priorityQueuePolicy{
tasks: make(map[UniqueID]Task),
heap: h,
lock: sync.RWMutex{},
}
}
func (fqp *fairQueuePolicy) Push(t Task) {
fqp.lock.Lock()
defer fqp.lock.Unlock()
if _, ok := fqp.tasks[t.GetTaskID()]; !ok {
fqp.tasks[t.GetTaskID()] = t
fqp.taskIDs = append(fqp.taskIDs, t.GetTaskID())
func (pqp *priorityQueuePolicy) Push(task Task) {
pqp.lock.Lock()
defer pqp.lock.Unlock()
taskID := task.GetTaskID()
if _, exists := pqp.tasks[taskID]; !exists {
pqp.tasks[taskID] = task
heap.Push(pqp.heap, task)
}
}
func (fqp *fairQueuePolicy) Pop() Task {
fqp.lock.Lock()
defer fqp.lock.Unlock()
if len(fqp.taskIDs) == 0 {
func (pqp *priorityQueuePolicy) Pop() Task {
pqp.lock.Lock()
defer pqp.lock.Unlock()
if pqp.heap.Len() == 0 {
return nil
}
taskID := fqp.taskIDs[0]
fqp.taskIDs = fqp.taskIDs[1:]
task := fqp.tasks[taskID]
delete(fqp.tasks, taskID)
task := heap.Pop(pqp.heap).(Task)
delete(pqp.tasks, task.GetTaskID())
return task
}
func (fqp *fairQueuePolicy) BatchPop(batch int) []Task {
fqp.lock.Lock()
defer fqp.lock.Unlock()
tasks := make([]Task, 0)
if len(fqp.taskIDs) <= batch {
for _, taskID := range fqp.taskIDs {
task := fqp.tasks[taskID]
delete(fqp.tasks, taskID)
tasks = append(tasks, task)
}
fqp.taskIDs = make([]UniqueID, 0)
return tasks
// Get looks up a pending task by its ID under the read lock and returns it,
// or nil when no task with that ID is queued.
func (pqp *priorityQueuePolicy) Get(taskID UniqueID) Task {
	pqp.lock.RLock()
	defer pqp.lock.RUnlock()

	if found, ok := pqp.tasks[taskID]; ok {
		return found
	}
	return nil
}
// Keys returns the IDs of all pending tasks in ascending order, which is the
// same order in which Pop hands tasks out (smaller task ID first).
//
// The IDs are collected from a map, whose iteration order is randomized, so
// they are sorted before being returned to give callers a stable result. The
// returned slice is a fresh copy that the caller may modify freely.
func (pqp *priorityQueuePolicy) Keys() []UniqueID {
	pqp.lock.RLock()
	keys := make([]UniqueID, 0, len(pqp.tasks))
	for taskID := range pqp.tasks {
		keys = append(keys, taskID)
	}
	pqp.lock.RUnlock()

	// keys is a private copy, so sorting does not need to hold the lock.
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
	return keys
}
// TaskCount reports how many tasks are currently pending, read under the
// read lock.
func (pqp *priorityQueuePolicy) TaskCount() int {
	pqp.lock.RLock()
	pending := len(pqp.tasks)
	pqp.lock.RUnlock()
	return pending
}
// Exist reports whether a task with the given ID is currently pending. The
// lookup is performed under the read lock.
func (pqp *priorityQueuePolicy) Exist(taskID UniqueID) bool {
	pqp.lock.RLock()
	_, found := pqp.tasks[taskID]
	pqp.lock.RUnlock()
	return found
}
func (pqp *priorityQueuePolicy) Remove(taskID UniqueID) {
pqp.lock.Lock()
defer pqp.lock.Unlock()
if _, exists := pqp.tasks[taskID]; !exists {
return
}
taskIDs := fqp.taskIDs[:batch]
for _, taskID := range taskIDs {
task := fqp.tasks[taskID]
delete(fqp.tasks, taskID)
tasks = append(tasks, task)
}
fqp.taskIDs = fqp.taskIDs[batch:]
return tasks
}
delete(pqp.tasks, taskID)
func (fqp *fairQueuePolicy) Get(taskID UniqueID) Task {
fqp.lock.RLock()
defer fqp.lock.RUnlock()
if len(fqp.taskIDs) == 0 {
return nil
}
task := fqp.tasks[taskID]
return task
}
func (fqp *fairQueuePolicy) TaskCount() int {
fqp.lock.RLock()
defer fqp.lock.RUnlock()
return len(fqp.taskIDs)
}
func (fqp *fairQueuePolicy) Exist(taskID UniqueID) bool {
fqp.lock.RLock()
defer fqp.lock.RUnlock()
_, ok := fqp.tasks[taskID]
return ok
}
func (fqp *fairQueuePolicy) Remove(taskID UniqueID) {
fqp.lock.Lock()
defer fqp.lock.Unlock()
taskIndex := -1
for i := range fqp.taskIDs {
if fqp.taskIDs[i] == taskID {
taskIndex = i
// Find and remove from heap
for i, task := range *pqp.heap {
if task.GetTaskID() == taskID {
heap.Remove(pqp.heap, i)
break
}
}
if taskIndex != -1 {
fqp.taskIDs = append(fqp.taskIDs[:taskIndex], fqp.taskIDs[taskIndex+1:]...)
delete(fqp.tasks, taskID)
}
}
func (fqp *fairQueuePolicy) Keys() []UniqueID {
fqp.lock.RLock()
defer fqp.lock.RUnlock()
return fqp.taskIDs
}

View File

@ -81,7 +81,7 @@ func newTaskScheduler(
ctx: ctx,
cancel: cancel,
meta: metaTable,
pendingTasks: newFairQueuePolicy(),
pendingTasks: newPriorityQueuePolicy(),
runningTasks: typeutil.NewConcurrentMap[UniqueID, Task](),
notifyChan: make(chan struct{}, 1),
taskLock: lock.NewKeyLock[int64](),

View File

@ -2362,7 +2362,7 @@ func (s *taskSchedulerSuite) Test_zeroSegmentStats() {
ctx: ctx,
cancel: cancel,
meta: mt,
pendingTasks: newFairQueuePolicy(),
pendingTasks: newPriorityQueuePolicy(),
runningTasks: typeutil.NewConcurrentMap[UniqueID, Task](),
notifyChan: make(chan struct{}, 1),
taskLock: lock.NewKeyLock[int64](),