diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index f3553be954..0d6b9401c3 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -186,7 +186,7 @@ queryCoord:
   channelTaskTimeout: 60000 # 1 minute
   segmentTaskTimeout: 120000 # 2 minute
   distPullInterval: 500
-  heartbeatAvailableInterval: 2500 # Only QueryNodes which fetched heartbeats within the duration are available
+  heartbeatAvailableInterval: 10000 # 10s, Only QueryNodes which fetched heartbeats within the duration are available
   loadTimeoutSeconds: 600
   checkHandoffInterval: 5000
   taskMergeCap: 16
diff --git a/internal/querycoordv2/errors.go b/internal/querycoordv2/errors.go
index ea34520892..ec4af473e5 100644
--- a/internal/querycoordv2/errors.go
+++ b/internal/querycoordv2/errors.go
@@ -16,8 +16,38 @@
 
 package querycoordv2
 
-import "errors"
+import (
+	"errors"
+	"fmt"
+	"time"
+)
 
 var (
 	ErrNotHealthy = errors.New("NotHealthy")
+
+	// Node availability errors. The WrapErr* helpers below wrap these with %w,
+	// so errors.Is on a wrapped error still matches the sentinel.
+	ErrLackSegment           = errors.New("LackSegment")
+	ErrNodeOffline           = errors.New("NodeOffline")
+	ErrNodeHeartbeatOutdated = errors.New("NodeHeartbeatOutdated")
 )
+
+// WrapErrLackSegment wraps ErrLackSegment with the ID of the missing segment.
+func WrapErrLackSegment(segmentID int64) error {
+	return fmt.Errorf("%w(segmentID=%v)", ErrLackSegment, segmentID)
+}
+
+// WrapErrNodeOffline wraps ErrNodeOffline with the ID of the offline node.
+func WrapErrNodeOffline(nodeID int64) error {
+	return fmt.Errorf("%w(nodeID=%v)", ErrNodeOffline, nodeID)
+}
+
+// WrapErrNodeHeartbeatOutdated wraps ErrNodeHeartbeatOutdated with the node ID
+// and the time of its last heartbeat.
+func WrapErrNodeHeartbeatOutdated(nodeID int64, lastHeartbeat time.Time) error {
+	return fmt.Errorf("%w(nodeID=%v, lastHeartbeat=%v)",
+		ErrNodeHeartbeatOutdated,
+		nodeID,
+		lastHeartbeat,
+	)
+}
diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go
index c4fb9c3d70..6d0ef1e749 100644
--- a/internal/querycoordv2/handlers.go
+++ b/internal/querycoordv2/handlers.go
@@ -353,3 +353,20 @@ func errCode(err error) commonpb.ErrorCode {
 	}
 	return commonpb.ErrorCode_UnexpectedError
 }
+
+// checkNodeAvailable reports whether the QueryNode nodeID is available.
+// It returns a wrapped ErrNodeOffline if info is nil, a wrapped
+// ErrNodeHeartbeatOutdated if the last heartbeat is older than
+// QueryCoordCfg.HeartbeatAvailableInterval, and nil otherwise.
+func checkNodeAvailable(nodeID int64, info *session.NodeInfo) error {
+	if info == nil {
+		return WrapErrNodeOffline(nodeID)
+	}
+
+	// Read the heartbeat once so the reported time is the one that was checked.
+	lastHeartbeat := info.LastHeartbeat()
+	if time.Since(lastHeartbeat) > Params.QueryCoordCfg.HeartbeatAvailableInterval.GetAsDuration(time.Millisecond) {
+		return WrapErrNodeHeartbeatOutdated(nodeID, lastHeartbeat)
+	}
+	return nil
+}
diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go
index ae7ed054d6..416dc90ae6 100644
--- a/internal/querycoordv2/services.go
+++ b/internal/querycoordv2/services.go
@@ -21,7 +21,6 @@ import (
 	"errors"
 	"fmt"
 	"sync"
-	"time"
 
 	"github.com/milvus-io/milvus-proto/go-api/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
@@ -38,6 +37,7 @@ import (
 	"github.com/milvus-io/milvus/internal/util/timerecord"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
 	"github.com/samber/lo"
+	"go.uber.org/multierr"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 )
@@ -658,29 +658,42 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
 		ids := make([]int64, 0, len(leaders))
 		addrs := make([]string, 0, len(leaders))
 
+		var channelErr error
+
 		// In a replica, a shard is available, if and only if:
 		// 1. The leader is online
 		// 2. All QueryNodes in the distribution are online
 		// 3. The last heartbeat response time is within HeartbeatAvailableInterval for all QueryNodes(include leader) in the distribution
 		// 4. All segments of the shard in target should be in the distribution
 		for _, leader := range leaders {
+			log := log.With(zap.Int64("leaderID", leader.ID))
 			info := s.nodeMgr.Get(leader.ID)
 
 			// Check whether leader is online
-			if info == nil || time.Since(info.LastHeartbeat()) > Params.QueryCoordCfg.HeartbeatAvailableInterval.GetAsDuration(time.Millisecond) {
+			err := checkNodeAvailable(leader.ID, info)
+			if err != nil {
+				log.Info("leader is not available", zap.Error(err))
+				multierr.AppendInto(&channelErr, fmt.Errorf("leader not available: %w", err))
 				continue
 			}
-
 			// Check whether QueryNodes are online and available
 			isAvailable := true
 			for _, version := range leader.Segments {
-				info := s.nodeMgr.Get(version.NodeID)
-				if info == nil || time.Since(info.LastHeartbeat()) > Params.QueryCoordCfg.HeartbeatAvailableInterval.GetAsDuration(time.Millisecond) {
+				info := s.nodeMgr.Get(version.GetNodeID())
+				err = checkNodeAvailable(version.GetNodeID(), info)
+				if err != nil {
+					log.Info("leader is not available due to QueryNode unavailable", zap.Error(err))
 					isAvailable = false
+					multierr.AppendInto(&channelErr, err)
 					break
 				}
 			}
 
+			// Avoid iterating all segments if any QueryNode unavailable
+			if !isAvailable {
+				continue
+			}
+
 			// Check whether segments are fully loaded
 			for segmentID, info := range currentTargets {
 				if info.GetInsertChannel() != leader.Channel {
@@ -689,6 +702,8 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
 
 				_, exist := leader.Segments[segmentID]
 				if !exist {
+					log.Info("leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
+					multierr.AppendInto(&channelErr, WrapErrLackSegment(segmentID))
 					isAvailable = false
 					break
 				}
@@ -703,8 +718,8 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
 
 		if len(ids) == 0 {
 			msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName())
-			log.Warn(msg)
-			resp.Status = utils.WrapStatus(commonpb.ErrorCode_NoReplicaAvailable, msg)
+			log.Warn(msg, zap.Error(channelErr))
+			resp.Status = utils.WrapStatus(commonpb.ErrorCode_NoReplicaAvailable, msg, channelErr)
 			resp.Shards = nil
 			return resp, nil
 		}
diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go
index 6e3ba4c11d..6607dcde11 100644
--- a/internal/querycoordv2/services_test.go
+++ b/internal/querycoordv2/services_test.go
@@ -931,9 +931,21 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() {
 		CollectionID: collection,
 	}
 
+	// Node offline
+	suite.fetchHeartbeats(time.Now())
+	for _, node := range suite.nodes {
+		suite.nodeMgr.Remove(node)
+	}
+	resp, err := server.GetShardLeaders(ctx, req)
+	suite.NoError(err)
+	suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode)
+	for _, node := range suite.nodes {
+		suite.nodeMgr.Add(session.NewNodeInfo(node, "localhost"))
+	}
+
 	// Last heartbeat response time too old
 	suite.fetchHeartbeats(time.Now().Add(-Params.QueryCoordCfg.HeartbeatAvailableInterval.GetAsDuration(time.Millisecond) - 1))
-	resp, err := server.GetShardLeaders(ctx, req)
+	resp, err = server.GetShardLeaders(ctx, req)
 	suite.NoError(err)
 	suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode)
 
diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go
index cb67ea3b57..01a3014617 100644
--- a/internal/util/paramtable/component_param.go
+++ b/internal/util/paramtable/component_param.go
@@ -896,7 +896,7 @@ func (p *queryCoordConfig) init(base *BaseTable) {
 	p.HeartbeatAvailableInterval = ParamItem{
 		Key:          "queryCoord.heartbeatAvailableInterval",
 		Version:      "2.2.1",
-		DefaultValue: "2500",
+		DefaultValue: "10000",
 		PanicIfEmpty: true,
 	}
 	p.HeartbeatAvailableInterval.Init(base.mgr)