diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 2fdf0a8023..83bc802718 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -46,7 +46,11 @@ func (mgr *TargetManager) RemoveCollection(collectionID int64) { mgr.rwmutex.Lock() defer mgr.rwmutex.Unlock() - log.Info("remove collection from targets") + mgr.removeCollection(collectionID) +} + +func (mgr *TargetManager) removeCollection(collectionID int64) { + log.Info("remove collection from targets", zap.Int64("collectionID", collectionID)) for _, segment := range mgr.segments { if segment.CollectionID == collectionID { mgr.removeSegment(segment.GetID()) @@ -83,7 +87,15 @@ func (mgr *TargetManager) RemoveSegment(segmentID int64) { func (mgr *TargetManager) removeSegment(segmentID int64) { delete(mgr.segments, segmentID) - log.Info("segment removed from targets", zap.Int64("segment", segmentID)) + log.Info("segment removed from targets", zap.Int64("segmentID", segmentID)) +} + +func (mgr *TargetManager) Replace(collectionID int64, channels []*DmChannel, segments []*datapb.SegmentInfo) { + mgr.rwmutex.Lock() + defer mgr.rwmutex.Unlock() + + mgr.addDmChannel(channels...) + mgr.addSegment(segments...) } // AddSegment adds segment into target set, @@ -147,6 +159,10 @@ func (mgr *TargetManager) AddDmChannel(channels ...*DmChannel) { mgr.rwmutex.Lock() defer mgr.rwmutex.Unlock() + mgr.addDmChannel(channels...) +} + +func (mgr *TargetManager) addDmChannel(channels ...*DmChannel) { for _, channel := range channels { ts := channel.GetSeekPosition().GetTimestamp() log.Info("add channel into targets", diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index 4d3edf2fe0..8aeaabe4b0 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -180,7 +180,6 @@ func (ob *CollectionObserver) refreshTargets(updatedAt time.Time, collectionID i ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - ob.targetMgr.RemoveCollection(collectionID) ob.handoffOb.Unregister(ctx, collectionID) if len(partitions) == 0 { @@ -193,12 +192,12 @@ func (ob *CollectionObserver) refreshTargets(updatedAt time.Time, collectionID i } ob.handoffOb.Register(collectionID) - utils.RegisterTargets(ctx, - ob.targetMgr, - ob.broker, - collectionID, - partitions, - ) + channels, segments, err := utils.FetchTargets(ctx, ob.targetMgr, ob.broker, collectionID, partitions) + if err != nil { + log.Warn("failed to fetch targets from DataCoord, will refresh targets later", zap.Error(err)) + return false + } + ob.targetMgr.Replace(collectionID, channels, segments) ob.refreshed[collectionID] = updatedAt return true diff --git a/internal/querycoordv2/utils/meta.go b/internal/querycoordv2/utils/meta.go index bec66e8d92..d4909a59d9 100644 --- a/internal/querycoordv2/utils/meta.go +++ b/internal/querycoordv2/utils/meta.go @@ -131,8 +131,27 @@ func SpawnReplicas(replicaMgr *meta.ReplicaManager, nodeMgr *session.NodeManager func RegisterTargets(ctx context.Context, targetMgr *meta.TargetManager, broker meta.Broker, - collection int64, partitions []int64) error { - dmChannels := make(map[string][]*datapb.VchannelInfo) + collection int64, + partitions []int64, +) error { + channels, segments, err := FetchTargets(ctx, targetMgr, broker, collection, partitions) + if err != nil { + return err + } + + targetMgr.AddDmChannel(channels...) + targetMgr.AddSegment(segments...) + return nil +} + +func FetchTargets(ctx context.Context, + targetMgr *meta.TargetManager, + broker meta.Broker, + collection int64, + partitions []int64, +) ([]*meta.DmChannel, []*datapb.SegmentInfo, error) { + channels := make(map[string][]*datapb.VchannelInfo) + segments := make([]*datapb.SegmentInfo, 0) for _, partitionID := range partitions { log.Debug("get recovery info...", @@ -140,12 +159,12 @@ func RegisterTargets(ctx context.Context, zap.Int64("partitionID", partitionID)) vChannelInfos, binlogs, err := broker.GetRecoveryInfo(ctx, collection, partitionID) if err != nil { - return err + return nil, nil, err } // Register segments for _, segmentBinlogs := range binlogs { - targetMgr.AddSegment(SegmentBinlogs2SegmentInfo( + segments = append(segments, SegmentBinlogs2SegmentInfo( collection, partitionID, segmentBinlogs)) @@ -153,13 +172,14 @@ func RegisterTargets(ctx context.Context, for _, info := range vChannelInfos { channelName := info.GetChannelName() - dmChannels[channelName] = append(dmChannels[channelName], info) + channels[channelName] = append(channels[channelName], info) } } // Merge and register channels - for _, channels := range dmChannels { + dmChannels := make([]*meta.DmChannel, 0, len(channels)) + for _, channels := range channels { dmChannel := MergeDmChannelInfo(channels) - targetMgr.AddDmChannel(dmChannel) + dmChannels = append(dmChannels, dmChannel) } - return nil + return dmChannels, segments, nil }