diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 2a052478fa..e87c2e83fe 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -881,9 +881,8 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade log := log.With(zap.String("channel", channel.GetChannelName())) leaders := s.dist.LeaderViewManager.GetLeadersByShard(channel.GetChannelName()) - leaders = filterDupLeaders(s.meta.ReplicaManager, leaders) - ids := make([]int64, 0, len(leaders)) - addrs := make([]string, 0, len(leaders)) + + readableLeaders := make(map[int64]*meta.LeaderView) var channelErr error if len(leaders) == 0 { @@ -944,11 +943,10 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade continue } - ids = append(ids, info.ID()) - addrs = append(addrs, info.Addr()) + readableLeaders[leader.ID] = leader } - if len(ids) == 0 { + if len(readableLeaders) == 0 { msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName()) log.Warn(msg, zap.Error(channelErr)) resp.Status = merr.Status( @@ -957,6 +955,15 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade return resp, nil } + readableLeaders = filterDupLeaders(s.meta.ReplicaManager, readableLeaders) + ids := make([]int64, 0, len(leaders)) + addrs := make([]string, 0, len(leaders)) + for _, leader := range readableLeaders { + info := s.nodeMgr.Get(leader.ID) + ids = append(ids, info.ID()) + addrs = append(addrs, info.Addr()) + } + resp.Shards = append(resp.Shards, &querypb.ShardLeadersList{ ChannelName: channel.GetChannelName(), NodeIds: ids,