From ff9bdf702972adfa01bfee464ef1f72394f5a314 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Sat, 9 Nov 2024 07:48:26 +0800 Subject: [PATCH] fix: Fix load slowly (#37454) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When there are a lot of loaded collections, they would occupy the target observer scheduler’s pool. This prevents loading collections from updating the current target in time, slowing down the load process. This PR adds a separate target dispatcher for loading collections. issue: https://github.com/milvus-io/milvus/issues/37166 --------- Signed-off-by: bigsheeper --- internal/querycoordv2/meta/target_manager.go | 4 +-- .../querycoordv2/observers/target_observer.go | 28 +++++++++++++------ .../observers/target_observer_test.go | 7 +++-- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index fd2ef16489..a4217cbb93 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -159,7 +159,6 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error { partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 { return partition.PartitionID }) - allocatedTarget := NewCollectionTarget(nil, nil, partitionIDs) channelInfos := make(map[string][]*datapb.VchannelInfo) segments := make(map[int64]*datapb.SegmentInfo, 0) @@ -194,7 +193,8 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error { return nil } - mgr.next.updateCollectionTarget(collectionID, NewCollectionTarget(segments, dmChannels, partitionIDs)) + allocatedTarget := NewCollectionTarget(segments, dmChannels, partitionIDs) + mgr.next.updateCollectionTarget(collectionID, allocatedTarget) log.Debug("finish to update next targets for collection", zap.Int64("collectionID", collectionID), zap.Int64s("PartitionIDs", partitionIDs), diff --git 
a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 83bddf4836..d586341548 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -87,8 +87,12 @@ type TargetObserver struct { mut sync.Mutex // Guard readyNotifiers readyNotifiers map[int64][]chan struct{} // CollectionID -> Notifiers - dispatcher *taskDispatcher[int64] - keylocks *lock.KeyLock[int64] + // loadingDispatcher updates targets for collections that are loading (also collections without a current target). + loadingDispatcher *taskDispatcher[int64] + // loadedDispatcher updates targets for loaded collections. + loadedDispatcher *taskDispatcher[int64] + + keylocks *lock.KeyLock[int64] startOnce sync.Once stopOnce sync.Once @@ -114,8 +118,8 @@ func NewTargetObserver( keylocks: lock.NewKeyLock[int64](), } - dispatcher := newTaskDispatcher(result.check) - result.dispatcher = dispatcher + result.loadingDispatcher = newTaskDispatcher(result.check) + result.loadedDispatcher = newTaskDispatcher(result.check) return result } @@ -124,7 +128,8 @@ func (ob *TargetObserver) Start() { ctx, cancel := context.WithCancel(context.Background()) ob.cancel = cancel - ob.dispatcher.Start() + ob.loadingDispatcher.Start() + ob.loadedDispatcher.Start() ob.wg.Add(1) go func() { @@ -144,7 +149,8 @@ func (ob *TargetObserver) Stop() { } ob.wg.Wait() - ob.dispatcher.Stop() + ob.loadingDispatcher.Stop() + ob.loadedDispatcher.Stop() }) } @@ -167,7 +173,13 @@ func (ob *TargetObserver) schedule(ctx context.Context) { case <-ticker.C: ob.clean() - ob.dispatcher.AddTask(ob.meta.GetAll()...) + loaded := lo.FilterMap(ob.meta.GetAllCollections(), func(collection *meta.Collection, _ int) (int64, bool) { + if collection.GetStatus() == querypb.LoadStatus_Loaded { + return collection.GetCollectionID(), true + } + return 0, false + }) + ob.loadedDispatcher.AddTask(loaded...) 
case req := <-ob.updateChan: log.Info("manually trigger update target", @@ -217,7 +229,7 @@ func (ob *TargetObserver) schedule(ctx context.Context) { func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool { result := ob.targetMgr.IsCurrentTargetExist(collectionID, partitionID) if !result { - ob.dispatcher.AddTask(collectionID) + ob.loadingDispatcher.AddTask(collectionID) } return result } diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index 45d804b0f8..026626ef17 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -90,7 +90,9 @@ func (suite *TargetObserverSuite) SetupTest() { suite.collectionID = int64(1000) suite.partitionID = int64(100) - err = suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(suite.collectionID, 1)) + testCollection := utils.CreateTestCollection(suite.collectionID, 1) + testCollection.Status = querypb.LoadStatus_Loaded + err = suite.meta.CollectionManager.PutCollection(testCollection) suite.NoError(err) err = suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collectionID, suite.partitionID)) suite.NoError(err) @@ -302,7 +304,8 @@ func (suite *TargetObserverCheckSuite) SetupTest() { func (s *TargetObserverCheckSuite) TestCheck() { r := s.observer.Check(context.Background(), s.collectionID, common.AllPartitionsID) s.False(r) - s.True(s.observer.dispatcher.tasks.Contain(s.collectionID)) + s.False(s.observer.loadedDispatcher.tasks.Contain(s.collectionID)) + s.True(s.observer.loadingDispatcher.tasks.Contain(s.collectionID)) } func TestTargetObserver(t *testing.T) {