From 9b2b2a2689b716dcaa1ec201ae3cf6f7ac108e18 Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Fri, 3 Jan 2025 11:18:52 +0800
Subject: [PATCH] enhance: [10kcp] Remove scheduler and target manager mutex (#38968)

supplement to PR https://github.com/milvus-io/milvus/pull/38566

Signed-off-by: bigsheeper
---
 internal/querycoordv2/meta/target.go          |  18 +-
 internal/querycoordv2/meta/target_manager.go  |  63 +-----
 .../querycoordv2/meta/target_manager_test.go  |  35 +--
 internal/querycoordv2/task/scheduler.go       | 203 +++++++++---------
 internal/querycoordv2/task/task_test.go       |  13 +-
 pkg/metrics/querycoord_metrics.go             |   1 +
 6 files changed, 152 insertions(+), 181 deletions(-)

diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go
index c69139ae7f..3d7a511c08 100644
--- a/internal/querycoordv2/meta/target.go
+++ b/internal/querycoordv2/meta/target.go
@@ -25,6 +25,7 @@ import (
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/proto/querypb"
     "github.com/milvus-io/milvus/pkg/log"
+    "github.com/milvus-io/milvus/pkg/util/lock"
     "github.com/milvus-io/milvus/pkg/util/typeutil"
 )

@@ -218,28 +219,33 @@ func (p *CollectionTarget) Ready() bool {
 }

 type target struct {
+    keyLock *lock.KeyLock[int64] // guards updateCollectionTarget
     // just maintain target at collection level
-    collectionTargetMap map[int64]*CollectionTarget
+    collectionTargetMap *typeutil.ConcurrentMap[int64, *CollectionTarget]
 }

 func newTarget() *target {
     return &target{
-        collectionTargetMap: make(map[int64]*CollectionTarget),
+        keyLock:             lock.NewKeyLock[int64](),
+        collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
     }
 }

 func (t *target) updateCollectionTarget(collectionID int64, target *CollectionTarget) {
-    if t.collectionTargetMap[collectionID] != nil && target.GetTargetVersion() <= t.collectionTargetMap[collectionID].GetTargetVersion() {
+    t.keyLock.Lock(collectionID)
+    defer t.keyLock.Unlock(collectionID)
+    if old, ok := t.collectionTargetMap.Get(collectionID); ok && old != nil && target.GetTargetVersion() <= old.GetTargetVersion() {
         return
     }

-    t.collectionTargetMap[collectionID] = target
+    t.collectionTargetMap.Insert(collectionID, target)
 }

 func (t *target) removeCollectionTarget(collectionID int64) {
-    delete(t.collectionTargetMap, collectionID)
+    t.collectionTargetMap.Remove(collectionID)
 }

 func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget {
-    return t.collectionTargetMap[collectionID]
+    ret, _ := t.collectionTargetMap.Get(collectionID)
+    return ret
 }

diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go
index 062a8758b2..114ba36b08 100644
--- a/internal/querycoordv2/meta/target_manager.go
+++ b/internal/querycoordv2/meta/target_manager.go
@@ -72,9 +72,8 @@ type TargetManagerInterface interface {
 }

 type TargetManager struct {
-    rwMutex sync.RWMutex
-    broker  Broker
-    meta    *Meta
+    broker Broker
+    meta   *Meta

     // all read segment/channel operation happens on current -> only current target are visible to outer
     // all add segment/channel operation happens on next -> changes can only happen on next target
@@ -96,8 +95,6 @@ func NewTargetManager(broker Broker, meta *Meta) *TargetManager {
 // WARN: DO NOT call this method for an existing collection as target observer running, or it will lead to a double-update,
 // which may make the current target not available
 func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool {
-    mgr.rwMutex.Lock()
-    defer mgr.rwMutex.Unlock()
     log := log.With(zap.Int64("collectionID", collectionID))
     log.Debug("start to update current target for collection")
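Note: the target.go hunks above replace the single mutex-guarded map with a lock-free concurrent map plus a per-collection key lock, so updates to different collections no longer contend while the target-version check stays atomic per key. The sketch below reproduces that check-then-swap shape with only the Go standard library; it is illustrative only and assumes lock.KeyLock / typeutil.ConcurrentMap provide the per-key Lock/Unlock and Get/Insert semantics used in the hunk above (a production KeyLock would also release idle per-key entries).

// Illustrative stand-alone sketch, not Milvus code.
package main

import (
	"fmt"
	"sync"
)

type snapshot struct {
	version int64 // stands in for CollectionTarget.GetTargetVersion()
}

type store struct {
	keyLocks sync.Map // id -> *sync.Mutex, lazily created per key
	targets  sync.Map // id -> *snapshot
}

func (s *store) lockKey(id int64) *sync.Mutex {
	m, _ := s.keyLocks.LoadOrStore(id, &sync.Mutex{})
	return m.(*sync.Mutex)
}

// update keeps only monotonically increasing versions, like updateCollectionTarget:
// the per-key lock makes the read-compare-write atomic for one collection while
// leaving every other collection unaffected.
func (s *store) update(id int64, next *snapshot) {
	mu := s.lockKey(id)
	mu.Lock()
	defer mu.Unlock()

	if old, ok := s.targets.Load(id); ok && next.version <= old.(*snapshot).version {
		return // stale update, drop it
	}
	s.targets.Store(id, next)
}

// get is lock-free: readers never touch the key lock.
func (s *store) get(id int64) *snapshot {
	v, ok := s.targets.Load(id)
	if !ok {
		return nil
	}
	return v.(*snapshot)
}

func main() {
	s := &store{}
	s.update(1000, &snapshot{version: 2})
	s.update(1000, &snapshot{version: 1}) // ignored: older version
	fmt.Println(s.get(1000).version)      // prints 2
}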
@@ -192,9 +189,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
     allocatedTarget := NewCollectionTarget(segments, dmChannels, partitionIDs)

-    mgr.rwMutex.Lock()
     mgr.next.updateCollectionTarget(collectionID, allocatedTarget)
-    mgr.rwMutex.Unlock()

     log.Debug("finish to update next targets for collection",
         zap.Int64("collectionID", collectionID),
@@ -225,8 +220,6 @@ func mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel {

 // RemoveCollection removes all channels and segments in the given collection
 func (mgr *TargetManager) RemoveCollection(collectionID int64) {
-    mgr.rwMutex.Lock()
-    defer mgr.rwMutex.Unlock()
     log.Info("remove collection from targets", zap.Int64("collectionID", collectionID))

@@ -247,9 +240,6 @@ func (mgr *TargetManager) RemoveCollection(collectionID int64) {
 // RemovePartition removes all segment in the given partition,
 // NOTE: this doesn't remove any channel even the given one is the only partition
 func (mgr *TargetManager) RemovePartition(collectionID int64, partitionIDs ...int64) {
-    mgr.rwMutex.Lock()
-    defer mgr.rwMutex.Unlock()
-
     log := log.With(zap.Int64("collectionID", collectionID),
         zap.Int64s("PartitionIDs", partitionIDs))

@@ -356,9 +346,6 @@ func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID in
 func (mgr *TargetManager) GetGrowingSegmentsByCollection(collectionID int64,
     scope TargetScope,
 ) typeutil.UniqueSet {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)

     for _, t := range targets {
@@ -379,9 +366,6 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64,
     channelName string,
     scope TargetScope,
 ) typeutil.UniqueSet {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)
     for _, t := range targets {
         segments := typeutil.NewUniqueSet()
@@ -402,9 +386,6 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64,
 func (mgr *TargetManager) GetSealedSegmentsByCollection(collectionID int64,
     scope TargetScope,
 ) map[int64]*datapb.SegmentInfo {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)

     for _, t := range targets {
@@ -418,9 +399,6 @@ func (mgr *TargetManager) GetSealedSegmentsByChannel(collectionID int64,
     channelName string,
     scope TargetScope,
 ) map[int64]*datapb.SegmentInfo {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)
     for _, t := range targets {
         ret := lo.KeyBy(t.GetChannelSegments(channelName), func(s *datapb.SegmentInfo) int64 {
@@ -439,9 +417,6 @@ func (mgr *TargetManager) GetDroppedSegmentsByChannel(collectionID int64,
     channelName string,
     scope TargetScope,
 ) []int64 {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)
     for _, t := range targets {
         if channel, ok := t.dmChannels[channelName]; ok {
@@ -456,9 +431,6 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(collectionID int64,
     partitionID int64,
     scope TargetScope,
 ) map[int64]*datapb.SegmentInfo {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)
     for _, t := range targets {
         segments := make(map[int64]*datapb.SegmentInfo)
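Note: all of the read paths above drop mgr.rwMutex because each read now reduces to a single ConcurrentMap.Get, which hands back either the previous or the new *CollectionTarget as a whole; this appears to rely on CollectionTarget values being replaced wholesale (as updateCollectionTarget does) rather than mutated in place. A minimal stdlib illustration of that publish/read pattern, not Milvus code:

package main

import (
	"fmt"
	"sync/atomic"
)

type collectionTarget struct { // hypothetical stand-in for *CollectionTarget
	version  int64
	segments []int64
}

func main() {
	var current atomic.Pointer[collectionTarget]
	current.Store(&collectionTarget{version: 1, segments: []int64{1, 2}})

	// Writer: build a fresh snapshot and swap the pointer; never mutate in place.
	current.Store(&collectionTarget{version: 2, segments: []int64{1, 2, 3}})

	// Reader: one atomic load yields a complete snapshot, old or new, never a mix,
	// so no reader-side lock is needed.
	t := current.Load()
	fmt.Println(t.version, len(t.segments))
}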
@@ -475,9 +447,6 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(collectionID int64,
 }

 func (mgr *TargetManager) GetDmChannelsByCollection(collectionID int64, scope TargetScope) map[string]*DmChannel {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)

     for _, t := range targets {
@@ -488,9 +457,6 @@ func (mgr *TargetManager) GetDmChannelsByCollection(collectionID int64, scope Ta
 }

 func (mgr *TargetManager) GetDmChannel(collectionID int64, channel string, scope TargetScope) *DmChannel {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)
     for _, t := range targets {
         if ch, ok := t.GetAllDmChannels()[channel]; ok {
@@ -501,9 +467,6 @@ func (mgr *TargetManager) GetDmChannel(collectionID int64, channel string, scope
 }

 func (mgr *TargetManager) GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)
     for _, t := range targets {
         if s, ok := t.GetAllSegments()[id]; ok {
@@ -515,9 +478,6 @@ func (mgr *TargetManager) GetSealedSegment(collectionID int64, id int64, scope T
 }

 func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64 {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)
     for _, t := range targets {
         if t.GetTargetVersion() > 0 {
@@ -529,9 +489,6 @@ func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope T
 }

 func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64, partitionID int64) bool {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(CurrentTarget, collectionID)

     return len(targets) > 0 && (targets[0].partitions.Contain(partitionID) || partitionID == common.AllPartitionsID) && len(targets[0].dmChannels) > 0
@@ -544,8 +501,6 @@ func (mgr *TargetManager) IsNextTargetExist(collectionID int64) bool {
 }

 func (mgr *TargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog) {
-    mgr.rwMutex.Lock()
-    defer mgr.rwMutex.Unlock()
     if mgr.current != nil {
         // use pool here to control maximal writer used by save target
         pool := conc.NewPool[any](runtime.GOMAXPROCS(0) * 2)
@@ -569,13 +524,14 @@ func (mgr *TargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
             })
         }
         tasks := make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
-        for id, target := range mgr.current.collectionTargetMap {
+        mgr.current.collectionTargetMap.Range(func(id int64, target *CollectionTarget) bool {
             tasks = append(tasks, typeutil.NewPair(id, target.toPbMsg()))
             if len(tasks) >= batchSize {
                 submit(tasks)
                 tasks = make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
             }
-        }
+            return true
+        })
         if len(tasks) > 0 {
             submit(tasks)
         }
@@ -584,9 +540,6 @@ func (mgr *TargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
 }

 func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
-    mgr.rwMutex.Lock()
-    defer mgr.rwMutex.Unlock()
-
     targets, err := catalog.GetCollectionTargets()
     if err != nil {
         log.Warn("failed to recover collection target from etcd", zap.Error(err))
@@ -615,8 +568,6 @@ func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
 // if segment isn't l0 segment, and exist in current/next target, then it can be moved
 func (mgr *TargetManager) CanSegmentBeMoved(collectionID, segmentID int64) bool {
-    mgr.rwMutex.Lock()
-    defer mgr.rwMutex.Unlock()
     current := mgr.current.getCollectionTarget(collectionID)
     if current != nil && current.segments[segmentID] != nil && current.segments[segmentID].GetLevel() != datapb.SegmentLevel_L0 {
         return true
     }
@@ -631,9 +582,7 @@ func (mgr *TargetManager) CanSegmentBeMoved(collectionID, segmentID int64) bool
 }

 func (mgr *TargetManager) IsCurrentTargetReady(collectionID int64) bool {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-    target, ok := mgr.current.collectionTargetMap[collectionID]
+    target, ok := mgr.current.collectionTargetMap.Get(collectionID)
     if !ok {
         return false
     }
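Note: SaveCurrentTarget above converts a `for ... range` over the plain map into ConcurrentMap.Range with a callback; returning true keeps iteration going, and the batching/flush logic simply moves inside the callback with a tail flush afterwards. A stripped-down version of that shape, using sync.Map purely for illustration:

package main

import (
	"fmt"
	"sync"
)

func main() {
	const batchSize = 3
	var m sync.Map
	for i := 0; i < 8; i++ {
		m.Store(int64(i), fmt.Sprintf("target-%d", i))
	}

	submit := func(b []string) { fmt.Println("submit", b) }
	batch := make([]string, 0, batchSize)

	m.Range(func(_, v any) bool {
		batch = append(batch, v.(string))
		if len(batch) >= batchSize {
			submit(batch)
			batch = make([]string, 0, batchSize)
		}
		return true // keep iterating, like the callback in the diff
	})
	if len(batch) > 0 {
		submit(batch) // flush the tail, mirroring the code above
	}
}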
diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go
index c76c78f5a8..fc6cfacf10 100644
--- a/internal/querycoordv2/meta/target_manager_test.go
+++ b/internal/querycoordv2/meta/target_manager_test.go
@@ -412,33 +412,38 @@ func (suite *TargetManagerSuite) TestGetTarget() {
     current := &CollectionTarget{}
     next := &CollectionTarget{}

+    t1 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+    t2 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+    t3 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+    t4 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+    t1.Insert(1000, current)
+    t2.Insert(1000, next)
+    t3.Insert(1000, current)
+    t4.Insert(1000, current)
+
     bothMgr := &TargetManager{
         current: &target{
-            collectionTargetMap: map[int64]*CollectionTarget{
-                1000: current,
-            },
+            collectionTargetMap: t1,
         },
         next: &target{
-            collectionTargetMap: map[int64]*CollectionTarget{
-                1000: next,
-            },
+            collectionTargetMap: t2,
         },
     }
     currentMgr := &TargetManager{
         current: &target{
-            collectionTargetMap: map[int64]*CollectionTarget{
-                1000: current,
-            },
+            collectionTargetMap: t3,
+        },
+        next: &target{
+            collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
         },
-        next: &target{},
     }
     nextMgr := &TargetManager{
         next: &target{
-            collectionTargetMap: map[int64]*CollectionTarget{
-                1000: current,
-            },
+            collectionTargetMap: t4,
+        },
+        current: &target{
+            collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
         },
-        current: &target{},
     }

     cases := []testCase{
@@ -647,7 +652,7 @@ func BenchmarkTargetManager(b *testing.B) {

     collectionNum := 10000
     for i := 0; i < collectionNum; i++ {
-        mgr.current.collectionTargetMap[int64(i)] = NewCollectionTarget(segments, channels, nil)
+        mgr.current.collectionTargetMap.Insert(int64(i), NewCollectionTarget(segments, channels, nil))
     }
     b.ResetTimer()
     for i := 0; i < b.N; i++ {
diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index f095b82ec5..7552a516b3 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -36,6 +36,7 @@ import (
     "github.com/milvus-io/milvus/pkg/metrics"
     "github.com/milvus-io/milvus/pkg/util/funcutil"
     "github.com/milvus-io/milvus/pkg/util/hardware"
+    "github.com/milvus-io/milvus/pkg/util/lock"
     "github.com/milvus-io/milvus/pkg/util/merr"
     "github.com/milvus-io/milvus/pkg/util/timerecord"
     . "github.com/milvus-io/milvus/pkg/util/typeutil"
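Note: the test fixtures above now build explicit ConcurrentMap instances (t1-t4, plus empty maps for the unused side) instead of map literals or a zero-valued `&target{}`. There is no literal syntax for a concurrent map, and, presumably, a nil *ConcurrentMap would panic on Get, whereas reading a nil plain Go map is legal. A toy demonstration of that difference (the wrapper type here is hypothetical, not the Milvus type):

package main

import "fmt"

type concMap struct{ m map[int64]string } // hypothetical stand-in for a pointer-based concurrent map

func (c *concMap) Get(k int64) (string, bool) {
	v, ok := c.m[k] // dereferences the receiver
	return v, ok
}

func main() {
	var plain map[int64]string
	v := plain[1] // nil map read: zero value, no panic
	fmt.Printf("nil map read ok, got %q\n", v)

	defer func() { fmt.Println("recovered:", recover()) }()
	var cm *concMap
	cm.Get(1) // nil pointer receiver: panics, hence the explicit constructors in the test
}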
"github.com/milvus-io/milvus/pkg/util/typeutil" @@ -90,6 +91,7 @@ type replicaChannelIndex struct { } type taskQueue struct { + mu sync.RWMutex // TaskPriority -> TaskID -> Task buckets []map[int64]Task } @@ -105,6 +107,8 @@ func newTaskQueue() *taskQueue { } func (queue *taskQueue) Len() int { + queue.mu.RLock() + defer queue.mu.RUnlock() taskNum := 0 for _, tasks := range queue.buckets { taskNum += len(tasks) @@ -114,17 +118,23 @@ func (queue *taskQueue) Len() int { } func (queue *taskQueue) Add(task Task) { + queue.mu.Lock() + defer queue.mu.Unlock() bucket := queue.buckets[task.Priority()] bucket[task.ID()] = task } func (queue *taskQueue) Remove(task Task) { + queue.mu.Lock() + defer queue.mu.Unlock() bucket := queue.buckets[task.Priority()] delete(bucket, task.ID()) } // Range iterates all tasks in the queue ordered by priority from high to low func (queue *taskQueue) Range(fn func(task Task) bool) { + queue.mu.RLock() + defer queue.mu.RUnlock() for priority := len(queue.buckets) - 1; priority >= 0; priority-- { for _, task := range queue.buckets[priority] { if !fn(task) { @@ -151,9 +161,8 @@ type Scheduler interface { } type taskScheduler struct { - rwmutex sync.RWMutex ctx context.Context - executors map[int64]*Executor // NodeID -> Executor + executors *ConcurrentMap[int64, *Executor] // NodeID -> Executor idAllocator func() UniqueID distMgr *meta.DistributionManager @@ -163,9 +172,10 @@ type taskScheduler struct { cluster session.Cluster nodeMgr *session.NodeManager - tasks UniqueSet - segmentTasks map[replicaSegmentIndex]Task - channelTasks map[replicaChannelIndex]Task + collKeyLock *lock.KeyLock[int64] // guards Add() + tasks *ConcurrentMap[UniqueID, struct{}] + segmentTasks *ConcurrentMap[replicaSegmentIndex, Task] + channelTasks *ConcurrentMap[replicaChannelIndex, Task] processQueue *taskQueue waitQueue *taskQueue @@ -183,7 +193,7 @@ func NewScheduler(ctx context.Context, id := time.Now().UnixMilli() return &taskScheduler{ ctx: ctx, - executors: make(map[int64]*Executor), + executors: NewConcurrentMap[int64, *Executor](), idAllocator: func() UniqueID { id++ return id @@ -196,9 +206,10 @@ func NewScheduler(ctx context.Context, cluster: cluster, nodeMgr: nodeMgr, - tasks: make(UniqueSet), - segmentTasks: make(map[replicaSegmentIndex]Task), - channelTasks: make(map[replicaChannelIndex]Task), + collKeyLock: lock.NewKeyLock[int64](), + tasks: NewConcurrentMap[UniqueID, struct{}](), + segmentTasks: NewConcurrentMap[replicaSegmentIndex, Task](), + channelTasks: NewConcurrentMap[replicaChannelIndex, Task](), processQueue: newTaskQueue(), waitQueue: newTaskQueue(), } @@ -207,30 +218,22 @@ func NewScheduler(ctx context.Context, func (scheduler *taskScheduler) Start() {} func (scheduler *taskScheduler) Stop() { - scheduler.rwmutex.Lock() - defer scheduler.rwmutex.Unlock() - - for nodeID, executor := range scheduler.executors { + scheduler.executors.Range(func(nodeID int64, executor *Executor) bool { executor.Stop() - delete(scheduler.executors, nodeID) - } + return true + }) - for _, task := range scheduler.segmentTasks { + scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool { scheduler.remove(task) - } - for _, task := range scheduler.channelTasks { + return true + }) + scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool { scheduler.remove(task) - } + return true + }) } func (scheduler *taskScheduler) AddExecutor(nodeID int64) { - scheduler.rwmutex.Lock() - defer scheduler.rwmutex.Unlock() - - if _, exist := scheduler.executors[nodeID]; exist { 
@@ -238,27 +241,24 @@ func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
         scheduler.cluster,
         scheduler.nodeMgr)

-    scheduler.executors[nodeID] = executor
+    if _, exist := scheduler.executors.GetOrInsert(nodeID, executor); exist {
+        return
+    }
     executor.Start(scheduler.ctx)
     log.Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID))
 }

 func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) {
-    scheduler.rwmutex.Lock()
-    defer scheduler.rwmutex.Unlock()
-
-    executor, ok := scheduler.executors[nodeID]
+    executor, ok := scheduler.executors.GetAndRemove(nodeID)
     if ok {
         executor.Stop()
-        delete(scheduler.executors, nodeID)
-        log.Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
+        log.Ctx(scheduler.ctx).Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
     }
 }

 func (scheduler *taskScheduler) Add(task Task) error {
-    scheduler.rwmutex.Lock()
-    defer scheduler.rwmutex.Unlock()
-
+    scheduler.collKeyLock.Lock(task.CollectionID())
+    defer scheduler.collKeyLock.Unlock(task.CollectionID())
     err := scheduler.preAdd(task)
     if err != nil {
         task.Cancel(err)
@@ -267,19 +267,19 @@ func (scheduler *taskScheduler) Add(task Task) error {

     task.SetID(scheduler.idAllocator())
     scheduler.waitQueue.Add(task)
-    scheduler.tasks.Insert(task.ID())
+    scheduler.tasks.Insert(task.ID(), struct{}{})
     switch task := task.(type) {
     case *SegmentTask:
         index := NewReplicaSegmentIndex(task)
-        scheduler.segmentTasks[index] = task
+        scheduler.segmentTasks.Insert(index, task)

     case *ChannelTask:
         index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
-        scheduler.channelTasks[index] = task
+        scheduler.channelTasks.Insert(index, task)

     case *LeaderTask:
         index := NewReplicaLeaderIndex(task)
-        scheduler.segmentTasks[index] = task
+        scheduler.segmentTasks.Insert(index, task)
     }

     log.Ctx(task.Context()).Info("task added", zap.String("task", task.String()))
@@ -288,24 +288,42 @@ func (scheduler *taskScheduler) Add(task Task) error {
 }

 func (scheduler *taskScheduler) updateTaskMetrics() {
-    if time.Since(scheduler.lastUpdateMetricTime) < 30*time.Second {
+    if time.Since(scheduler.lastUpdateMetricTime) < 60*time.Second {
         return
     }
-    segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0
+    segmentGrowNum, segmentReduceNum, segmentUpdateNum, segmentMoveNum := 0, 0, 0, 0
+    leaderGrowNum, leaderReduceNum, leaderUpdateNum := 0, 0, 0
     channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0
-    for _, task := range scheduler.segmentTasks {
-        taskType := GetTaskType(task)
-        switch taskType {
-        case TaskTypeGrow:
-            segmentGrowNum++
-        case TaskTypeReduce:
-            segmentReduceNum++
-        case TaskTypeMove:
+    scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
+        switch {
+        case len(task.Actions()) > 1:
             segmentMoveNum++
+        case task.Actions()[0].Type() == ActionTypeGrow:
+            if _, ok := task.Actions()[0].(*SegmentAction); ok {
+                segmentGrowNum++
+            }
+            if _, ok := task.Actions()[0].(*LeaderAction); ok {
+                leaderGrowNum++
+            }
+        case task.Actions()[0].Type() == ActionTypeReduce:
+            if _, ok := task.Actions()[0].(*SegmentAction); ok {
+                segmentReduceNum++
+            }
+            if _, ok := task.Actions()[0].(*LeaderAction); ok {
+                leaderReduceNum++
+            }
+        case task.Actions()[0].Type() == ActionTypeUpdate:
+            if _, ok := task.Actions()[0].(*SegmentAction); ok {
+                segmentUpdateNum++
+            }
+            if _, ok := task.Actions()[0].(*LeaderAction); ok {
+                leaderUpdateNum++
+            }
         }
-    }
+        return true
+    })

-    for _, task := range scheduler.channelTasks {
+    scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
         taskType := GetTaskType(task)
         switch taskType {
         case TaskTypeGrow:
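Note: AddExecutor above now builds the executor first and lets a single GetOrInsert decide whether this call won the registration; only the winner calls Start, so neither a lock nor a separate existence check is needed. Likewise, Add() is serialized per collection by collKeyLock so the preAdd duplicate check and the insert stay atomic for that collection. The stdlib analogue of GetOrInsert is sync.Map.LoadOrStore, shown below; typeutil.ConcurrentMap.GetOrInsert is assumed to behave the same way (it reports whether a value already existed).

package main

import (
	"fmt"
	"sync"
)

type executor struct{ node int64 }

func (e *executor) Start() { fmt.Println("started executor for node", e.node) }

func main() {
	var executors sync.Map
	addExecutor := func(node int64) {
		e := &executor{node: node}
		if _, loaded := executors.LoadOrStore(node, e); loaded {
			return // someone else already registered this node; don't start twice
		}
		e.Start()
	}

	addExecutor(7)
	addExecutor(7) // second call is a no-op: Start runs exactly once per node
}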
@@ -315,11 +333,18 @@ func (scheduler *taskScheduler) updateTaskMetrics() {
         case TaskTypeMove:
             channelMoveNum++
         }
-    }
+        return true
+    })

     metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentGrowTaskLabel).Set(float64(segmentGrowNum))
     metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentReduceTaskLabel).Set(float64(segmentReduceNum))
     metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentMoveTaskLabel).Set(float64(segmentMoveNum))
+    metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentUpdateTaskLabel).Set(float64(segmentUpdateNum))
+
+    metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderGrowTaskLabel).Set(float64(leaderGrowNum))
+    metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderReduceTaskLabel).Set(float64(leaderReduceNum))
+    metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderUpdateTaskLabel).Set(float64(leaderUpdateNum))
+
     metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelGrowTaskLabel).Set(float64(channelGrowNum))
     metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelReduceTaskLabel).Set(float64(channelReduceNum))
     metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelMoveTaskLabel).Set(float64(channelMoveNum))
@@ -332,7 +357,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
     switch task := task.(type) {
     case *SegmentTask:
         index := NewReplicaSegmentIndex(task)
-        if old, ok := scheduler.segmentTasks[index]; ok {
+        if old, ok := scheduler.segmentTasks.Get(index); ok {
             if task.Priority() > old.Priority() {
                 log.Info("replace old task, the new one with higher priority",
                     zap.Int64("oldID", old.ID()),
@@ -365,7 +390,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {

     case *ChannelTask:
         index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
-        if old, ok := scheduler.channelTasks[index]; ok {
+        if old, ok := scheduler.channelTasks.Get(index); ok {
             if task.Priority() > old.Priority() {
                 log.Info("replace old task, the new one with higher priority",
                     zap.Int64("oldID", old.ID()),
@@ -398,7 +423,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
         }
     case *LeaderTask:
         index := NewReplicaLeaderIndex(task)
-        if old, ok := scheduler.segmentTasks[index]; ok {
+        if old, ok := scheduler.segmentTasks.Get(index); ok {
             if task.Priority() > old.Priority() {
                 log.Info("replace old task, the new one with higher priority",
                     zap.Int64("oldID", old.ID()),
@@ -477,46 +502,40 @@ func (scheduler *taskScheduler) Dispatch(node int64) {
         log.Info("scheduler stopped")

     default:
-        scheduler.rwmutex.Lock()
-        defer scheduler.rwmutex.Unlock()
         scheduler.schedule(node)
     }
 }

 func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64) int {
-    scheduler.rwmutex.RLock()
-    defer scheduler.rwmutex.RUnlock()
-
     targetActions := make([]Action, 0)
-    for _, t := range scheduler.segmentTasks {
+    scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, t Task) bool {
         if collectionID != -1 && collectionID != t.CollectionID() {
-            continue
+            return true
         }
         for _, action := range t.Actions() {
             if action.Node() == nodeID {
                 targetActions = append(targetActions, action)
             }
         }
-    }
+        return true
+    })

     return scheduler.calculateTaskDelta(collectionID, targetActions)
 }

 func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) int {
-    scheduler.rwmutex.RLock()
-    defer scheduler.rwmutex.RUnlock()
-
     targetActions := make([]Action, 0)
-    for _, t := range scheduler.channelTasks {
+    scheduler.channelTasks.Range(func(_ replicaChannelIndex, t Task) bool {
         if collectionID != -1 && collectionID != t.CollectionID() {
-            continue
+            return true
         }
         for _, action := range t.Actions() {
             if action.Node() == nodeID {
                 targetActions = append(targetActions, action)
             }
         }
-    }
+        return true
+    })

     return scheduler.calculateTaskDelta(collectionID, targetActions)
 }
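Note: GetSegmentTaskDelta and GetChannelTaskDelta above rewrite `for ... range` loops as Range callbacks. The one behavioral detail worth calling out is the translation of loop control: `continue` becomes `return true` (skip this entry, keep iterating) and a `break` would become `return false` (stop iterating). A tiny stdlib illustration with sync.Map.Range:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var m sync.Map
	for i := 1; i <= 5; i++ {
		m.Store(i, i*10)
	}

	sum := 0
	m.Range(func(k, v any) bool {
		if k.(int)%2 == 0 {
			return true // "continue": skip this entry, keep ranging
		}
		sum += v.(int)
		if sum > 1000 {
			return false // "break": stop ranging early
		}
		return true
	})
	fmt.Println(sum) // 10 + 30 + 50 = 90 (iteration order is unspecified)
}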
@@ -548,10 +567,7 @@ func (scheduler *taskScheduler) calculateTaskDelta(collectionID int64, targetAct
 }

 func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} {
-    scheduler.rwmutex.RLock()
-    defer scheduler.rwmutex.RUnlock()
-
-    executor, ok := scheduler.executors[nodeID]
+    executor, ok := scheduler.executors.Get(nodeID)
     if !ok {
         return nil
     }
@@ -560,17 +576,11 @@ func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} {
 }

 func (scheduler *taskScheduler) GetChannelTaskNum() int {
-    scheduler.rwmutex.RLock()
-    defer scheduler.rwmutex.RUnlock()
-
-    return len(scheduler.channelTasks)
+    return scheduler.channelTasks.Len()
 }

 func (scheduler *taskScheduler) GetSegmentTaskNum() int {
-    scheduler.rwmutex.RLock()
-    defer scheduler.rwmutex.RUnlock()
-
-    return len(scheduler.segmentTasks)
+    return scheduler.segmentTasks.Len()
 }

 // schedule selects some tasks to execute, follow these steps for each started selected tasks:
@@ -593,8 +603,8 @@ func (scheduler *taskScheduler) schedule(node int64) {
     log.Debug("process tasks related to node",
         zap.Int("processingTaskNum", scheduler.processQueue.Len()),
         zap.Int("waitingTaskNum", scheduler.waitQueue.Len()),
-        zap.Int("segmentTaskNum", len(scheduler.segmentTasks)),
-        zap.Int("channelTaskNum", len(scheduler.channelTasks)),
+        zap.Int("segmentTaskNum", scheduler.segmentTasks.Len()),
+        zap.Int("channelTaskNum", scheduler.channelTasks.Len()),
     )

     // Process tasks
@@ -642,8 +652,8 @@ func (scheduler *taskScheduler) schedule(node int64) {
     log.Info("process tasks related to node done",
         zap.Int("processingTaskNum", scheduler.processQueue.Len()),
         zap.Int("waitingTaskNum", scheduler.waitQueue.Len()),
-        zap.Int("segmentTaskNum", len(scheduler.segmentTasks)),
-        zap.Int("channelTaskNum", len(scheduler.channelTasks)),
+        zap.Int("segmentTaskNum", scheduler.segmentTasks.Len()),
+        zap.Int("channelTaskNum", scheduler.channelTasks.Len()),
     )
 }

@@ -739,7 +749,7 @@ func (scheduler *taskScheduler) process(task Task) bool {
     )

     actions, step := task.Actions(), task.Step()
-    executor, ok := scheduler.executors[actions[step].Node()]
+    executor, ok := scheduler.executors.Get(actions[step].Node())
     if !ok {
         log.Warn("no executor for QueryNode",
             zap.Int("step", step),
@@ -760,19 +770,18 @@ func (scheduler *taskScheduler) check(task Task) error {
 }

 func (scheduler *taskScheduler) RemoveByNode(node int64) {
-    scheduler.rwmutex.Lock()
-    defer scheduler.rwmutex.Unlock()
-
-    for _, task := range scheduler.segmentTasks {
+    scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
         if scheduler.isRelated(task, node) {
             scheduler.remove(task)
         }
-    }
-    for _, task := range scheduler.channelTasks {
+        return true
+    })
+    scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
         if scheduler.isRelated(task, node) {
             scheduler.remove(task)
         }
-    }
+        return true
+    })
 }

 func (scheduler *taskScheduler) recordSegmentTaskError(task *SegmentTask) {
@@ -808,7 +817,7 @@ func (scheduler *taskScheduler) remove(task Task) {
     switch task := task.(type) {
     case *SegmentTask:
         index := NewReplicaSegmentIndex(task)
-        delete(scheduler.segmentTasks, index)
+        scheduler.segmentTasks.Remove(index)
         log = log.With(zap.Int64("segmentID", task.SegmentID()))

         if task.Status() == TaskStatusFailed &&
             task.Err() != nil &&
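Note: Stop and RemoveByNode above call scheduler.remove(task) from inside a Range callback, which deletes entries from the very map being iterated. This relies on the concurrent map tolerating mutation during Range, the guarantee sync.Map documents; typeutil.ConcurrentMap is assumed to provide the same behavior. A small stdlib demonstration:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var tasks sync.Map
	for i := 0; i < 4; i++ {
		tasks.Store(i, fmt.Sprintf("task-%d", i))
	}

	tasks.Range(func(k, _ any) bool {
		tasks.Delete(k) // safe: sync.Map.Range allows the callback to call other methods, including Delete
		return true
	})

	remaining := 0
	tasks.Range(func(_, _ any) bool { remaining++; return true })
	fmt.Println("remaining:", remaining) // 0
}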
@@ -818,12 +827,12 @@ func (scheduler *taskScheduler) remove(task Task) {

     case *ChannelTask:
         index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
-        delete(scheduler.channelTasks, index)
+        scheduler.channelTasks.Remove(index)
         log = log.With(zap.String("channel", task.Channel()))

     case *LeaderTask:
         index := NewReplicaLeaderIndex(task)
-        delete(scheduler.segmentTasks, index)
+        scheduler.segmentTasks.Remove(index)
         log = log.With(zap.Int64("segmentID", task.SegmentID()))
     }

diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go
index 26301dd566..1c28923131 100644
--- a/internal/querycoordv2/task/task_test.go
+++ b/internal/querycoordv2/task/task_test.go
@@ -1475,10 +1475,10 @@ func (suite *TaskSuite) AssertTaskNum(process, wait, channel, segment int) {

     suite.Equal(process, scheduler.processQueue.Len())
     suite.Equal(wait, scheduler.waitQueue.Len())
-    suite.Len(scheduler.segmentTasks, segment)
-    suite.Len(scheduler.channelTasks, channel)
-    suite.Equal(len(scheduler.tasks), process+wait)
-    suite.Equal(len(scheduler.tasks), segment+channel)
+    suite.Equal(scheduler.segmentTasks.Len(), segment)
+    suite.Equal(scheduler.channelTasks.Len(), channel)
+    suite.Equal(scheduler.tasks.Len(), process+wait)
+    suite.Equal(scheduler.tasks.Len(), segment+channel)
 }

 func (suite *TaskSuite) dispatchAndWait(node int64) {
@@ -1490,13 +1490,14 @@ func (suite *TaskSuite) dispatchAndWait(node int64) {
         count = 0
         keys = make([]any, 0)

-        for _, executor := range suite.scheduler.executors {
+        suite.scheduler.executors.Range(func(_ int64, executor *Executor) bool {
             executor.executingTasks.Range(func(taskIndex string) bool {
                 keys = append(keys, taskIndex)
                 count++
                 return true
             })
-        }
+            return true
+        })

         if count == 0 {
             return
diff --git a/pkg/metrics/querycoord_metrics.go b/pkg/metrics/querycoord_metrics.go
index b8a1301a09..34e589d149 100644
--- a/pkg/metrics/querycoord_metrics.go
+++ b/pkg/metrics/querycoord_metrics.go
@@ -36,6 +36,7 @@ const (

     LeaderGrowTaskLabel   = "leader_grow"
     LeaderReduceTaskLabel = "leader_reduce"
+    LeaderUpdateTaskLabel = "leader_update"

     UnknownTaskLabel = "unknown"