From 02945959d9ac62c7a533f07be8bcbc4fe201c069 Mon Sep 17 00:00:00 2001 From: wei liu Date: Wed, 19 Jun 2024 10:19:58 +0800 Subject: [PATCH] enhance: Avoid to iterate whole segment list for each task's process (#33943) when querycoord process segment task, it will try to iterate whole segment list to checke whether segment is loaded, which cost too much cpu if there has thousands of segments. Signed-off-by: Wei Liu --- .../querycoordv2/meta/segment_dist_manager.go | 17 ----------------- internal/querycoordv2/task/action.go | 17 +++++++++++++---- internal/querycoordv2/task/scheduler.go | 8 +++++--- 3 files changed, 18 insertions(+), 24 deletions(-) diff --git a/internal/querycoordv2/meta/segment_dist_manager.go b/internal/querycoordv2/meta/segment_dist_manager.go index f8c37054df..3cf01329c2 100644 --- a/internal/querycoordv2/meta/segment_dist_manager.go +++ b/internal/querycoordv2/meta/segment_dist_manager.go @@ -227,20 +227,3 @@ func (m *SegmentDistManager) GetByFilter(filters ...SegmentDistFilter) []*Segmen } return ret } - -// return node list which contains the given segmentID -func (m *SegmentDistManager) GetSegmentDist(segmentID int64) []int64 { - m.rwmutex.RLock() - defer m.rwmutex.RUnlock() - - ret := make([]int64, 0) - for nodeID, segments := range m.segments { - for _, segment := range segments.segments { - if segment.GetID() == segmentID { - ret = append(ret, nodeID) - break - } - } - } - return ret -} diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index 5f9dc7250a..54823a988f 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -111,11 +111,20 @@ func (action *SegmentAction) Scope() querypb.DataScope { func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool { if action.Type() == ActionTypeGrow { + // rpc finished + if !action.rpcReturned.Load() { + return false + } + + // segment found in leader view views := distMgr.LeaderViewManager.GetByFilter(meta.WithSegment2LeaderView(action.segmentID, false)) - nodeSegmentDist := distMgr.SegmentDistManager.GetSegmentDist(action.SegmentID()) - return len(views) > 0 && - lo.Contains(nodeSegmentDist, action.Node()) && - action.rpcReturned.Load() + if len(views) == 0 { + return false + } + + // segment found in dist + segmentInTargetNode := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()), meta.WithSegmentID(action.SegmentID())) + return len(segmentInTargetNode) > 0 } else if action.Type() == ActionTypeReduce { // FIXME: Now shard leader's segment view is a map of segment ID to node ID, // loading segment replaces the node ID with the new one, diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index fde3947cc6..6bec7532db 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -345,9 +345,11 @@ func (scheduler *taskScheduler) preAdd(task Task) error { if taskType == TaskTypeMove { views := scheduler.distMgr.LeaderViewManager.GetByFilter(meta.WithSegment2LeaderView(task.SegmentID(), false)) - nodeSegmentDist := scheduler.distMgr.SegmentDistManager.GetSegmentDist(task.SegmentID()) - if len(views) == 0 || - !lo.Contains(nodeSegmentDist, task.Actions()[1].Node()) { + if len(views) == 0 { + return merr.WrapErrServiceInternal("segment's delegator not found, stop balancing") + } + segmentInTargetNode := scheduler.distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(task.Actions()[1].Node()), meta.WithSegmentID(task.SegmentID())) + if len(segmentInTargetNode) == 0 { return merr.WrapErrServiceInternal("source segment released, stop balancing") } }