diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go index e2004657ca..3c7854d098 100644 --- a/internal/querycoordv2/dist/dist_handler.go +++ b/internal/querycoordv2/dist/dist_handler.go @@ -186,10 +186,19 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons } } + var version int64 + for _, channel := range resp.GetChannels() { + if channel.GetChannel() == lview.GetChannel() { + version = channel.GetVersion() + break + } + } + view := &meta.LeaderView{ ID: resp.GetNodeID(), CollectionID: lview.GetCollection(), Channel: lview.GetChannel(), + Version: version, Segments: lview.GetSegmentDist(), GrowingSegments: segments, } diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index f63cc4af1e..5b2c657f74 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -349,3 +349,31 @@ func errCode(err error) commonpb.ErrorCode { } return commonpb.ErrorCode_UnexpectedError } + +func filterDupLeaders(replicaManager *meta.ReplicaManager, leaders map[int64]*meta.LeaderView) map[int64]*meta.LeaderView { + type leaderID struct { + ReplicaID int64 + Shard string + } + + newLeaders := make(map[leaderID]*meta.LeaderView) + for _, view := range leaders { + replica := replicaManager.GetByCollectionAndNode(view.CollectionID, view.ID) + if replica == nil { + continue + } + + id := leaderID{replica.GetID(), view.Channel} + if old, ok := newLeaders[id]; ok && old.Version > view.Version { + continue + } + + newLeaders[id] = view + } + + result := make(map[int64]*meta.LeaderView) + for _, v := range newLeaders { + result[v.ID] = v + } + return result +} diff --git a/internal/querycoordv2/meta/leader_view_manager.go b/internal/querycoordv2/meta/leader_view_manager.go index a853610c84..5321800bc0 100644 --- a/internal/querycoordv2/meta/leader_view_manager.go +++ b/internal/querycoordv2/meta/leader_view_manager.go @@ -27,6 +27,7 @@ type LeaderView struct { ID int64 CollectionID int64 Channel string + Version int64 Segments map[int64]*querypb.SegmentDist GrowingSegments map[int64]*Segment } @@ -46,6 +47,7 @@ func (view *LeaderView) Clone() *LeaderView { ID: view.ID, CollectionID: view.CollectionID, Channel: view.Channel, + Version: view.Version, Segments: segments, GrowingSegments: growings, } diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 04295ae8bf..a489e7ddaf 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -889,6 +889,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade log := log.With(zap.String("channel", channel.GetChannelName())) leaders := s.dist.LeaderViewManager.GetLeadersByShard(channel.GetChannelName()) + leaders = filterDupLeaders(s.meta.ReplicaManager, leaders) ids := make([]int64, 0, len(leaders)) addrs := make([]string, 0, len(leaders))