diff --git a/internal/datacoord/job_manager_test.go b/internal/datacoord/job_manager_test.go index 0e514a4045..a1799e3dca 100644 --- a/internal/datacoord/job_manager_test.go +++ b/internal/datacoord/job_manager_test.go @@ -112,7 +112,7 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() { mt: mt, scheduler: &taskScheduler{ allocator: alloc, - pendingTasks: newFairQueuePolicy(), + pendingTasks: newPriorityQueuePolicy(), runningTasks: typeutil.NewConcurrentMap[UniqueID, Task](), meta: mt, taskStats: expirable.NewLRU[UniqueID, Task](512, nil, time.Minute*5), diff --git a/internal/datacoord/task_queue.go b/internal/datacoord/task_queue.go index eb3c489e1d..b9ce7f0eb7 100644 --- a/internal/datacoord/task_queue.go +++ b/internal/datacoord/task_queue.go @@ -17,6 +17,7 @@ package datacoord import ( + "container/heap" "sync" ) @@ -25,7 +26,6 @@ type schedulePolicy interface { Push(task Task) // Pop get the task next ready to run. Pop() Task - BatchPop(batch int) []Task Get(taskID UniqueID) Task Keys() []UniqueID TaskCount() int @@ -33,111 +33,119 @@ type schedulePolicy interface { Remove(taskID UniqueID) } -var _ schedulePolicy = &fairQueuePolicy{} +var ( + _ schedulePolicy = &priorityQueuePolicy{} +) -type fairQueuePolicy struct { - tasks map[UniqueID]Task - taskIDs []UniqueID - lock sync.RWMutex +// priorityQueuePolicy implements a priority queue that sorts tasks by taskID (smaller taskID has higher priority) +type priorityQueuePolicy struct { + tasks map[UniqueID]Task + heap *taskHeap + lock sync.RWMutex } -func newFairQueuePolicy() *fairQueuePolicy { - return &fairQueuePolicy{ - tasks: make(map[UniqueID]Task, 0), - taskIDs: make([]UniqueID, 0), - lock: sync.RWMutex{}, +// taskHeap implements a min-heap for Task objects, sorted by taskID +type taskHeap []Task + +func (h taskHeap) Len() int { return len(h) } +func (h taskHeap) Less(i, j int) bool { return h[i].GetTaskID() < h[j].GetTaskID() } +func (h taskHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *taskHeap) Push(x interface{}) { + *h = append(*h, x.(Task)) +} + +func (h *taskHeap) Pop() interface{} { + old := *h + n := len(old) + item := old[n-1] + *h = old[0 : n-1] + return item +} + +// newPriorityQueuePolicy creates a new priority queue policy +func newPriorityQueuePolicy() *priorityQueuePolicy { + h := &taskHeap{} + heap.Init(h) + return &priorityQueuePolicy{ + tasks: make(map[UniqueID]Task), + heap: h, + lock: sync.RWMutex{}, } } -func (fqp *fairQueuePolicy) Push(t Task) { - fqp.lock.Lock() - defer fqp.lock.Unlock() - if _, ok := fqp.tasks[t.GetTaskID()]; !ok { - fqp.tasks[t.GetTaskID()] = t - fqp.taskIDs = append(fqp.taskIDs, t.GetTaskID()) +func (pqp *priorityQueuePolicy) Push(task Task) { + pqp.lock.Lock() + defer pqp.lock.Unlock() + + taskID := task.GetTaskID() + if _, exists := pqp.tasks[taskID]; !exists { + pqp.tasks[taskID] = task + heap.Push(pqp.heap, task) } } -func (fqp *fairQueuePolicy) Pop() Task { - fqp.lock.Lock() - defer fqp.lock.Unlock() - if len(fqp.taskIDs) == 0 { +func (pqp *priorityQueuePolicy) Pop() Task { + pqp.lock.Lock() + defer pqp.lock.Unlock() + + if pqp.heap.Len() == 0 { return nil } - taskID := fqp.taskIDs[0] - fqp.taskIDs = fqp.taskIDs[1:] - task := fqp.tasks[taskID] - delete(fqp.tasks, taskID) + + task := heap.Pop(pqp.heap).(Task) + delete(pqp.tasks, task.GetTaskID()) return task } -func (fqp *fairQueuePolicy) BatchPop(batch int) []Task { - fqp.lock.Lock() - defer fqp.lock.Unlock() - tasks := make([]Task, 0) - if len(fqp.taskIDs) <= batch { - for _, taskID := range fqp.taskIDs { 
- task := fqp.tasks[taskID] - delete(fqp.tasks, taskID) - tasks = append(tasks, task) - } - fqp.taskIDs = make([]UniqueID, 0) - return tasks +func (pqp *priorityQueuePolicy) Get(taskID UniqueID) Task { + pqp.lock.RLock() + defer pqp.lock.RUnlock() + + return pqp.tasks[taskID] +} + +func (pqp *priorityQueuePolicy) Keys() []UniqueID { + pqp.lock.RLock() + defer pqp.lock.RUnlock() + + keys := make([]UniqueID, 0, len(pqp.tasks)) + for taskID := range pqp.tasks { + keys = append(keys, taskID) + } + return keys +} + +func (pqp *priorityQueuePolicy) TaskCount() int { + pqp.lock.RLock() + defer pqp.lock.RUnlock() + + return len(pqp.tasks) +} + +func (pqp *priorityQueuePolicy) Exist(taskID UniqueID) bool { + pqp.lock.RLock() + defer pqp.lock.RUnlock() + + _, exists := pqp.tasks[taskID] + return exists +} + +func (pqp *priorityQueuePolicy) Remove(taskID UniqueID) { + pqp.lock.Lock() + defer pqp.lock.Unlock() + + if _, exists := pqp.tasks[taskID]; !exists { + return } - taskIDs := fqp.taskIDs[:batch] - for _, taskID := range taskIDs { - task := fqp.tasks[taskID] - delete(fqp.tasks, taskID) - tasks = append(tasks, task) - } - fqp.taskIDs = fqp.taskIDs[batch:] - return tasks -} + delete(pqp.tasks, taskID) -func (fqp *fairQueuePolicy) Get(taskID UniqueID) Task { - fqp.lock.RLock() - defer fqp.lock.RUnlock() - if len(fqp.taskIDs) == 0 { - return nil - } - task := fqp.tasks[taskID] - return task -} - -func (fqp *fairQueuePolicy) TaskCount() int { - fqp.lock.RLock() - defer fqp.lock.RUnlock() - return len(fqp.taskIDs) -} - -func (fqp *fairQueuePolicy) Exist(taskID UniqueID) bool { - fqp.lock.RLock() - defer fqp.lock.RUnlock() - _, ok := fqp.tasks[taskID] - return ok -} - -func (fqp *fairQueuePolicy) Remove(taskID UniqueID) { - fqp.lock.Lock() - defer fqp.lock.Unlock() - - taskIndex := -1 - for i := range fqp.taskIDs { - if fqp.taskIDs[i] == taskID { - taskIndex = i + // Find and remove from heap + for i, task := range *pqp.heap { + if task.GetTaskID() == taskID { + heap.Remove(pqp.heap, i) break } } - if taskIndex != -1 { - fqp.taskIDs = append(fqp.taskIDs[:taskIndex], fqp.taskIDs[taskIndex+1:]...) - delete(fqp.tasks, taskID) - } -} - -func (fqp *fairQueuePolicy) Keys() []UniqueID { - fqp.lock.RLock() - defer fqp.lock.RUnlock() - - return fqp.taskIDs } diff --git a/internal/datacoord/task_scheduler.go b/internal/datacoord/task_scheduler.go index b1bcdebf80..9e492fd721 100644 --- a/internal/datacoord/task_scheduler.go +++ b/internal/datacoord/task_scheduler.go @@ -81,7 +81,7 @@ func newTaskScheduler( ctx: ctx, cancel: cancel, meta: metaTable, - pendingTasks: newFairQueuePolicy(), + pendingTasks: newPriorityQueuePolicy(), runningTasks: typeutil.NewConcurrentMap[UniqueID, Task](), notifyChan: make(chan struct{}, 1), taskLock: lock.NewKeyLock[int64](), diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go index 4406281510..2c51617a4c 100644 --- a/internal/datacoord/task_scheduler_test.go +++ b/internal/datacoord/task_scheduler_test.go @@ -2362,7 +2362,7 @@ func (s *taskSchedulerSuite) Test_zeroSegmentStats() { ctx: ctx, cancel: cancel, meta: mt, - pendingTasks: newFairQueuePolicy(), + pendingTasks: newPriorityQueuePolicy(), runningTasks: typeutil.NewConcurrentMap[UniqueID, Task](), notifyChan: make(chan struct{}, 1), taskLock: lock.NewKeyLock[int64](),
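
The patch replaces the FIFO fairQueuePolicy with a priorityQueuePolicy backed by container/heap, so Pop always returns the pending task with the smallest taskID. Below is a minimal standalone sketch (not part of the patch) of that container/heap pattern; demoTask and demoHeap are hypothetical stand-ins for the package's Task interface and the taskHeap type introduced above.

// Standalone sketch: min-heap ordered by task ID, mirroring the taskHeap idea in the patch.
package main

import (
	"container/heap"
	"fmt"
)

// demoTask is a minimal stand-in for a schedulable task identified by an int64 ID.
type demoTask struct {
	id int64
}

func (t demoTask) GetTaskID() int64 { return t.id }

// demoHeap is a min-heap of demoTask ordered by task ID.
type demoHeap []demoTask

func (h demoHeap) Len() int            { return len(h) }
func (h demoHeap) Less(i, j int) bool  { return h[i].GetTaskID() < h[j].GetTaskID() }
func (h demoHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *demoHeap) Push(x interface{}) { *h = append(*h, x.(demoTask)) }
func (h *demoHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[:n-1]
	return item
}

func main() {
	h := &demoHeap{}
	heap.Init(h)

	// Push tasks out of order; the heap keeps the smallest ID at the top.
	for _, id := range []int64{42, 7, 19, 3} {
		heap.Push(h, demoTask{id: id})
	}

	// Draining the heap yields tasks in ascending ID order: 3, 7, 19, 42.
	for h.Len() > 0 {
		t := heap.Pop(h).(demoTask)
		fmt.Println("next task:", t.GetTaskID())
	}
}

With this ordering, Push and Pop are O(log n), while Remove in the patched policy stays O(n) because it scans the heap slice for the matching taskID before calling heap.Remove.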