diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go
index 324525a6fc..f3e767f76b 100644
--- a/internal/querycoordv2/checkers/channel_checker.go
+++ b/internal/querycoordv2/checkers/channel_checker.go
@@ -171,7 +171,6 @@ func (c *ChannelChecker) findRepeatedChannels(ctx context.Context, replicaID int
 	}
 
 	dist := c.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithReplica2Channel(replica))
-	targets := c.targetMgr.GetSealedSegmentsByCollection(replica.GetCollectionID(), meta.CurrentTarget)
 	versionsMap := make(map[string]*meta.DmChannel)
 	for _, ch := range dist {
 		leaderView := c.dist.LeaderViewManager.GetLeaderShardView(ch.Node, ch.GetChannelName())
@@ -184,13 +183,13 @@ func (c *ChannelChecker) findRepeatedChannels(ctx context.Context, replicaID int
 			continue
 		}
 
-		if err := utils.CheckLeaderAvailable(c.nodeMgr, leaderView, targets); err != nil {
+		if leaderView.UnServiceableError != nil {
 			log.RatedInfo(10, "replica has unavailable shard leader",
 				zap.Int64("collectionID", replica.GetCollectionID()),
 				zap.Int64("replicaID", replicaID),
 				zap.Int64("leaderID", ch.Node),
 				zap.String("channel", ch.GetChannelName()),
-				zap.Error(err))
+				zap.Error(leaderView.UnServiceableError))
 			continue
 		}
 
diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go
index e737aa4bdc..c06ed5328f 100644
--- a/internal/querycoordv2/dist/dist_handler.go
+++ b/internal/querycoordv2/dist/dist_handler.go
@@ -32,6 +32,7 @@ import (
 	. "github.com/milvus-io/milvus/internal/querycoordv2/params"
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/internal/querycoordv2/task"
+	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
 	"github.com/milvus-io/milvus/pkg/util/merr"
@@ -220,6 +221,11 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
 			NumOfGrowingRows:       lview.GetNumOfGrowingRows(),
 			PartitionStatsVersions: lview.PartitionStatsVersions,
 		}
+		// check leader serviceable
+		// todo by weiliu1031: serviceable status should be maintained by delegator, to avoid heavy check here
+		if err := utils.CheckLeaderAvailable(dh.nodeManager, dh.target, view); err != nil {
+			view.UnServiceableError = err
+		}
 		updates = append(updates, view)
 	}
 
diff --git a/internal/querycoordv2/dist/dist_handler_test.go b/internal/querycoordv2/dist/dist_handler_test.go
index c4cb4ec889..945b308ed6 100644
--- a/internal/querycoordv2/dist/dist_handler_test.go
+++ b/internal/querycoordv2/dist/dist_handler_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
 
+	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
@@ -74,6 +75,8 @@ func (suite *DistHandlerSuite) TestBasic() {
 		suite.dispatchMockCall.Unset()
 		suite.dispatchMockCall = nil
 	}
+
+	suite.target.EXPECT().GetSealedSegmentsByChannel(mock.Anything, mock.Anything, mock.Anything).Return(map[int64]*datapb.SegmentInfo{})
 	suite.dispatchMockCall = suite.scheduler.EXPECT().Dispatch(mock.Anything).Maybe()
 	suite.nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
 		NodeID:   1,
@@ -120,6 +123,7 @@ func (suite *DistHandlerSuite) TestGetDistributionFailed() {
 		suite.dispatchMockCall.Unset()
 		suite.dispatchMockCall = nil
 	}
+	suite.target.EXPECT().GetSealedSegmentsByChannel(mock.Anything, mock.Anything, mock.Anything).Return(map[int64]*datapb.SegmentInfo{}).Maybe()
 	suite.dispatchMockCall = suite.scheduler.EXPECT().Dispatch(mock.Anything).Maybe()
 	suite.nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
 		NodeID:   1,
@@ -140,6 +144,8 @@ func (suite *DistHandlerSuite) TestForcePullDist() {
 		suite.dispatchMockCall = nil
 	}
 
+	suite.target.EXPECT().GetSealedSegmentsByChannel(mock.Anything, mock.Anything, mock.Anything).Return(map[int64]*datapb.SegmentInfo{}).Maybe()
+
 	suite.nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
 		NodeID:   1,
 		Address:  "localhost",
diff --git a/internal/querycoordv2/meta/leader_view_manager.go b/internal/querycoordv2/meta/leader_view_manager.go
index 022933c3bd..963e115b69 100644
--- a/internal/querycoordv2/meta/leader_view_manager.go
+++ b/internal/querycoordv2/meta/leader_view_manager.go
@@ -119,6 +119,7 @@ type LeaderView struct {
 	TargetVersion          int64
 	NumOfGrowingRows       int64
 	PartitionStatsVersions map[int64]int64
+	UnServiceableError     error
 }
 
 func (view *LeaderView) Clone() *LeaderView {
@@ -231,6 +232,9 @@ func (mgr *LeaderViewManager) Update(leaderID int64, views ...*LeaderView) {
 	mgr.views[leaderID] = composeNodeViews(views...)
 
 	// compute leader location change, find it's correspond collection
+	// 1. leader has been released from node
+	// 2. leader has been loaded to node
+	// 3. leader serviceable status changed
 	if mgr.notifyFunc != nil {
 		viewChanges := typeutil.NewUniqueSet()
 		for channel, oldView := range oldViews {
@@ -240,9 +244,17 @@ func (mgr *LeaderViewManager) Update(leaderID int64, views ...*LeaderView) {
 			}
 		}
 
+		serviceableChange := func(old, new *LeaderView) bool {
+			if old == nil || new == nil {
+				return true
+			}
+
+			return (old.UnServiceableError == nil) != (new.UnServiceableError == nil)
+		}
+
 		for channel, newView := range newViews {
 			// if channel loaded to current node
-			if _, ok := oldViews[channel]; !ok {
+			if oldView, ok := oldViews[channel]; !ok || serviceableChange(oldView, newView) {
 				viewChanges.Insert(newView.CollectionID)
 			}
 		}
diff --git a/internal/querycoordv2/meta/leader_view_manager_test.go b/internal/querycoordv2/meta/leader_view_manager_test.go
index 892c80c599..b25e245e20 100644
--- a/internal/querycoordv2/meta/leader_view_manager_test.go
+++ b/internal/querycoordv2/meta/leader_view_manager_test.go
@@ -19,10 +19,12 @@ package meta
 import (
 	"testing"
 
+	"github.com/cockroachdb/errors"
 	"github.com/samber/lo"
 	"github.com/stretchr/testify/suite"
 
 	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 type LeaderViewManagerSuite struct {
@@ -248,16 +250,68 @@ func (suite *LeaderViewManagerSuite) TestNotifyDelegatorChanges() {
 		},
 	}
 
-	updateCollections := make([]int64, 0)
+	retSet := typeutil.NewUniqueSet()
 	mgr.SetNotifyFunc(func(collectionIDs ...int64) {
-		updateCollections = append(updateCollections, collectionIDs...)
+		retSet.Insert(collectionIDs...)
 	})
 
 	mgr.Update(1, newViews...)
+	suite.Equal(2, retSet.Len())
+	suite.True(retSet.Contain(100))
+	suite.True(retSet.Contain(103))
 
-	suite.Equal(2, len(updateCollections))
-	suite.Contains(updateCollections, int64(100))
-	suite.Contains(updateCollections, int64(103))
+	newViews1 := []*LeaderView{
+		{
+			ID:                 1,
+			CollectionID:       101,
+			Channel:            "test-channel-2",
+			UnServiceableError: errors.New("test error"),
+		},
+		{
+			ID:                 1,
+			CollectionID:       102,
+			Channel:            "test-channel-3",
+			UnServiceableError: errors.New("test error"),
+		},
+		{
+			ID:                 1,
+			CollectionID:       103,
+			Channel:            "test-channel-4",
+			UnServiceableError: errors.New("test error"),
+		},
+	}
+
+	retSet.Clear()
+	mgr.Update(1, newViews1...)
+	suite.Equal(3, len(retSet))
+	suite.True(retSet.Contain(101))
+	suite.True(retSet.Contain(102))
+	suite.True(retSet.Contain(103))
+
+	newViews2 := []*LeaderView{
+		{
+			ID:                 1,
+			CollectionID:       101,
+			Channel:            "test-channel-2",
+			UnServiceableError: errors.New("test error"),
+		},
+		{
+			ID:           1,
+			CollectionID: 102,
+			Channel:      "test-channel-3",
+		},
+		{
+			ID:           1,
+			CollectionID: 103,
+			Channel:      "test-channel-4",
+		},
+	}
+
+	retSet.Clear()
+	mgr.Update(1, newViews2...)
+	suite.Equal(2, len(retSet))
+	suite.True(retSet.Contain(102))
+	suite.True(retSet.Contain(103))
 }
 
 func TestLeaderViewManager(t *testing.T) {
diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go
index 8e4fca3fea..a8b7097367 100644
--- a/internal/querycoordv2/services_test.go
+++ b/internal/querycoordv2/services_test.go
@@ -2035,9 +2035,10 @@ func (suite *ServiceSuite) updateChannelDistWithoutSegment(collection int64) {
 			ChannelName:  channels[i],
 		}))
 		suite.dist.LeaderViewManager.Update(node, &meta.LeaderView{
-			ID:           node,
-			CollectionID: collection,
-			Channel:      channels[i],
+			ID:                 node,
+			CollectionID:       collection,
+			Channel:            channels[i],
+			UnServiceableError: merr.ErrSegmentLack,
 		})
 		i++
 		if i >= len(channels) {
diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index a2be33ae8e..4d5acd970e 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -699,25 +699,6 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
 		return false
 	}
 
-	// check if new delegator is ready to release old delegator
-	checkLeaderView := func(collectionID int64, channel string, node int64) bool {
-		segmentsInTarget := scheduler.targetMgr.GetSealedSegmentsByChannel(collectionID, channel, meta.CurrentTarget)
-		leader := scheduler.distMgr.LeaderViewManager.GetLeaderShardView(node, channel)
-		if leader == nil {
-			return false
-		}
-
-		for segmentID, s := range segmentsInTarget {
-			_, exist := leader.Segments[segmentID]
-			l0WithWrongLocation := exist && s.GetLevel() == datapb.SegmentLevel_L0 && leader.Segments[segmentID].GetNodeID() != leader.ID
-			if !exist || l0WithWrongLocation {
-				return false
-			}
-		}
-
-		return true
-	}
-
 	actions, step := task.Actions(), task.Step()
 	for step < len(actions) && actions[step].IsFinished(scheduler.distMgr) {
 		if GetTaskType(task) == TaskTypeMove && actions[step].Type() == ActionTypeGrow {
@@ -729,7 +710,8 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
 			// causes a few time to load delta log, if reduce the old delegator in advance,
 			// new delegator can't service search and query, will got no available channel error
 			channelAction := actions[step].(*ChannelAction)
-			ready = checkLeaderView(task.CollectionID(), channelAction.Shard(), channelAction.Node())
+			leader := scheduler.distMgr.LeaderViewManager.GetLeaderShardView(channelAction.Node(), channelAction.Shard())
+			ready = leader.UnServiceableError == nil
 		default:
 			ready = true
 		}
diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go
index 58b2f59fc2..8eb1692925 100644
--- a/internal/querycoordv2/task/task_test.go
+++ b/internal/querycoordv2/task/task_test.go
@@ -1666,9 +1666,10 @@ func (suite *TaskSuite) TestBalanceChannelTask() {
 		},
 	})
 	suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{
-		ID:           1,
-		CollectionID: collectionID,
-		Channel:      channel,
+		ID:                 1,
+		CollectionID:       collectionID,
+		Channel:            channel,
+		UnServiceableError: merr.ErrSegmentLack,
 	})
 	task, err := NewChannelTask(context.Background(),
 		10*time.Second,
@@ -1763,6 +1764,7 @@ func (suite *TaskSuite) TestBalanceChannelWithL0SegmentTask() {
 			2: {NodeID: 2},
 			3: {NodeID: 2},
 		},
+		UnServiceableError: merr.ErrSegmentLack,
 	})
 
 	task, err := NewChannelTask(context.Background(),
diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go
index 4ef2f685d3..e22a04d7b0 100644
--- a/internal/querycoordv2/utils/util.go
+++ b/internal/querycoordv2/utils/util.go
@@ -45,7 +45,7 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error {
 // 2. All QueryNodes in the distribution are online
 // 3. The last heartbeat response time is within HeartbeatAvailableInterval for all QueryNodes(include leader) in the distribution
 // 4. All segments of the shard in target should be in the distribution
-func CheckLeaderAvailable(nodeMgr *session.NodeManager, leader *meta.LeaderView, currentTargets map[int64]*datapb.SegmentInfo) error {
+func CheckLeaderAvailable(nodeMgr *session.NodeManager, targetMgr meta.TargetManagerInterface, leader *meta.LeaderView) error {
 	log := log.Ctx(context.TODO()).
 		WithRateGroup("utils.CheckLeaderAvailable", 1, 60).
 		With(zap.Int64("leaderID", leader.ID))
@@ -68,18 +68,20 @@ func CheckLeaderAvailable(nodeMgr *session.NodeManager, leader *meta.LeaderView,
 			return err
 		}
 	}
-
+	segmentDist := targetMgr.GetSealedSegmentsByChannel(leader.CollectionID, leader.Channel, meta.CurrentTarget)
 	// Check whether segments are fully loaded
-	for segmentID, info := range currentTargets {
-		if info.GetInsertChannel() != leader.Channel {
-			continue
-		}
-
+	for segmentID, info := range segmentDist {
 		_, exist := leader.Segments[segmentID]
 		if !exist {
 			log.RatedInfo(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
 			return merr.WrapErrSegmentLack(segmentID)
 		}
+
+		l0WithWrongLocation := info.GetLevel() == datapb.SegmentLevel_L0 && leader.Segments[segmentID].GetNodeID() != leader.ID
+		if l0WithWrongLocation {
+			log.RatedInfo(10, "leader is not available due to lack of L0 segment", zap.Int64("segmentID", segmentID))
+			return merr.WrapErrSegmentLack(segmentID)
+		}
 	}
 	return nil
 }
@@ -110,7 +112,6 @@ func GetShardLeadersWithChannels(m *meta.Meta, targetMgr meta.TargetManagerInter
 	nodeMgr *session.NodeManager, collectionID int64, channels map[string]*meta.DmChannel,
 ) ([]*querypb.ShardLeadersList, error) {
 	ret := make([]*querypb.ShardLeadersList, 0)
-	currentTargets := targetMgr.GetSealedSegmentsByCollection(collectionID, meta.CurrentTarget)
 	for _, channel := range channels {
 		log := log.With(zap.String("channel", channel.GetChannelName()))
 
@@ -122,8 +123,8 @@ func GetShardLeadersWithChannels(m *meta.Meta, targetMgr meta.TargetManagerInter
 		readableLeaders := make(map[int64]*meta.LeaderView)
 
 		for _, leader := range leaders {
-			if err := CheckLeaderAvailable(nodeMgr, leader, currentTargets); err != nil {
-				multierr.AppendInto(&channelErr, err)
+			if leader.UnServiceableError != nil {
+				multierr.AppendInto(&channelErr, leader.UnServiceableError)
 				continue
 			}
 			readableLeaders[leader.ID] = leader
@@ -149,6 +150,9 @@ func GetShardLeadersWithChannels(m *meta.Meta, targetMgr meta.TargetManagerInter
 
 		// to avoid node down during GetShardLeaders
 		if len(ids) == 0 {
+			if channelErr == nil {
+				channelErr = merr.WrapErrChannelNotAvailable(channel.GetChannelName())
+			}
 			msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName())
 			log.Warn(msg, zap.Error(channelErr))
 			err := merr.WrapErrChannelNotAvailable(channel.GetChannelName(), channelErr.Error())
diff --git a/internal/querycoordv2/utils/util_test.go b/internal/querycoordv2/utils/util_test.go
index ec94388c26..9c414d352f 100644
--- a/internal/querycoordv2/utils/util_test.go
+++ b/internal/querycoordv2/utils/util_test.go
@@ -20,6 +20,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
 
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -56,13 +57,16 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliable() {
 		Segments:     map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
 	}
 
-	suite.setNodeAvailable(1, 2)
-	err := CheckLeaderAvailable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
+	mockTargetManager := meta.NewMockTargetManager(suite.T())
+	mockTargetManager.EXPECT().GetSealedSegmentsByChannel(mock.Anything, mock.Anything, mock.Anything).Return(map[int64]*datapb.SegmentInfo{
 		2: {
 			ID:            2,
 			InsertChannel: "test",
 		},
-	})
+	}).Maybe()
+
+	suite.setNodeAvailable(1, 2)
+	err := CheckLeaderAvailable(suite.nodeMgr, mockTargetManager, leadview)
 	suite.NoError(err)
 }
 
@@ -73,14 +77,16 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() {
 			Channel:      "test",
 			Segments:     map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
 		}
-		// leader nodeID=1 not available
-		suite.setNodeAvailable(2)
-		err := CheckLeaderAvailable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
+		mockTargetManager := meta.NewMockTargetManager(suite.T())
+		mockTargetManager.EXPECT().GetSealedSegmentsByChannel(mock.Anything, mock.Anything, mock.Anything).Return(map[int64]*datapb.SegmentInfo{
 			2: {
 				ID:            2,
 				InsertChannel: "test",
 			},
-		})
+		}).Maybe()
+		// leader nodeID=1 not available
+		suite.setNodeAvailable(2)
+		err := CheckLeaderAvailable(suite.nodeMgr, mockTargetManager, leadview)
 		suite.Error(err)
 		suite.nodeMgr = session.NewNodeManager()
 	})
@@ -91,14 +97,17 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() {
 			Channel:      "test",
 			Segments:     map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
 		}
-		// leader nodeID=2 not available
-		suite.setNodeAvailable(1)
-		err := CheckLeaderAvailable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
+
+		mockTargetManager := meta.NewMockTargetManager(suite.T())
+		mockTargetManager.EXPECT().GetSealedSegmentsByChannel(mock.Anything, mock.Anything, mock.Anything).Return(map[int64]*datapb.SegmentInfo{
 			2: {
 				ID:            2,
 				InsertChannel: "test",
 			},
-		})
+		}).Maybe()
+		// leader nodeID=2 not available
+		suite.setNodeAvailable(1)
+		err := CheckLeaderAvailable(suite.nodeMgr, mockTargetManager, leadview)
 		suite.Error(err)
 		suite.nodeMgr = session.NewNodeManager()
 	})
@@ -109,14 +118,16 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() {
 			Channel:      "test",
 			Segments:     map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
 		}
-		suite.setNodeAvailable(1, 2)
-		err := CheckLeaderAvailable(suite.nodeMgr, leadview, map[int64]*datapb.SegmentInfo{
+		mockTargetManager := meta.NewMockTargetManager(suite.T())
+		mockTargetManager.EXPECT().GetSealedSegmentsByChannel(mock.Anything, mock.Anything, mock.Anything).Return(map[int64]*datapb.SegmentInfo{
 			// target segmentID=1 not in leadView
 			1: {
 				ID:            1,
 				InsertChannel: "test",
 			},
-		})
+		}).Maybe()
+		suite.setNodeAvailable(1, 2)
+		err := CheckLeaderAvailable(suite.nodeMgr, mockTargetManager, leadview)
 		suite.Error(err)
 		suite.nodeMgr = session.NewNodeManager()
 	})
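Note: the following is a minimal, self-contained sketch of the caching pattern this patch introduces, not the actual Milvus types or APIs; names such as leaderView, segmentInfo, and checkLeaderAvailable are illustrative stand-ins. It shows the availability check being run once when a distribution is pulled and the result being cached on the view as UnServiceableError, so downstream readers (channel checker, GetShardLeaders, scheduler preProcess) only inspect the cached error instead of re-walking the sealed-segment target on every call.

package main

import "fmt"

// segmentInfo stands in for datapb.SegmentInfo: only the fields the check needs.
type segmentInfo struct {
	ID   int64
	IsL0 bool // stands in for GetLevel() == datapb.SegmentLevel_L0
}

// leaderView stands in for meta.LeaderView.
type leaderView struct {
	ID                 int64 // delegator node ID
	Channel            string
	Segments           map[int64]int64 // segmentID -> node currently serving it
	UnServiceableError error           // cached result of the availability check
}

// checkLeaderAvailable mirrors the shape of utils.CheckLeaderAvailable after the
// patch: every sealed segment in the channel's current target must appear in the
// leader's distribution, and an L0 segment must be served by the delegator itself.
func checkLeaderAvailable(view *leaderView, target map[int64]segmentInfo) error {
	for segmentID, info := range target {
		nodeID, ok := view.Segments[segmentID]
		if !ok {
			return fmt.Errorf("segment lack: %d", segmentID)
		}
		if info.IsL0 && nodeID != view.ID {
			return fmt.Errorf("L0 segment %d not located on delegator %d", segmentID, view.ID)
		}
	}
	return nil
}

func main() {
	target := map[int64]segmentInfo{
		1: {ID: 1},
		2: {ID: 2, IsL0: true},
	}

	// Producer side (analogous to distHandler.updateLeaderView): run the check
	// once per distribution pull and cache the outcome on the view.
	view := &leaderView{ID: 10, Channel: "dml-ch-0", Segments: map[int64]int64{1: 11}}
	view.UnServiceableError = checkLeaderAvailable(view, target)

	// Consumer side (analogous to channel checker / GetShardLeaders / scheduler):
	// read the cached status instead of re-checking against the target.
	if view.UnServiceableError != nil {
		fmt.Println("leader not serviceable:", view.UnServiceableError)
	} else {
		fmt.Println("leader serviceable")
	}
}

The trade-off, called out in the patch's own TODO, is that the check still runs in the distribution handler on every pull; maintaining the serviceable status inside the delegator itself would avoid even that periodic cost.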