diff --git a/internal/querycoordv2/errors.go b/internal/querycoordv2/errors.go
index ea34520892..6dc621b69b 100644
--- a/internal/querycoordv2/errors.go
+++ b/internal/querycoordv2/errors.go
@@ -16,8 +16,23 @@
 
 package querycoordv2
 
-import "errors"
+import (
+	"errors"
+	"fmt"
+)
 
 var (
 	ErrNotHealthy = errors.New("NotHealthy")
+
+	// Node Availability
+	ErrLackSegment = errors.New("LackSegment")
+	ErrNodeOffline = errors.New("NodeOffline")
 )
+
+func WrapErrLackSegment(segmentID int64) error {
+	return fmt.Errorf("%w(segmentID=%v)", ErrLackSegment, segmentID)
+}
+
+func WrapErrNodeOffline(nodeID int64) error {
+	return fmt.Errorf("%w(nodeID=%v)", ErrNodeOffline, nodeID)
+}
diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go
index 1e84057d85..e85d6e6044 100644
--- a/internal/querycoordv2/services.go
+++ b/internal/querycoordv2/services.go
@@ -39,6 +39,7 @@ import (
 	"github.com/milvus-io/milvus/internal/util/timerecord"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
 	"github.com/samber/lo"
+	"go.uber.org/multierr"
 	"go.uber.org/zap"
 )
 
@@ -668,35 +669,70 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
 		return resp, nil
 	}
 
+	currentTargets := s.targetMgr.GetSegmentsByCollection(req.GetCollectionID())
 	for _, channel := range channels {
 		log := log.With(zap.String("channel", channel.GetChannelName()))
 
 		leaders := s.dist.LeaderViewManager.GetLeadersByShard(channel.GetChannelName())
 		ids := make([]int64, 0, len(leaders))
 		addrs := make([]string, 0, len(leaders))
+
+		var channelErr error
+
+		// In a replica, a shard is available, if and only if:
+		// 1. The leader is online
+		// 2. All QueryNodes in the distribution are online
+		// 3. All segments of the shard in target should be in the distribution
 		for _, leader := range leaders {
+			log := log.With(zap.Int64("leaderID", leader.ID))
 			info := s.nodeMgr.Get(leader.ID)
 			if info == nil {
+				log.Info("leader is not available", zap.Int64("leaderID", leader.ID))
+				multierr.AppendInto(&channelErr, WrapErrNodeOffline(leader.ID))
 				continue
 			}
-			isAllNodeAvailable := true
+
+			// Check whether QueryNodes are online and available
+			isAvailable := true
 			for _, version := range leader.Segments {
 				if s.nodeMgr.Get(version.NodeID) == nil {
-					isAllNodeAvailable = false
+					log.Info("leader is not available due to QueryNode not available", zap.Int64("nodeID", version.GetNodeID()))
+					isAvailable = false
+					multierr.AppendInto(&channelErr, WrapErrNodeOffline(version.GetNodeID()))
 					break
 				}
 			}
-			if !isAllNodeAvailable {
+
+			// Avoid iterating all segments if any QueryNode unavailable
+			if !isAvailable {
 				continue
 			}
+
+			// Check whether segments are fully loaded
+			for _, segment := range currentTargets {
+				if segment.GetInsertChannel() != leader.Channel {
+					continue
+				}
+
+				_, exist := leader.Segments[segment.GetID()]
+				if !exist {
+					log.Info("leader is not available due to lack of segment", zap.Int64("segmentID", segment.GetID()))
+					multierr.AppendInto(&channelErr, WrapErrLackSegment(segment.GetID()))
+					isAvailable = false
+					break
+				}
+			}
+			if !isAvailable {
+				continue
+			}
+
 			ids = append(ids, info.ID())
 			addrs = append(addrs, info.Addr())
 		}
 
 		if len(ids) == 0 {
 			msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName())
-			log.Warn(msg)
-			resp.Status = utils.WrapStatus(commonpb.ErrorCode_NoReplicaAvailable, msg)
+			log.Warn(msg, zap.Error(channelErr))
+			resp.Status = utils.WrapStatus(commonpb.ErrorCode_NoReplicaAvailable, msg, channelErr)
 			resp.Shards = nil
 			return resp, nil
 		}
@@ -707,6 +743,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
 			NodeAddrs: addrs,
 		})
 	}
+
 	return resp, nil
 }
 
diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go
index 478048ce60..17ab87850c 100644
--- a/internal/querycoordv2/services_test.go
+++ b/internal/querycoordv2/services_test.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"encoding/json"
 	"testing"
+	"time"
 
 	"github.com/milvus-io/milvus-proto/go-api/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
@@ -901,6 +902,7 @@ func (suite *ServiceSuite) TestGetShardLeaders() {
 		req := &querypb.GetShardLeadersRequest{
 			CollectionID: collection,
 		}
+
 		resp, err := server.GetShardLeaders(ctx, req)
 		suite.NoError(err)
 		suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
@@ -920,6 +922,37 @@ func (suite *ServiceSuite) TestGetShardLeaders() {
 	suite.Contains(resp.Status.Reason, ErrNotHealthy.Error())
 }
 
+func (suite *ServiceSuite) TestGetShardLeadersFailed() {
+	suite.loadAll()
+	ctx := context.Background()
+	server := suite.server
+
+	for _, collection := range suite.collections {
+		suite.updateCollectionStatus(collection, querypb.LoadStatus_Loaded)
+		suite.updateChannelDist(collection)
+		req := &querypb.GetShardLeadersRequest{
+			CollectionID: collection,
+		}
+
+		// Node offline
+		for _, node := range suite.nodes {
+			suite.nodeMgr.Remove(node)
+		}
+		resp, err := server.GetShardLeaders(ctx, req)
+		suite.NoError(err)
+		suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode)
+		for _, node := range suite.nodes {
+			suite.nodeMgr.Add(session.NewNodeInfo(node, "localhost"))
+		}
+
+		// Segment not fully loaded
+		suite.updateChannelDistWithoutSegment(collection)
+		resp, err = server.GetShardLeaders(ctx, req)
+		suite.NoError(err)
+		suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode)
+	}
+}
+
 func (suite *ServiceSuite) loadAll() {
 	ctx := context.Background()
 	for _, collection := range suite.collections {
@@ -1073,6 +1106,38 @@ func (suite *ServiceSuite) updateSegmentDist(collection, node int64) {
 
 func (suite *ServiceSuite) updateChannelDist(collection int64) {
 	channels := suite.channels[collection]
+	segments := lo.Flatten(lo.Values(suite.segments[collection]))
+
+	replicas := suite.meta.ReplicaManager.GetByCollection(collection)
+	for _, replica := range replicas {
+		i := 0
+		for _, node := range replica.GetNodes() {
+			suite.dist.ChannelDistManager.Update(node, meta.DmChannelFromVChannel(&datapb.VchannelInfo{
+				CollectionID: collection,
+				ChannelName:  channels[i],
+			}))
+			suite.dist.LeaderViewManager.Update(node, &meta.LeaderView{
+				ID:           node,
+				CollectionID: collection,
+				Channel:      channels[i],
+				Segments: lo.SliceToMap(segments, func(segment int64) (int64, *querypb.SegmentDist) {
+					return segment, &querypb.SegmentDist{
+						NodeID:  node,
+						Version: time.Now().Unix(),
+					}
+				}),
+			})
+			i++
+			if i >= len(channels) {
+				break
+			}
+		}
+	}
+}
+
+func (suite *ServiceSuite) updateChannelDistWithoutSegment(collection int64) {
+	channels := suite.channels[collection]
+
 	replicas := suite.meta.ReplicaManager.GetByCollection(collection)
 	for _, replica := range replicas {
 		i := 0
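Reviewer note (not part of the patch): the sketch below shows how the `%w`-wrapped sentinels and `multierr.AppendInto` used above compose. It re-declares the two wrappers from the errors.go hunk so it compiles standalone; the `main` function and the sample IDs are illustrative only. The combined error is what the new `log.Warn(msg, zap.Error(channelErr))` and `utils.WrapStatus(..., channelErr)` calls receive, which is presumably how the per-leader failures end up in `resp.Status.Reason`.

```go
package main

import (
	"errors"
	"fmt"

	"go.uber.org/multierr"
)

// Re-declared from the errors.go hunk so this sketch runs on its own.
var (
	ErrLackSegment = errors.New("LackSegment")
	ErrNodeOffline = errors.New("NodeOffline")
)

func WrapErrLackSegment(segmentID int64) error {
	return fmt.Errorf("%w(segmentID=%v)", ErrLackSegment, segmentID)
}

func WrapErrNodeOffline(nodeID int64) error {
	return fmt.Errorf("%w(nodeID=%v)", ErrNodeOffline, nodeID)
}

func main() {
	// Accumulate per-leader failures the same way GetShardLeaders builds channelErr.
	var channelErr error
	multierr.AppendInto(&channelErr, WrapErrNodeOffline(1))
	multierr.AppendInto(&channelErr, WrapErrLackSegment(100))

	// Prints something like: NodeOffline(nodeID=1); LackSegment(segmentID=100)
	fmt.Println(channelErr)

	// Because the wrappers use %w, each collected error still matches its sentinel.
	for _, err := range multierr.Errors(channelErr) {
		fmt.Println(errors.Is(err, ErrNodeOffline), errors.Is(err, ErrLackSegment))
	}
}
```

Wrapping with `%w` instead of formatting a fresh string keeps `ErrNodeOffline`/`ErrLackSegment` matchable with `errors.Is` while still recording which node or segment caused the shard to be rejected.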