diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index e6772ede82..4267462bc5 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -36,6 +36,8 @@ type TargetScope = int32 const ( CurrentTarget TargetScope = iota + 1 NextTarget + CurrentTargetFirst + NextTargetFirst ) type TargetManager struct { @@ -316,12 +318,24 @@ func (mgr *TargetManager) removePartitionFromCollectionTarget(oldTarget *Collect return NewCollectionTarget(segments, channels) } -func (mgr *TargetManager) getTarget(scope TargetScope) *target { - if scope == CurrentTarget { - return mgr.current +func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID int64) *CollectionTarget { + switch scope { + case CurrentTarget: + return mgr.current.collectionTargetMap[collectionID] + case NextTarget: + return mgr.next.collectionTargetMap[collectionID] + case CurrentTargetFirst: + if current := mgr.current.collectionTargetMap[collectionID]; current != nil { + return current + } + return mgr.next.collectionTargetMap[collectionID] + case NextTargetFirst: + if next := mgr.next.collectionTargetMap[collectionID]; next != nil { + return next + } + return mgr.current.collectionTargetMap[collectionID] } - - return mgr.next + return nil } func (mgr *TargetManager) GetGrowingSegmentsByCollection(collectionID int64, @@ -330,8 +344,7 @@ func (mgr *TargetManager) GetGrowingSegmentsByCollection(collectionID int64, mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - targetMap := mgr.getTarget(scope) - collectionTarget := targetMap.getCollectionTarget(collectionID) + collectionTarget := mgr.getCollectionTarget(scope, collectionID) if collectionTarget == nil { return nil @@ -352,8 +365,7 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64, mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - targetMap := mgr.getTarget(scope) - collectionTarget := targetMap.getCollectionTarget(collectionID) + collectionTarget := mgr.getCollectionTarget(scope, collectionID) if collectionTarget == nil { return nil @@ -375,8 +387,7 @@ func (mgr *TargetManager) GetSealedSegmentsByCollection(collectionID int64, mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - targetMap := mgr.getTarget(scope) - collectionTarget := targetMap.getCollectionTarget(collectionID) + collectionTarget := mgr.getCollectionTarget(scope, collectionID) if collectionTarget == nil { return nil @@ -391,9 +402,7 @@ func (mgr *TargetManager) GetSealedSegmentsByChannel(collectionID int64, mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - targetMap := mgr.getTarget(scope) - collectionTarget := targetMap.getCollectionTarget(collectionID) - + collectionTarget := mgr.getCollectionTarget(scope, collectionID) if collectionTarget == nil { return nil } @@ -415,8 +424,7 @@ func (mgr *TargetManager) GetDroppedSegmentsByChannel(collectionID int64, mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - targetMap := mgr.getTarget(scope) - collectionTarget := targetMap.getCollectionTarget(collectionID) + collectionTarget := mgr.getCollectionTarget(scope, collectionID) if collectionTarget == nil { return nil @@ -436,8 +444,7 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(collectionID int64, mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - targetMap := mgr.getTarget(scope) - collectionTarget := targetMap.getCollectionTarget(collectionID) + collectionTarget := mgr.getCollectionTarget(scope, collectionID) if collectionTarget == nil { return nil @@ -457,8 +464,7 @@ func (mgr *TargetManager) GetDmChannelsByCollection(collectionID int64, scope Ta mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - targetMap := mgr.getTarget(scope) - collectionTarget := targetMap.getCollectionTarget(collectionID) + collectionTarget := mgr.getCollectionTarget(scope, collectionID) if collectionTarget == nil { return nil @@ -470,8 +476,7 @@ func (mgr *TargetManager) GetDmChannel(collectionID int64, channel string, scope mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - targetMap := mgr.getTarget(scope) - collectionTarget := targetMap.getCollectionTarget(collectionID) + collectionTarget := mgr.getCollectionTarget(scope, collectionID) if collectionTarget == nil { return nil @@ -482,8 +487,7 @@ func (mgr *TargetManager) GetDmChannel(collectionID int64, channel string, scope func (mgr *TargetManager) GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo { mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - targetMap := mgr.getTarget(scope) - collectionTarget := targetMap.getCollectionTarget(collectionID) + collectionTarget := mgr.getCollectionTarget(scope, collectionID) if collectionTarget == nil { return nil @@ -494,8 +498,7 @@ func (mgr *TargetManager) GetSealedSegment(collectionID int64, id int64, scope T func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64 { mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - targetMap := mgr.getTarget(scope) - collectionTarget := targetMap.getCollectionTarget(collectionID) + collectionTarget := mgr.getCollectionTarget(scope, collectionID) if collectionTarget == nil { return 0 diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index cdc1a8c147..503dee4ec7 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -415,6 +415,135 @@ func (suite *TargetManagerSuite) TestGetSegmentByChannel() { suite.Len(suite.mgr.GetDroppedSegmentsByChannel(collectionID, "channel-1", NextTarget), 3) } +func (suite *TargetManagerSuite) TestGetTarget() { + type testCase struct { + tag string + mgr *TargetManager + scope TargetScope + expectTarget *CollectionTarget + } + + current := &CollectionTarget{} + next := &CollectionTarget{} + + bothMgr := &TargetManager{ + current: &target{ + collectionTargetMap: map[int64]*CollectionTarget{ + 1000: current, + }, + }, + next: &target{ + collectionTargetMap: map[int64]*CollectionTarget{ + 1000: current, + }, + }, + } + currentMgr := &TargetManager{ + current: &target{ + collectionTargetMap: map[int64]*CollectionTarget{ + 1000: current, + }, + }, + next: &target{}, + } + nextMgr := &TargetManager{ + next: &target{ + collectionTargetMap: map[int64]*CollectionTarget{ + 1000: current, + }, + }, + current: &target{}, + } + + cases := []testCase{ + { + tag: "both_scope_unknown", + mgr: bothMgr, + scope: -1, + expectTarget: nil, + }, + { + tag: "both_scope_current", + mgr: bothMgr, + scope: CurrentTarget, + expectTarget: current, + }, + { + tag: "both_scope_next", + mgr: bothMgr, + scope: NextTarget, + expectTarget: next, + }, + { + tag: "both_scope_current_first", + mgr: bothMgr, + scope: CurrentTargetFirst, + expectTarget: current, + }, + { + tag: "both_scope_next_first", + mgr: bothMgr, + scope: NextTargetFirst, + expectTarget: next, + }, + { + tag: "next_scope_current", + mgr: nextMgr, + scope: CurrentTarget, + expectTarget: nil, + }, + { + tag: "next_scope_next", + mgr: nextMgr, + scope: NextTarget, + expectTarget: next, + }, + { + tag: "next_scope_current_first", + mgr: nextMgr, + scope: CurrentTargetFirst, + expectTarget: next, + }, + { + tag: "next_scope_next_first", + mgr: nextMgr, + scope: NextTargetFirst, + expectTarget: next, + }, + { + tag: "current_scope_current", + mgr: currentMgr, + scope: CurrentTarget, + expectTarget: current, + }, + { + tag: "current_scope_next", + mgr: currentMgr, + scope: NextTarget, + expectTarget: nil, + }, + { + tag: "current_scope_current_first", + mgr: currentMgr, + scope: CurrentTargetFirst, + expectTarget: current, + }, + { + tag: "current_scope_next_first", + mgr: currentMgr, + scope: NextTargetFirst, + expectTarget: current, + }, + } + + for _, tc := range cases { + suite.Run(tc.tag, func() { + target := tc.mgr.getCollectionTarget(tc.scope, 1000) + suite.Equal(tc.expectTarget, target) + }) + } +} + func TestTargetManager(t *testing.T) { suite.Run(t, new(TargetManagerSuite)) } diff --git a/internal/querycoordv2/observers/leader_observer.go b/internal/querycoordv2/observers/leader_observer.go index 713cfd0465..57d2d261be 100644 --- a/internal/querycoordv2/observers/leader_observer.go +++ b/internal/querycoordv2/observers/leader_observer.go @@ -152,26 +152,24 @@ func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dis continue } - channel := o.target.GetDmChannel(s.GetCollectionID(), s.GetInsertChannel(), meta.NextTarget) - if channel == nil { - channel = o.target.GetDmChannel(s.GetCollectionID(), s.GetInsertChannel(), meta.CurrentTarget) - } - loadInfo := utils.PackSegmentLoadInfo(resp, channel.GetSeekPosition(), nil) + if channel := o.target.GetDmChannel(s.GetCollectionID(), s.GetInsertChannel(), meta.NextTargetFirst); channel != nil { + loadInfo := utils.PackSegmentLoadInfo(resp, channel.GetSeekPosition(), nil) - log.Debug("leader observer append a segment to set", - zap.Int64("collectionID", leaderView.CollectionID), - zap.String("channel", leaderView.Channel), - zap.Int64("leaderViewID", leaderView.ID), - zap.Int64("segmentID", s.GetID()), - zap.Int64("nodeID", s.Node)) - ret = append(ret, &querypb.SyncAction{ - Type: querypb.SyncType_Set, - PartitionID: s.GetPartitionID(), - SegmentID: s.GetID(), - NodeID: s.Node, - Version: s.Version, - Info: loadInfo, - }) + log.Debug("leader observer append a segment to set", + zap.Int64("collectionID", leaderView.CollectionID), + zap.String("channel", leaderView.Channel), + zap.Int64("leaderViewID", leaderView.ID), + zap.Int64("segmentID", s.GetID()), + zap.Int64("nodeID", s.Node)) + ret = append(ret, &querypb.SyncAction{ + Type: querypb.SyncType_Set, + PartitionID: s.GetPartitionID(), + SegmentID: s.GetID(), + NodeID: s.Node, + Version: s.Version, + Info: loadInfo, + }) + } } } return ret diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 0c33fc4e5d..3788780a6d 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -176,12 +176,19 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { task.CollectionID(), partitions..., ) + + // get channel first, in case of target updated after segment info fetched + channel := ex.targetMgr.GetDmChannel(task.CollectionID(), task.shard, meta.NextTargetFirst) + if channel == nil { + return merr.WrapErrChannelNotAvailable(task.shard) + } resp, err := ex.broker.GetSegmentInfo(ctx, task.SegmentID()) if err != nil || len(resp.GetInfos()) == 0 { log.Warn("failed to get segment info from DataCoord", zap.Error(err)) return err } segment := resp.GetInfos()[0] + indexes, err := ex.broker.GetIndexInfo(ctx, task.CollectionID(), segment.GetID()) if err != nil { if !errors.Is(err, merr.ErrIndexNotFound) { @@ -191,11 +198,6 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { indexes = nil } - channel := ex.targetMgr.GetDmChannel(task.CollectionID(), segment.GetInsertChannel(), meta.NextTarget) - if channel == nil { - channel = ex.targetMgr.GetDmChannel(task.CollectionID(), segment.GetInsertChannel(), meta.CurrentTarget) - } - loadInfo := utils.PackSegmentLoadInfo(resp, channel.GetSeekPosition(), indexes) // Get collection index info