fix: Ensure task execution order by using a priority queue (#43271)

issue: #43260

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2025-07-22 17:42:53 +08:00 committed by GitHub
parent e26a532504
commit f19e0ef6e4
GPG Key ID: B5690EEEBB952194
5 changed files with 183 additions and 150 deletions
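
For context on the ordering fix, a minimal standalone sketch (not part of this commit; it uses bare int64 task IDs instead of the scheduler's Task interface, and the example IDs are made up) of why a min-heap keyed by taskID pops tasks in ascending ID order no matter how they were pushed, while a plain FIFO preserves push order:

package main

import (
	"container/heap"
	"fmt"
)

// idHeap is a min-heap of task IDs: the smallest ID is always at the root.
type idHeap []int64

func (h idHeap) Len() int            { return len(h) }
func (h idHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h idHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *idHeap) Push(x interface{}) { *h = append(*h, x.(int64)) }
func (h *idHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func main() {
	// Hypothetical task IDs pushed out of submission order.
	pushed := []int64{7, 3, 5}

	// A FIFO hands them back in push order: 7, 3, 5.
	fmt.Println("fifo order:", pushed)

	// A min-heap keyed by taskID hands them back in ID order: 3, 5, 7.
	h := &idHeap{}
	for _, id := range pushed {
		heap.Push(h, id)
	}
	ordered := make([]int64, 0, len(pushed))
	for h.Len() > 0 {
		ordered = append(ordered, heap.Pop(h).(int64))
	}
	fmt.Println("heap order:", ordered)
}

This is the same Push/Pop pattern as the heap added in priority_queue.go below, where Less compares Task.GetTaskID().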


@@ -43,7 +43,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
"github.com/milvus-io/milvus/pkg/v2/util/conc"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/lock"
@@ -1659,16 +1658,6 @@ func (m *meta) completeMixCompactionMutation(
log = log.With(zap.Int64s("compactFrom", compactFromSegIDs))
resultInvisible := false
targetSegmentLevel := datapb.SegmentLevel_L1
if t.GetType() == datapb.CompactionType_SortCompaction {
resultInvisible = compactFromSegInfos[0].GetIsInvisible()
targetSegmentLevel = compactFromSegInfos[0].GetLevel()
if !compactFromSegInfos[0].GetCreatedByCompaction() {
resultInvisible = false
}
}
compactToSegments := make([]*SegmentInfo, 0)
for _, compactToSegment := range result.GetSegments() {
compactToSegmentInfo := NewSegmentInfo(
@@ -1689,7 +1678,7 @@ func (m *meta) completeMixCompactionMutation(
CreatedByCompaction: true,
CompactionFrom: compactFromSegIDs,
LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0),
Level: targetSegmentLevel,
Level: datapb.SegmentLevel_L1,
StorageVersion: compactToSegment.GetStorageVersion(),
StartPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetStartPosition()
@@ -1697,8 +1686,7 @@ func (m *meta) completeMixCompactionMutation(
DmlPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetDmlPosition()
})),
IsSorted: compactToSegment.GetIsSorted(),
IsInvisible: resultInvisible,
IsSorted: compactToSegment.GetIsSorted(),
})
if compactToSegmentInfo.GetNumOfRows() == 0 {
@@ -1756,10 +1744,12 @@ func (m *meta) CompleteCompactionMutation(ctx context.Context, t *datapb.Compact
m.segMu.Lock()
defer m.segMu.Unlock()
switch t.GetType() {
case datapb.CompactionType_MixCompaction, datapb.CompactionType_SortCompaction:
case datapb.CompactionType_MixCompaction:
return m.completeMixCompactionMutation(t, result)
case datapb.CompactionType_ClusteringCompaction:
return m.completeClusterCompactionMutation(t, result)
case datapb.CompactionType_SortCompaction:
return m.completeSortCompactionMutation(t, result)
}
return nil, nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type")
}
@@ -2103,35 +2093,30 @@ func (m *meta) CleanPartitionStatsInfo(ctx context.Context, info *datapb.Partiti
return nil
}
func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.StatsResult) (*segMetricMutation, error) {
m.segMu.Lock()
defer m.segMu.Unlock()
log := log.Ctx(m.ctx).With(zap.Int64("collectionID", result.GetCollectionID()),
zap.Int64("partitionID", result.GetPartitionID()),
zap.Int64("old segmentID", oldSegmentID),
zap.Int64("target segmentID", result.GetSegmentID()))
func (m *meta) completeSortCompactionMutation(
t *datapb.CompactionTask,
result *datapb.CompactionPlanResult,
) ([]*SegmentInfo, *segMetricMutation, error) {
log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetPlanID()),
zap.String("type", t.GetType().String()),
zap.Int64("collectionID", t.CollectionID),
zap.Int64("partitionID", t.PartitionID),
zap.String("channel", t.GetChannel()))
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)}
oldSegment := m.segments.GetSegment(oldSegmentID)
compactFromSegID := t.GetInputSegments()[0]
oldSegment := m.segments.GetSegment(compactFromSegID)
if oldSegment == nil {
log.Warn("old segment is not found with stats task")
return nil, merr.WrapErrSegmentNotFound(oldSegmentID)
return nil, nil, merr.WrapErrSegmentNotFound(compactFromSegID)
}
cloned := oldSegment.Clone()
cloned.DroppedAt = uint64(time.Now().UnixNano())
cloned.Compacted = true
// metrics mutation for compaction from segments
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
resultInvisible := oldSegment.GetIsInvisible()
if !oldSegment.GetCreatedByCompaction() {
resultInvisible = false
}
resultSegment := result.GetSegments()[0]
segmentInfo := &datapb.SegmentInfo{
CollectionID: oldSegment.GetCollectionID(),
PartitionID: oldSegment.GetPartitionID(),
@@ -2149,17 +2134,17 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
LastPartitionStatsVersion: oldSegment.GetLastPartitionStatsVersion(),
CreatedByCompaction: oldSegment.GetCreatedByCompaction(),
IsInvisible: resultInvisible,
ID: result.GetSegmentID(),
NumOfRows: result.GetNumRows(),
Binlogs: result.GetInsertLogs(),
Statslogs: result.GetStatsLogs(),
TextStatsLogs: result.GetTextStatsLogs(),
Bm25Statslogs: result.GetBm25Logs(),
JsonKeyStats: result.GetJsonKeyStatsLogs(),
Deltalogs: nil,
CompactionFrom: []int64{oldSegmentID},
ID: resultSegment.GetSegmentID(),
NumOfRows: resultSegment.GetNumOfRows(),
Binlogs: resultSegment.GetInsertLogs(),
Statslogs: resultSegment.GetField2StatslogPaths(),
TextStatsLogs: resultSegment.GetTextStatsLogs(),
Bm25Statslogs: resultSegment.GetBm25Logs(),
Deltalogs: resultSegment.GetDeltalogs(),
CompactionFrom: []int64{compactFromSegID},
IsSorted: true,
}
segment := NewSegmentInfo(segmentInfo)
if segment.GetNumOfRows() > 0 {
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetIsSorted(), segment.GetNumOfRows())
@@ -2169,16 +2154,24 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
log.Info("drop segment due to 0 rows", zap.Int64("segmentID", segment.GetID()))
}
log.Info("meta update: prepare for complete stats mutation - complete", zap.Int64("num rows", result.GetNumRows()))
cloned := oldSegment.Clone()
cloned.DroppedAt = uint64(time.Now().UnixNano())
cloned.Compacted = true
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
log = log.With(zap.Int64s("compactFrom", []int64{oldSegment.GetID()}), zap.Int64("compactTo", segment.GetID()))
log.Info("meta update: prepare for complete stats mutation - complete", zap.Int64("num rows", segment.GetNumOfRows()))
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{cloned.SegmentInfo, segment.SegmentInfo}, metastore.BinlogsIncrement{Segment: segment.SegmentInfo}); err != nil {
log.Warn("fail to alter segments and new segment", zap.Error(err))
return nil, err
return nil, nil, err
}
m.segments.SetSegment(oldSegmentID, cloned)
m.segments.SetSegment(result.GetSegmentID(), segment)
return metricMutation, nil
m.segments.SetSegment(oldSegment.GetID(), cloned)
m.segments.SetSegment(segment.GetID(), segment)
log.Info("meta update: alter in memory meta after compaction - complete")
return []*SegmentInfo{segment}, metricMutation, nil
}
func (m *meta) getSegmentsMetrics(collectionID int64) []*metricsinfo.Segment {


@@ -1,94 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package task
import (
"sync"
)
type FIFOQueue interface {
Push(task Task)
Pop() Task
Get(taskID int64) Task
Remove(taskID int64)
TaskIDs() []int64
}
type fifoQueue struct {
lock sync.RWMutex
tasks map[int64]Task
taskIDs []int64
}
func NewFIFOQueue() FIFOQueue {
return &fifoQueue{
lock: sync.RWMutex{},
tasks: make(map[int64]Task, 0),
taskIDs: make([]int64, 0),
}
}
func (f *fifoQueue) Push(t Task) {
f.lock.Lock()
defer f.lock.Unlock()
if _, ok := f.tasks[t.GetTaskID()]; !ok {
f.tasks[t.GetTaskID()] = t
f.taskIDs = append(f.taskIDs, t.GetTaskID())
}
}
func (f *fifoQueue) Pop() Task {
f.lock.Lock()
defer f.lock.Unlock()
if len(f.taskIDs) == 0 {
return nil
}
taskID := f.taskIDs[0]
f.taskIDs = f.taskIDs[1:]
task := f.tasks[taskID]
delete(f.tasks, taskID)
return task
}
func (f *fifoQueue) Get(taskID int64) Task {
f.lock.RLock()
defer f.lock.RUnlock()
return f.tasks[taskID]
}
func (f *fifoQueue) Remove(taskID int64) {
f.lock.Lock()
defer f.lock.Unlock()
index := -1
for i := range f.taskIDs {
if f.taskIDs[i] == taskID {
index = i
break
}
}
if index != -1 {
f.taskIDs = append(f.taskIDs[:index], f.taskIDs[index+1:]...)
delete(f.tasks, taskID)
}
}
func (f *fifoQueue) TaskIDs() []int64 {
f.lock.RLock()
defer f.lock.RUnlock()
return f.taskIDs
}


@@ -51,7 +51,7 @@ type globalTaskScheduler struct {
wg sync.WaitGroup
mu *lock.KeyLock[int64]
pendingTasks FIFOQueue
pendingTasks PriorityQueue
runningTasks *typeutil.ConcurrentMap[int64, Task]
execPool *conc.Pool[struct{}]
checkPool *conc.Pool[struct{}]
@@ -324,7 +324,7 @@ func NewGlobalTaskScheduler(ctx context.Context, cluster session.Cluster) Global
cancel: cancel,
wg: sync.WaitGroup{},
mu: lock.NewKeyLock[int64](),
pendingTasks: NewFIFOQueue(),
pendingTasks: NewPriorityQueuePolicy(),
runningTasks: typeutil.NewConcurrentMap[int64, Task](),
execPool: execPool,
checkPool: checkPool,
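
Because the scheduler holds its pending queue behind the PriorityQueue interface, the policy change is confined to the field type and the constructor call shown above. A hypothetical helper in the same package (drainPending and run are illustrative names, not code from this commit) showing how the queue can be consumed through that interface, relying on Pop returning nil once the queue is empty:

// drainPending pops pending tasks until the queue is empty; with the
// priority-queue policy, tasks come out in ascending taskID order.
func drainPending(q PriorityQueue, run func(Task)) {
	for t := q.Pop(); t != nil; t = q.Pop() {
		run(t)
	}
}

With NewPriorityQueuePolicy plugged in, such a loop executes pending tasks by ascending taskID rather than by arrival order.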


@@ -0,0 +1,134 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package task
import (
"container/heap"
"sync"
)
// PriorityQueue is the policy of scheduler.
type PriorityQueue interface {
Push(task Task)
// Pop get the task next ready to run.
Pop() Task
Get(taskID int64) Task
Remove(taskID int64)
TaskIDs() []int64
}
var (
_ PriorityQueue = &priorityQueuePolicy{}
)
// priorityQueuePolicy implements a priority queue that sorts tasks by taskID (smaller taskID has higher priority)
type priorityQueuePolicy struct {
lock sync.RWMutex
tasks map[int64]Task
heap *taskHeap
}
// taskHeap implements a min-heap for Task objects, sorted by taskID
type taskHeap []Task
func (h taskHeap) Len() int { return len(h) }
func (h taskHeap) Less(i, j int) bool { return h[i].GetTaskID() < h[j].GetTaskID() }
func (h taskHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x interface{}) {
*h = append(*h, x.(Task))
}
func (h *taskHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
return item
}
// NewPriorityQueuePolicy creates a new priority queue policy
func NewPriorityQueuePolicy() *priorityQueuePolicy {
h := &taskHeap{}
heap.Init(h)
return &priorityQueuePolicy{
tasks: make(map[int64]Task),
heap: h,
lock: sync.RWMutex{},
}
}
func (pqp *priorityQueuePolicy) Push(task Task) {
pqp.lock.Lock()
defer pqp.lock.Unlock()
taskID := task.GetTaskID()
if _, exists := pqp.tasks[taskID]; !exists {
pqp.tasks[taskID] = task
heap.Push(pqp.heap, task)
}
}
func (pqp *priorityQueuePolicy) Pop() Task {
pqp.lock.Lock()
defer pqp.lock.Unlock()
if pqp.heap.Len() == 0 {
return nil
}
task := heap.Pop(pqp.heap).(Task)
delete(pqp.tasks, task.GetTaskID())
return task
}
func (pqp *priorityQueuePolicy) Get(taskID int64) Task {
pqp.lock.RLock()
defer pqp.lock.RUnlock()
return pqp.tasks[taskID]
}
func (pqp *priorityQueuePolicy) TaskIDs() []int64 {
pqp.lock.RLock()
defer pqp.lock.RUnlock()
taskIDs := make([]int64, 0, len(pqp.tasks))
for _, t := range *pqp.heap {
taskIDs = append(taskIDs, t.GetTaskID())
}
return taskIDs
}
func (pqp *priorityQueuePolicy) Remove(taskID int64) {
pqp.lock.Lock()
defer pqp.lock.Unlock()
if _, exists := pqp.tasks[taskID]; !exists {
return
}
delete(pqp.tasks, taskID)
// Find and remove from heap
for i, task := range *pqp.heap {
if task.GetTaskID() == taskID {
heap.Remove(pqp.heap, i)
break
}
}
}
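
Remove above does a linear scan of the heap slice to find the task's index, then lets container/heap restore the ordering invariant. A standalone illustration of that pattern (again with plain int64 IDs rather than the Task type; the values are made up):

package main

import (
	"container/heap"
	"fmt"
)

// ids is a minimal int64 min-heap, only here to exercise heap.Remove.
type ids []int64

func (h ids) Len() int            { return len(h) }
func (h ids) Less(i, j int) bool  { return h[i] < h[j] }
func (h ids) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *ids) Push(x interface{}) { *h = append(*h, x.(int64)) }
func (h *ids) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func main() {
	h := &ids{}
	for _, id := range []int64{4, 1, 3, 2} {
		heap.Push(h, id)
	}

	// Linear scan for the element to delete, then heap.Remove re-heapifies,
	// mirroring priorityQueuePolicy.Remove.
	for i, id := range *h {
		if id == 3 {
			heap.Remove(h, i)
			break
		}
	}

	// Remaining IDs still pop in ascending order: 1 2 4.
	for h.Len() > 0 {
		fmt.Print(heap.Pop(h).(int64), " ")
	}
	fmt.Println()
}

heap.Remove swaps the chosen element with the last one, shrinks the slice through Pop, and re-heapifies, so later Pops still come out in ascending ID order.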


@@ -7,7 +7,7 @@ import (
)
func TestFIFOQueue_Push(t *testing.T) {
queue := NewFIFOQueue()
queue := NewPriorityQueuePolicy()
// Test adding tasks
task1 := NewMockTask(t)
@@ -31,7 +31,7 @@ func TestFIFOQueue_Push(t *testing.T) {
}
func TestFIFOQueue_Pop(t *testing.T) {
queue := NewFIFOQueue()
queue := NewPriorityQueuePolicy()
// Test empty queue
assert.Nil(t, queue.Pop())
@@ -54,7 +54,7 @@ func TestFIFOQueue_Pop(t *testing.T) {
}
func TestFIFOQueue_Get(t *testing.T) {
queue := NewFIFOQueue()
queue := NewPriorityQueuePolicy()
// Test getting non-existent task
assert.Nil(t, queue.Get(1))
@@ -69,7 +69,7 @@ func TestFIFOQueue_Get(t *testing.T) {
}
func TestFIFOQueue_Remove(t *testing.T) {
queue := NewFIFOQueue()
queue := NewPriorityQueuePolicy()
// Test removing non-existent task
queue.Remove(1)
@@ -98,7 +98,7 @@ func TestFIFOQueue_Remove(t *testing.T) {
}
func TestFIFOQueue_TaskIDs(t *testing.T) {
queue := NewFIFOQueue()
queue := NewPriorityQueuePolicy()
// Test empty queue
assert.Equal(t, 0, len(queue.TaskIDs()))