From c26c1b33c294fddd6bf6c52034c94ef0b0a2d4a0 Mon Sep 17 00:00:00 2001 From: wei liu Date: Tue, 19 Mar 2024 09:59:05 +0800 Subject: [PATCH] fix: Transfer l0 segment to new delegator after balance (#31319) issue: #30186 during channel balance, after new delegator loaded, instead of syncing l0 segment's location to new delegator, we should load l0 segment on new delegator, and release the old l0 segment, then start to release old delegator. --------- Signed-off-by: Wei Liu --- .../querycoordv2/checkers/leader_checker.go | 14 ++- .../checkers/leader_checker_test.go | 47 +++++++++ .../querycoordv2/checkers/segment_checker.go | 8 +- .../checkers/segment_checker_test.go | 71 ++++++++++++++ internal/querycoordv2/task/scheduler.go | 6 +- internal/querycoordv2/task/task_test.go | 95 +++++++++++++++++++ 6 files changed, 234 insertions(+), 7 deletions(-) diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go index 406b10d2ad..cff6853523 100644 --- a/internal/querycoordv2/checkers/leader_checker.go +++ b/internal/querycoordv2/checkers/leader_checker.go @@ -22,6 +22,7 @@ import ( "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" @@ -121,8 +122,11 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica int6 ret := make([]task.Task, 0) dist = utils.FindMaxVersionSegments(dist) for _, s := range dist { - existInTarget := c.target.GetSealedSegment(leaderView.CollectionID, s.GetID(), meta.CurrentTargetFirst) != nil - if !existInTarget { + segment := c.target.GetSealedSegment(leaderView.CollectionID, s.GetID(), meta.CurrentTargetFirst) + existInTarget := segment != nil + isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0 + // should set l0 segment location to delegator. l0 segment should be reload in delegator + if !existInTarget || isL0Segment { continue } @@ -166,8 +170,10 @@ func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica int for sid, s := range leaderView.Segments { _, ok := distMap[sid] - existInTarget := c.target.GetSealedSegment(leaderView.CollectionID, sid, meta.CurrentTargetFirst) != nil - if ok || existInTarget { + segment := c.target.GetSealedSegment(leaderView.CollectionID, sid, meta.CurrentTargetFirst) + existInTarget := segment != nil + isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0 + if ok || existInTarget || isL0Segment { continue } log.Debug("leader checker append a segment to remove", diff --git a/internal/querycoordv2/checkers/leader_checker_test.go b/internal/querycoordv2/checkers/leader_checker_test.go index cfdd27865b..62824a5be1 100644 --- a/internal/querycoordv2/checkers/leader_checker_test.go +++ b/internal/querycoordv2/checkers/leader_checker_test.go @@ -123,6 +123,29 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() { suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1)) suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), loadVersion) suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh) + + // test skip sync l0 segment + segments = []*datapb.SegmentInfo{ + { + ID: 1, + PartitionID: 1, + InsertChannel: "test-insert-channel", + Level: datapb.SegmentLevel_L0, + }, + } + suite.broker.ExpectedCalls = nil + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, segments, nil) + observer.target.UpdateCollectionNextTarget(int64(1)) + observer.target.UpdateCollectionCurrentTarget(1) + // mock l0 segment exist on non delegator node, doesn't set to leader view + observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, loadVersion, "test-insert-channel")) + observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}) + view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) + observer.dist.LeaderViewManager.Update(2, view) + tasks = suite.checker.Check(context.TODO()) + suite.Len(tasks, 0) } func (suite *LeaderCheckerTestSuite) TestActivation() { @@ -324,6 +347,30 @@ func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() { suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(3)) suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), int64(0)) suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh) + + // skip sync l0 segments + segments := []*datapb.SegmentInfo{ + { + ID: 3, + PartitionID: 1, + InsertChannel: "test-insert-channel", + Level: datapb.SegmentLevel_L0, + }, + } + suite.broker.ExpectedCalls = nil + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, segments, nil) + + observer.target.UpdateCollectionNextTarget(int64(1)) + observer.target.UpdateCollectionCurrentTarget(1) + + observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 1}, map[int64]*meta.Segment{}) + view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) + observer.dist.LeaderViewManager.Update(2, view) + + tasks = suite.checker.Check(context.TODO()) + suite.Len(tasks, 0) } func (suite *LeaderCheckerTestSuite) TestIgnoreSyncRemovedSegments() { diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index 3b980cc806..e5ff8ab432 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -236,7 +236,13 @@ func (c *SegmentChecker) getSealedSegmentDiff( _, existOnCurrent := currentTargetMap[segment.GetID()] _, existOnNext := nextTargetMap[segment.GetID()] - if !existOnNext && !existOnCurrent { + l0WithWrongLocation := false + if existOnCurrent { + leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, segment.GetInsertChannel()) + l0WithWrongLocation = segment.GetLevel() == datapb.SegmentLevel_L0 && segment.Node != leader.ID + } + + if !existOnNext && !existOnCurrent || l0WithWrongLocation { toRelease = append(toRelease, segment) } } diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index 28582d6312..998a567bf2 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -168,6 +168,77 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() { suite.Len(tasks, 1) } +func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() { + checker := suite.checker + // set meta + checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) + checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + Hostname: "localhost", + })) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) + + // set target + segments := []*datapb.SegmentInfo{ + { + ID: 1, + PartitionID: 1, + InsertChannel: "test-insert-channel", + Level: datapb.SegmentLevel_L0, + }, + } + + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, segments, nil) + checker.targetMgr.UpdateCollectionNextTarget(int64(1)) + + // set dist + checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + // seg l0 segment exist on a non delegator node + checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel")) + checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})) + + tasks := checker.Check(context.TODO()) + suite.Len(tasks, 1) + suite.Len(tasks[0].Actions(), 1) + action, ok := tasks[0].Actions()[0].(*task.SegmentAction) + suite.True(ok) + suite.EqualValues(1, tasks[0].ReplicaID()) + suite.Equal(task.ActionTypeGrow, action.Type()) + suite.EqualValues(1, action.SegmentID()) + suite.EqualValues(2, action.Node()) + suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) + + // release duplicate l0 segment + checker.dist.SegmentDistManager.Update(2, utils.CreateTestSegment(1, 1, 1, 2, 100, "test-insert-channel")) + tasks = checker.Check(context.TODO()) + suite.Len(tasks, 1) + suite.Len(tasks[0].Actions(), 1) + action, ok = tasks[0].Actions()[0].(*task.SegmentAction) + suite.True(ok) + suite.EqualValues(1, tasks[0].ReplicaID()) + suite.Equal(task.ActionTypeReduce, action.Type()) + suite.EqualValues(1, action.SegmentID()) + suite.EqualValues(1, action.Node()) + suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal) +} + func (suite *SegmentCheckerTestSuite) TestSkipLoadSegments() { checker := suite.checker // set meta diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 7a8504fd35..541346b613 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -652,8 +652,10 @@ func (scheduler *taskScheduler) preProcess(task Task) bool { return false } - for segmentID := range segmentsInTarget { - if _, exist := leader.Segments[segmentID]; !exist { + for segmentID, s := range segmentsInTarget { + _, exist := leader.Segments[segmentID] + l0WithWrongLocation := exist && s.GetLevel() == datapb.SegmentLevel_L0 && leader.Segments[segmentID].GetNodeID() != leader.ID + if !exist || l0WithWrongLocation { return false } } diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 2c3a9c56e4..7663bb9d1f 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -1697,6 +1697,101 @@ func (suite *TaskSuite) TestBalanceChannelTask() { suite.Equal(2, task.step) } +func (suite *TaskSuite) TestBalanceChannelWithL0SegmentTask() { + collectionID := int64(1) + partitionID := int64(1) + channel := "channel-1" + vchannel := &datapb.VchannelInfo{ + CollectionID: collectionID, + ChannelName: channel, + } + + segments := []*datapb.SegmentInfo{ + { + ID: 1, + CollectionID: collectionID, + PartitionID: partitionID, + InsertChannel: channel, + Level: datapb.SegmentLevel_L0, + }, + { + ID: 2, + CollectionID: collectionID, + PartitionID: partitionID, + InsertChannel: channel, + Level: datapb.SegmentLevel_L0, + }, + { + ID: 3, + CollectionID: collectionID, + PartitionID: partitionID, + InsertChannel: channel, + Level: datapb.SegmentLevel_L0, + }, + } + suite.meta.PutCollection(utils.CreateTestCollection(collectionID, 1), utils.CreateTestPartition(collectionID, 1)) + suite.broker.ExpectedCalls = nil + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return([]*datapb.VchannelInfo{vchannel}, segments, nil) + suite.target.UpdateCollectionNextTarget(collectionID) + suite.target.UpdateCollectionCurrentTarget(collectionID) + suite.target.UpdateCollectionNextTarget(collectionID) + + suite.dist.LeaderViewManager.Update(2, &meta.LeaderView{ + ID: 2, + CollectionID: collectionID, + Channel: channel, + Segments: map[int64]*querypb.SegmentDist{ + 1: {NodeID: 2}, + 2: {NodeID: 2}, + 3: {NodeID: 2}, + }, + }) + suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{ + ID: 1, + CollectionID: collectionID, + Channel: channel, + Segments: map[int64]*querypb.SegmentDist{ + 1: {NodeID: 2}, + 2: {NodeID: 2}, + 3: {NodeID: 2}, + }, + }) + + task, err := NewChannelTask(context.Background(), + 10*time.Second, + WrapIDSource(2), + collectionID, + 1, + NewChannelAction(1, ActionTypeGrow, channel), + NewChannelAction(2, ActionTypeReduce, channel), + ) + suite.NoError(err) + + // l0 hasn't been loaded into delegator, block balance + suite.scheduler.preProcess(task) + suite.Equal(0, task.step) + + suite.dist.LeaderViewManager.Update(1, &meta.LeaderView{ + ID: 1, + CollectionID: collectionID, + Channel: channel, + Segments: map[int64]*querypb.SegmentDist{ + 1: {NodeID: 1}, + 2: {NodeID: 1}, + 3: {NodeID: 1}, + }, + }) + + // new delegator distribution updated, task step up + suite.scheduler.preProcess(task) + suite.Equal(1, task.step) + + suite.dist.LeaderViewManager.Update(2) + // old delegator removed + suite.scheduler.preProcess(task) + suite.Equal(2, task.step) +} + func TestTask(t *testing.T) { suite.Run(t, new(TaskSuite)) }