From 565fc3a0196fd868632098cfda9976dd7f2180ee Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 12 Jan 2024 18:56:58 +0800 Subject: [PATCH] enhance: Skip generate load segment task (#29724) issue: #29814 If the channel is not subscribed yet, the generated load segment task will be removed from the task scheduler, because a load segment task needs to be transferred to the worker node by the shard leader. This PR skips generating load segment tasks when the channel is not subscribed yet. Signed-off-by: Wei Liu --- .../querycoordv2/checkers/controller_test.go | 15 ++++- .../querycoordv2/checkers/segment_checker.go | 55 ++++++++++--------- .../checkers/segment_checker_test.go | 36 ++++++++++++ 3 files changed, 79 insertions(+), 27 deletions(-) diff --git a/internal/querycoordv2/checkers/controller_test.go b/internal/querycoordv2/checkers/controller_test.go index 9126a30f47..10bea36a75 100644 --- a/internal/querycoordv2/checkers/controller_test.go +++ b/internal/querycoordv2/checkers/controller_test.go @@ -137,10 +137,21 @@ func (suite *CheckerControllerSuite) TestBasic() { suite.controller.Start() defer suite.controller.Stop() + // expect assign channel first suite.Eventually(func() bool { suite.controller.Check() - return counter.Load() > 0 && assignSegCounter.Load() > 0 && assingChanCounter.Load() > 0 - }, 5*time.Second, 1*time.Millisecond) + return counter.Load() > 0 && assingChanCounter.Load() > 0 + }, 3*time.Second, 1*time.Millisecond) + + // until new channel has been subscribed + suite.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel2")) + suite.dist.LeaderViewManager.Update(1, utils.CreateTestLeaderView(1, 1, "test-insert-channel2", map[int64]int64{}, map[int64]*meta.Segment{})) + + // expect assign segment after channel has been subscribed + suite.Eventually(func() bool { + suite.controller.Check() + return counter.Load() > 0 && assignSegCounter.Load() > 0 + }, 3*time.Second, 1*time.Millisecond) } func TestCheckControllerSuite(t *testing.T) { diff 
--git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index cc13e0dd99..25337427f6 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -344,39 +344,44 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments [] return nil } - isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0 - - shardSegments := make(map[string][]*meta.Segment) - for _, s := range segments { - if isLevel0 && - len(c.dist.LeaderViewManager.GetLeadersByShard(s.GetInsertChannel())) == 0 { - continue + // filter out stopping nodes and outbound nodes + outboundNodes := c.meta.ResourceManager.CheckOutboundNodes(replica) + availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool { + stop, err := c.nodeMgr.IsStoppingNode(node) + if err != nil { + return false } - channel := s.GetInsertChannel() - packedSegments := shardSegments[channel] - packedSegments = append(packedSegments, &meta.Segment{ - SegmentInfo: s, - }) - shardSegments[channel] = packedSegments + return !outboundNodes.Contain(node) && !stop + }) + + if len(availableNodes) == 0 { + return nil } + isLevel0 := segments[0].GetLevel() == datapb.SegmentLevel_L0 + shardSegments := lo.GroupBy(segments, func(s *datapb.SegmentInfo) string { + return s.GetInsertChannel() + }) + plans := make([]balance.SegmentAssignPlan, 0) for shard, segments := range shardSegments { - outboundNodes := c.meta.ResourceManager.CheckOutboundNodes(replica) - availableNodes := lo.Filter(replica.Replica.GetNodes(), func(node int64, _ int) bool { - stop, err := c.nodeMgr.IsStoppingNode(node) - if err != nil { - return false - } + // if channel is not subscribed yet, skip load segments + if len(c.dist.LeaderViewManager.GetLeadersByShard(shard)) == 0 { + continue + } - if isLevel0 { - leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, shard) - return 
!outboundNodes.Contain(node) && !stop && node == leader.ID + // L0 segment can only be assign to shard leader's node + if isLevel0 { + leader := c.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, shard) + availableNodes = []int64{leader.ID} + } + + segmentInfos := lo.Map(segments, func(s *datapb.SegmentInfo, _ int) *meta.Segment { + return &meta.Segment{ + SegmentInfo: s, } - return !outboundNodes.Contain(node) && !stop }) - - shardPlans := c.balancer.AssignSegment(replica.CollectionID, segments, availableNodes) + shardPlans := c.balancer.AssignSegment(replica.CollectionID, segmentInfos, availableNodes) for i := range shardPlans { shardPlans[i].ReplicaID = replica.GetID() } diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index 9048284e46..1d2dcb3d02 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -160,6 +160,42 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() { suite.Len(tasks, 1) } +func (suite *SegmentCheckerTestSuite) TestSkipLoadSegments() { + checker := suite.checker + // set meta + checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) + checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) + + // set target + segments := []*datapb.SegmentInfo{ + { + ID: 1, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + } + + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, 
int64(1)).Return( + channels, segments, nil) + checker.targetMgr.UpdateCollectionNextTarget(int64(1)) + + // when channel not subscribed, segment_checker won't generate load segment task + tasks := checker.Check(context.TODO()) + suite.Len(tasks, 0) +} + func (suite *SegmentCheckerTestSuite) TestSkipCheckReplica() { checker := suite.checker // set meta