diff --git a/internal/querycoordv2/meta/segment_dist_manager.go b/internal/querycoordv2/meta/segment_dist_manager.go index f8c37054df..3cf01329c2 100644 --- a/internal/querycoordv2/meta/segment_dist_manager.go +++ b/internal/querycoordv2/meta/segment_dist_manager.go @@ -227,20 +227,3 @@ func (m *SegmentDistManager) GetByFilter(filters ...SegmentDistFilter) []*Segmen } return ret } - -// return node list which contains the given segmentID -func (m *SegmentDistManager) GetSegmentDist(segmentID int64) []int64 { - m.rwmutex.RLock() - defer m.rwmutex.RUnlock() - - ret := make([]int64, 0) - for nodeID, segments := range m.segments { - for _, segment := range segments.segments { - if segment.GetID() == segmentID { - ret = append(ret, nodeID) - break - } - } - } - return ret -} diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index 5f9dc7250a..54823a988f 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -111,11 +111,20 @@ func (action *SegmentAction) Scope() querypb.DataScope { func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool { if action.Type() == ActionTypeGrow { + // rpc finished + if !action.rpcReturned.Load() { + return false + } + + // segment found in leader view views := distMgr.LeaderViewManager.GetByFilter(meta.WithSegment2LeaderView(action.segmentID, false)) - nodeSegmentDist := distMgr.SegmentDistManager.GetSegmentDist(action.SegmentID()) - return len(views) > 0 && - lo.Contains(nodeSegmentDist, action.Node()) && - action.rpcReturned.Load() + if len(views) == 0 { + return false + } + + // segment found in dist + segmentInTargetNode := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()), meta.WithSegmentID(action.SegmentID())) + return len(segmentInTargetNode) > 0 } else if action.Type() == ActionTypeReduce { // FIXME: Now shard leader's segment view is a map of segment ID to node ID, // loading segment replaces the node ID with the new one, diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index fde3947cc6..6bec7532db 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -345,9 +345,11 @@ func (scheduler *taskScheduler) preAdd(task Task) error { if taskType == TaskTypeMove { views := scheduler.distMgr.LeaderViewManager.GetByFilter(meta.WithSegment2LeaderView(task.SegmentID(), false)) - nodeSegmentDist := scheduler.distMgr.SegmentDistManager.GetSegmentDist(task.SegmentID()) - if len(views) == 0 || - !lo.Contains(nodeSegmentDist, task.Actions()[1].Node()) { + if len(views) == 0 { + return merr.WrapErrServiceInternal("segment's delegator not found, stop balancing") + } + segmentInTargetNode := scheduler.distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(task.Actions()[1].Node()), meta.WithSegmentID(task.SegmentID())) + if len(segmentInTargetNode) == 0 { return merr.WrapErrServiceInternal("source segment released, stop balancing") } }