diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 26095080d9..de44640443 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -43,7 +43,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb" - "github.com/milvus-io/milvus/pkg/v2/proto/workerpb" "github.com/milvus-io/milvus/pkg/v2/util/conc" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/lock" @@ -1659,16 +1658,6 @@ func (m *meta) completeMixCompactionMutation( log = log.With(zap.Int64s("compactFrom", compactFromSegIDs)) - resultInvisible := false - targetSegmentLevel := datapb.SegmentLevel_L1 - if t.GetType() == datapb.CompactionType_SortCompaction { - resultInvisible = compactFromSegInfos[0].GetIsInvisible() - targetSegmentLevel = compactFromSegInfos[0].GetLevel() - if !compactFromSegInfos[0].GetCreatedByCompaction() { - resultInvisible = false - } - } - compactToSegments := make([]*SegmentInfo, 0) for _, compactToSegment := range result.GetSegments() { compactToSegmentInfo := NewSegmentInfo( @@ -1689,7 +1678,7 @@ func (m *meta) completeMixCompactionMutation( CreatedByCompaction: true, CompactionFrom: compactFromSegIDs, LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0), - Level: targetSegmentLevel, + Level: datapb.SegmentLevel_L1, StorageVersion: compactToSegment.GetStorageVersion(), StartPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition { return info.GetStartPosition() @@ -1697,8 +1686,7 @@ func (m *meta) completeMixCompactionMutation( DmlPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition { return info.GetDmlPosition() })), - IsSorted: compactToSegment.GetIsSorted(), - IsInvisible: resultInvisible, + IsSorted: compactToSegment.GetIsSorted(), }) if compactToSegmentInfo.GetNumOfRows() == 0 { @@ -1756,10 +1744,12 @@ func (m *meta) CompleteCompactionMutation(ctx context.Context, t *datapb.Compact m.segMu.Lock() defer m.segMu.Unlock() switch t.GetType() { - case datapb.CompactionType_MixCompaction, datapb.CompactionType_SortCompaction: + case datapb.CompactionType_MixCompaction: return m.completeMixCompactionMutation(t, result) case datapb.CompactionType_ClusteringCompaction: return m.completeClusterCompactionMutation(t, result) + case datapb.CompactionType_SortCompaction: + return m.completeSortCompactionMutation(t, result) } return nil, nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type") } @@ -2103,35 +2093,30 @@ func (m *meta) CleanPartitionStatsInfo(ctx context.Context, info *datapb.Partiti return nil } -func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.StatsResult) (*segMetricMutation, error) { - m.segMu.Lock() - defer m.segMu.Unlock() - - log := log.Ctx(m.ctx).With(zap.Int64("collectionID", result.GetCollectionID()), - zap.Int64("partitionID", result.GetPartitionID()), - zap.Int64("old segmentID", oldSegmentID), - zap.Int64("target segmentID", result.GetSegmentID())) +func (m *meta) completeSortCompactionMutation( + t *datapb.CompactionTask, + result *datapb.CompactionPlanResult, +) ([]*SegmentInfo, *segMetricMutation, error) { + log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetPlanID()), + zap.String("type", t.GetType().String()), + zap.Int64("collectionID", t.CollectionID), + zap.Int64("partitionID", t.PartitionID), + zap.String("channel", t.GetChannel())) 
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)} - - oldSegment := m.segments.GetSegment(oldSegmentID) + compactFromSegID := t.GetInputSegments()[0] + oldSegment := m.segments.GetSegment(compactFromSegID) if oldSegment == nil { - log.Warn("old segment is not found with stats task") - return nil, merr.WrapErrSegmentNotFound(oldSegmentID) + return nil, nil, merr.WrapErrSegmentNotFound(compactFromSegID) } - cloned := oldSegment.Clone() - cloned.DroppedAt = uint64(time.Now().UnixNano()) - cloned.Compacted = true - - // metrics mutation for compaction from segments - updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation) - resultInvisible := oldSegment.GetIsInvisible() if !oldSegment.GetCreatedByCompaction() { resultInvisible = false } + resultSegment := result.GetSegments()[0] + segmentInfo := &datapb.SegmentInfo{ CollectionID: oldSegment.GetCollectionID(), PartitionID: oldSegment.GetPartitionID(), @@ -2149,17 +2134,17 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats LastPartitionStatsVersion: oldSegment.GetLastPartitionStatsVersion(), CreatedByCompaction: oldSegment.GetCreatedByCompaction(), IsInvisible: resultInvisible, - ID: result.GetSegmentID(), - NumOfRows: result.GetNumRows(), - Binlogs: result.GetInsertLogs(), - Statslogs: result.GetStatsLogs(), - TextStatsLogs: result.GetTextStatsLogs(), - Bm25Statslogs: result.GetBm25Logs(), - JsonKeyStats: result.GetJsonKeyStatsLogs(), - Deltalogs: nil, - CompactionFrom: []int64{oldSegmentID}, + ID: resultSegment.GetSegmentID(), + NumOfRows: resultSegment.GetNumOfRows(), + Binlogs: resultSegment.GetInsertLogs(), + Statslogs: resultSegment.GetField2StatslogPaths(), + TextStatsLogs: resultSegment.GetTextStatsLogs(), + Bm25Statslogs: resultSegment.GetBm25Logs(), + Deltalogs: resultSegment.GetDeltalogs(), + CompactionFrom: []int64{compactFromSegID}, IsSorted: true, } + segment := NewSegmentInfo(segmentInfo) if segment.GetNumOfRows() > 0 { metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetIsSorted(), segment.GetNumOfRows()) @@ -2169,16 +2154,24 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats log.Info("drop segment due to 0 rows", zap.Int64("segmentID", segment.GetID())) } - log.Info("meta update: prepare for complete stats mutation - complete", zap.Int64("num rows", result.GetNumRows())) + cloned := oldSegment.Clone() + cloned.DroppedAt = uint64(time.Now().UnixNano()) + cloned.Compacted = true + + updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation) + + log = log.With(zap.Int64s("compactFrom", []int64{oldSegment.GetID()}), zap.Int64("compactTo", segment.GetID())) + + log.Info("meta update: prepare for complete stats mutation - complete", zap.Int64("num rows", segment.GetNumOfRows())) if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{cloned.SegmentInfo, segment.SegmentInfo}, metastore.BinlogsIncrement{Segment: segment.SegmentInfo}); err != nil { log.Warn("fail to alter segments and new segment", zap.Error(err)) - return nil, err + return nil, nil, err } - m.segments.SetSegment(oldSegmentID, cloned) - m.segments.SetSegment(result.GetSegmentID(), segment) - - return metricMutation, nil + m.segments.SetSegment(oldSegment.GetID(), cloned) + m.segments.SetSegment(segment.GetID(), segment) + log.Info("meta update: alter in memory meta after compaction - complete") + return []*SegmentInfo{segment}, metricMutation, nil } func (m *meta) 
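A note on the new `completeSortCompactionMutation` above: it carries over the visibility rule that `SaveStatsResultSegment` used to apply inline. The sorted output inherits `IsInvisible` from its input segment only when that segment was itself created by a compaction; otherwise the result is forced visible. A minimal runnable sketch of just that rule, where `segmentMeta` is a hypothetical stand-in for `*SegmentInfo`:

```go
package main

import "fmt"

// segmentMeta is a hypothetical stand-in for datacoord's *SegmentInfo,
// reduced to the two fields the visibility decision reads.
type segmentMeta struct {
	IsInvisible         bool
	CreatedByCompaction bool
}

// sortResultInvisible mirrors the rule in completeSortCompactionMutation:
// only a compaction-created, still-invisible input keeps the sorted output
// invisible; a freshly flushed segment always yields a visible result.
func sortResultInvisible(in segmentMeta) bool {
	invisible := in.IsInvisible
	if !in.CreatedByCompaction {
		invisible = false
	}
	return invisible
}

func main() {
	fmt.Println(sortResultInvisible(segmentMeta{IsInvisible: true, CreatedByCompaction: true}))  // true
	fmt.Println(sortResultInvisible(segmentMeta{IsInvisible: true, CreatedByCompaction: false})) // false
}
```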
getSegmentsMetrics(collectionID int64) []*metricsinfo.Segment { diff --git a/internal/datacoord/task/fifo_queue.go b/internal/datacoord/task/fifo_queue.go deleted file mode 100644 index 01e0d518dc..0000000000 --- a/internal/datacoord/task/fifo_queue.go +++ /dev/null @@ -1,94 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package task - -import ( - "sync" -) - -type FIFOQueue interface { - Push(task Task) - Pop() Task - Get(taskID int64) Task - Remove(taskID int64) - TaskIDs() []int64 -} - -type fifoQueue struct { - lock sync.RWMutex - tasks map[int64]Task - taskIDs []int64 -} - -func NewFIFOQueue() FIFOQueue { - return &fifoQueue{ - lock: sync.RWMutex{}, - tasks: make(map[int64]Task, 0), - taskIDs: make([]int64, 0), - } -} - -func (f *fifoQueue) Push(t Task) { - f.lock.Lock() - defer f.lock.Unlock() - if _, ok := f.tasks[t.GetTaskID()]; !ok { - f.tasks[t.GetTaskID()] = t - f.taskIDs = append(f.taskIDs, t.GetTaskID()) - } -} - -func (f *fifoQueue) Pop() Task { - f.lock.Lock() - defer f.lock.Unlock() - if len(f.taskIDs) == 0 { - return nil - } - taskID := f.taskIDs[0] - f.taskIDs = f.taskIDs[1:] - task := f.tasks[taskID] - delete(f.tasks, taskID) - return task -} - -func (f *fifoQueue) Get(taskID int64) Task { - f.lock.RLock() - defer f.lock.RUnlock() - return f.tasks[taskID] -} - -func (f *fifoQueue) Remove(taskID int64) { - f.lock.Lock() - defer f.lock.Unlock() - - index := -1 - for i := range f.taskIDs { - if f.taskIDs[i] == taskID { - index = i - break - } - } - if index != -1 { - f.taskIDs = append(f.taskIDs[:index], f.taskIDs[index+1:]...) 
- delete(f.tasks, taskID) - } -} - -func (f *fifoQueue) TaskIDs() []int64 { - f.lock.RLock() - defer f.lock.RUnlock() - return f.taskIDs -} diff --git a/internal/datacoord/task/global_scheduler.go b/internal/datacoord/task/global_scheduler.go index dd9bd3dd74..97ae9caf09 100644 --- a/internal/datacoord/task/global_scheduler.go +++ b/internal/datacoord/task/global_scheduler.go @@ -51,7 +51,7 @@ type globalTaskScheduler struct { wg sync.WaitGroup mu *lock.KeyLock[int64] - pendingTasks FIFOQueue + pendingTasks PriorityQueue runningTasks *typeutil.ConcurrentMap[int64, Task] execPool *conc.Pool[struct{}] checkPool *conc.Pool[struct{}] @@ -324,7 +324,7 @@ func NewGlobalTaskScheduler(ctx context.Context, cluster session.Cluster) Global cancel: cancel, wg: sync.WaitGroup{}, mu: lock.NewKeyLock[int64](), - pendingTasks: NewFIFOQueue(), + pendingTasks: NewPriorityQueuePolicy(), runningTasks: typeutil.NewConcurrentMap[int64, Task](), execPool: execPool, checkPool: checkPool, diff --git a/internal/datacoord/task/priority_queue.go b/internal/datacoord/task/priority_queue.go new file mode 100644 index 0000000000..ce31f2fe80 --- /dev/null +++ b/internal/datacoord/task/priority_queue.go @@ -0,0 +1,134 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "container/heap" + "sync" +) + +// PriorityQueue is the policy of scheduler. +type PriorityQueue interface { + Push(task Task) + // Pop get the task next ready to run. 
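The scheduler change above is a drop-in policy swap: `pendingTasks` keeps the same five-method surface (`Push`, `Pop`, `Get`, `Remove`, `TaskIDs`); only the `Pop` ordering moves from insertion order to ascending taskID. Assuming taskIDs are allocated monotonically, the two orders agree in the steady state and diverge only when a task re-enters the queue out of ID order, for example after a retry. A toy demonstration of the new ordering, using bare IDs instead of real `Task` values:

```go
package main

import (
	"container/heap"
	"fmt"
)

// idHeap is a toy min-heap over bare task IDs; the real queue stores Task
// values but imposes exactly this order.
type idHeap []int64

func (h idHeap) Len() int            { return len(h) }
func (h idHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h idHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *idHeap) Push(x interface{}) { *h = append(*h, x.(int64)) }
func (h *idHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func main() {
	// Task 3 arrives after 7 and 9, e.g. re-queued following a worker
	// failure. A FIFO would run it last; the min-heap restores ID order.
	h := &idHeap{}
	for _, id := range []int64{7, 9, 3} {
		heap.Push(h, id)
	}
	for h.Len() > 0 {
		fmt.Print(heap.Pop(h), " ") // 3 7 9
	}
	fmt.Println()
}
```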
+ Pop() Task + Get(taskID int64) Task + Remove(taskID int64) + TaskIDs() []int64 +} + +var ( + _ PriorityQueue = &priorityQueuePolicy{} +) + +// priorityQueuePolicy implements a priority queue that sorts tasks by taskID (smaller taskID has higher priority) +type priorityQueuePolicy struct { + lock sync.RWMutex + tasks map[int64]Task + heap *taskHeap +} + +// taskHeap implements a min-heap for Task objects, sorted by taskID +type taskHeap []Task + +func (h taskHeap) Len() int { return len(h) } +func (h taskHeap) Less(i, j int) bool { return h[i].GetTaskID() < h[j].GetTaskID() } +func (h taskHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *taskHeap) Push(x interface{}) { + *h = append(*h, x.(Task)) +} + +func (h *taskHeap) Pop() interface{} { + old := *h + n := len(old) + item := old[n-1] + *h = old[0 : n-1] + return item +} + +// NewPriorityQueuePolicy creates a new priority queue policy +func NewPriorityQueuePolicy() *priorityQueuePolicy { + h := &taskHeap{} + heap.Init(h) + return &priorityQueuePolicy{ + tasks: make(map[int64]Task), + heap: h, + lock: sync.RWMutex{}, + } +} + +func (pqp *priorityQueuePolicy) Push(task Task) { + pqp.lock.Lock() + defer pqp.lock.Unlock() + + taskID := task.GetTaskID() + if _, exists := pqp.tasks[taskID]; !exists { + pqp.tasks[taskID] = task + heap.Push(pqp.heap, task) + } +} + +func (pqp *priorityQueuePolicy) Pop() Task { + pqp.lock.Lock() + defer pqp.lock.Unlock() + + if pqp.heap.Len() == 0 { + return nil + } + + task := heap.Pop(pqp.heap).(Task) + delete(pqp.tasks, task.GetTaskID()) + return task +} + +func (pqp *priorityQueuePolicy) Get(taskID int64) Task { + pqp.lock.RLock() + defer pqp.lock.RUnlock() + + return pqp.tasks[taskID] +} + +func (pqp *priorityQueuePolicy) TaskIDs() []int64 { + pqp.lock.RLock() + defer pqp.lock.RUnlock() + + taskIDs := make([]int64, 0, len(pqp.tasks)) + for _, t := range *pqp.heap { + taskIDs = append(taskIDs, t.GetTaskID()) + } + return taskIDs +} + +func (pqp *priorityQueuePolicy) Remove(taskID int64) { + pqp.lock.Lock() + defer pqp.lock.Unlock() + + if _, exists := pqp.tasks[taskID]; !exists { + return + } + + delete(pqp.tasks, taskID) + + // Find and remove from heap + for i, task := range *pqp.heap { + if task.GetTaskID() == taskID { + heap.Remove(pqp.heap, i) + break + } + } +} diff --git a/internal/datacoord/task/fifo_queue_test.go b/internal/datacoord/task/priority_queue_test.go similarity index 93% rename from internal/datacoord/task/fifo_queue_test.go rename to internal/datacoord/task/priority_queue_test.go index 84950aaf41..a472f7b62b 100644 --- a/internal/datacoord/task/fifo_queue_test.go +++ b/internal/datacoord/task/priority_queue_test.go @@ -7,7 +7,7 @@ import ( ) func TestFIFOQueue_Push(t *testing.T) { - queue := NewFIFOQueue() + queue := NewPriorityQueuePolicy() // Test adding tasks task1 := NewMockTask(t) @@ -31,7 +31,7 @@ func TestFIFOQueue_Push(t *testing.T) { } func TestFIFOQueue_Pop(t *testing.T) { - queue := NewFIFOQueue() + queue := NewPriorityQueuePolicy() // Test empty queue assert.Nil(t, queue.Pop()) @@ -54,7 +54,7 @@ func TestFIFOQueue_Pop(t *testing.T) { } func TestFIFOQueue_Get(t *testing.T) { - queue := NewFIFOQueue() + queue := NewPriorityQueuePolicy() // Test getting non-existent task assert.Nil(t, queue.Get(1)) @@ -69,7 +69,7 @@ func TestFIFOQueue_Get(t *testing.T) { } func TestFIFOQueue_Remove(t *testing.T) { - queue := NewFIFOQueue() + queue := NewPriorityQueuePolicy() // Test removing non-existent task queue.Remove(1) @@ -98,7 +98,7 @@ func TestFIFOQueue_Remove(t *testing.T) { 
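One behavioral difference worth flagging in the implementation above: the deleted FIFO's `TaskIDs()` returned insertion order, while the new one ranges over the heap's backing slice, which is only heap-ordered (each parent no larger than its children), not fully sorted; just the head is guaranteed smallest. Callers that want an ordered snapshot must sort it themselves. A small illustration of heap order versus sorted order:

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// ids wraps sort.IntSlice so heap.Interface needs only Push and Pop.
type ids struct{ sort.IntSlice }

func (h *ids) Push(x interface{}) { h.IntSlice = append(h.IntSlice, x.(int)) }
func (h *ids) Pop() interface{} {
	old := h.IntSlice
	x := old[len(old)-1]
	h.IntSlice = old[:len(old)-1]
	return x
}

func main() {
	h := &ids{}
	for _, id := range []int{5, 1, 4, 2, 3} {
		heap.Push(h, id)
	}
	// Heap order, not sorted order: only h.IntSlice[0] is guaranteed smallest.
	fmt.Println(h.IntSlice) // [1 2 4 5 3]
}
```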
} func TestFIFOQueue_TaskIDs(t *testing.T) { - queue := NewFIFOQueue() + queue := NewPriorityQueuePolicy() // Test empty queue assert.Equal(t, 0, len(queue.TaskIDs()))
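The renamed tests keep their FIFO-era names and, in the hunks shown, push tasks in ascending ID order, so they cannot distinguish the old policy from the new one. A test along these lines would pin down the new ordering guarantee; it is a hypothetical sketch assuming the package's mockery-generated `MockTask` exposes the usual `EXPECT()` chain:

```go
func TestPriorityQueue_PopOrdersByTaskID(t *testing.T) {
	queue := NewPriorityQueuePolicy()

	// Push in deliberately non-monotonic ID order.
	for _, id := range []int64{3, 1, 2} {
		task := NewMockTask(t)
		task.EXPECT().GetTaskID().Return(id)
		queue.Push(task)
	}

	// Pop must yield ascending taskIDs, not insertion order.
	for _, want := range []int64{1, 2, 3} {
		popped := queue.Pop()
		assert.NotNil(t, popped)
		assert.Equal(t, want, popped.GetTaskID())
	}
	assert.Nil(t, queue.Pop())
}
```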