From 1748c54fd7a293b228e498cab0a4bc394c161ecf Mon Sep 17 00:00:00 2001
From: wei liu
Date: Mon, 24 Jul 2023 19:01:01 +0800
Subject: [PATCH] skip load/release segment when more than one delegator exist (#25718)

Signed-off-by: Wei Liu
---
 .../querycoordv2/checkers/segment_checker.go  | 16 +++++++-
 .../checkers/segment_checker_test.go          | 31 ++++++++++++++
 .../querycoordv2/meta/channel_dist_manager.go | 21 ++++++++++
 .../meta/channel_dist_manager_test.go         | 41 +++++++++++++++++++
 4 files changed, 108 insertions(+), 1 deletion(-)

diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go
index 77aa1c38dc..38d3564ff6 100644
--- a/internal/querycoordv2/checkers/segment_checker.go
+++ b/internal/querycoordv2/checkers/segment_checker.go
@@ -83,8 +83,22 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
 }
 
 func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica) []task.Task {
+	log := log.Ctx(ctx).WithRateGroup("qcv2.SegmentChecker", 1, 60).With(
+		zap.Int64("collectionID", replica.CollectionID),
+		zap.Int64("replicaID", replica.ID))
 	ret := make([]task.Task, 0)
 
+	// get channel dist by replica (ch -> node list), because more than one delegator may exist during channel balance.
+	// if more than one delegator exists, loading/releasing segments may cause chaos, so skip it until channel balance finishes.
+	dist := c.dist.ChannelDistManager.GetChannelDistByReplica(replica)
+	for ch, nodes := range dist {
+		if len(nodes) > 1 {
+			log.Info("skip segment check since more than one shard leader exists",
+				zap.String("channelName", ch))
+			return ret
+		}
+	}
+
 	// compare with targets to find the lack and redundancy of segments
 	lacks, redundancies := c.getHistoricalSegmentDiff(c.targetMgr, c.dist, c.meta, replica.GetCollectionID(), replica.GetID())
 	tasks := c.createSegmentLoadTasks(ctx, lacks, replica)
@@ -124,7 +138,7 @@ func (c *SegmentChecker) getStreamingSegmentDiff(targetMgr *meta.TargetManager,
 		return
 	}
 
-	log := log.Ctx(context.TODO()).WithRateGroup("qcv2.SegmentChecker", 60, 1).With(
+	log := log.Ctx(context.TODO()).WithRateGroup("qcv2.SegmentChecker", 1, 60).With(
 		zap.Int64("collectionID", collectionID),
 		zap.Int64("replicaID", replica.ID))
 
diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go
index 8b67825d12..29a53f35db 100644
--- a/internal/querycoordv2/checkers/segment_checker_test.go
+++ b/internal/querycoordv2/checkers/segment_checker_test.go
@@ -136,7 +136,38 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() {
 	suite.Equal(task.ActionTypeGrow, action.Type())
 	suite.EqualValues(1, action.SegmentID())
 	suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
+}
 
+func (suite *SegmentCheckerTestSuite) TestSkipCheckReplica() {
+	checker := suite.checker
+	// set meta
+	checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
+	checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
+	suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
+	suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
+	checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
+	checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
+
+	// set target
+	segments := []*datapb.SegmentInfo{
+		{
+			ID:            1,
+			PartitionID:   1,
+			InsertChannel: "test-insert-channel",
+		},
+	}
+	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
+		nil, segments, nil)
+	checker.targetMgr.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1))
+
+	// set dist
+	checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel"))
+	checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 2, "test-insert-channel"))
+	checker.dist.SegmentDistManager.Update(2, utils.CreateTestSegment(1, 1, 11, 1, 1, "test-insert-channel"))
+	checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
+
+	tasks := checker.Check(context.TODO())
+	suite.Len(tasks, 0)
 }
 
 func (suite *SegmentCheckerTestSuite) TestReleaseSegments() {
diff --git a/internal/querycoordv2/meta/channel_dist_manager.go b/internal/querycoordv2/meta/channel_dist_manager.go
index 72525ed72d..c46041fc2a 100644
--- a/internal/querycoordv2/meta/channel_dist_manager.go
+++ b/internal/querycoordv2/meta/channel_dist_manager.go
@@ -119,6 +119,27 @@ func (m *ChannelDistManager) GetShardLeadersByReplica(replica *Replica) map[string]int64 {
 	return ret
 }
 
+func (m *ChannelDistManager) GetChannelDistByReplica(replica *Replica) map[string][]int64 {
+	m.rwmutex.RLock()
+	defer m.rwmutex.RUnlock()
+
+	ret := make(map[string][]int64)
+	for _, node := range replica.GetNodes() {
+		channels := m.channels[node]
+		for _, dmc := range channels {
+			if dmc.GetCollectionID() == replica.GetCollectionID() {
+				channelName := dmc.GetChannelName()
+				_, ok := ret[channelName]
+				if !ok {
+					ret[channelName] = make([]int64, 0)
+				}
+				ret[channelName] = append(ret[channelName], node)
+			}
+		}
+	}
+	return ret
+}
+
 func (m *ChannelDistManager) GetByCollection(collectionID UniqueID) []*DmChannel {
 	m.rwmutex.RLock()
 	defer m.rwmutex.RUnlock()
diff --git a/internal/querycoordv2/meta/channel_dist_manager_test.go b/internal/querycoordv2/meta/channel_dist_manager_test.go
index 3d322cae89..fbd4afe2c3 100644
--- a/internal/querycoordv2/meta/channel_dist_manager_test.go
+++ b/internal/querycoordv2/meta/channel_dist_manager_test.go
@@ -148,6 +148,47 @@ func (suite *ChannelDistManagerSuite) TestGetShardLeader() {
 	suite.Equal(leaders["dmc1"], suite.nodes[1])
 }
 
+func (suite *ChannelDistManagerSuite) TestGetChannelDistByReplica() {
+	replica := NewReplica(
+		&querypb.Replica{
+			CollectionID: suite.collection,
+		},
+		typeutil.NewUniqueSet(11, 22, 33),
+	)
+
+	ch1 := &DmChannel{
+		VchannelInfo: &datapb.VchannelInfo{
+			CollectionID: suite.collection,
+			ChannelName:  "test-channel1",
+		},
+		Node:    11,
+		Version: 1,
+	}
+	ch2 := &DmChannel{
+		VchannelInfo: &datapb.VchannelInfo{
+			CollectionID: suite.collection,
+			ChannelName:  "test-channel1",
+		},
+		Node:    22,
+		Version: 1,
+	}
+	ch3 := &DmChannel{
+		VchannelInfo: &datapb.VchannelInfo{
+			CollectionID: suite.collection,
+			ChannelName:  "test-channel2",
+		},
+		Node:    33,
+		Version: 1,
+	}
+	suite.dist.Update(11, ch1)
+	suite.dist.Update(22, ch2)
+	suite.dist.Update(33, ch3)
+
+	dist := suite.dist.GetChannelDistByReplica(replica)
+	suite.Len(dist["test-channel1"], 2)
+	suite.Len(dist["test-channel2"], 1)
+}
+
 func (suite *ChannelDistManagerSuite) AssertNames(channels []*DmChannel, names ...string) bool {
 	for _, channel := range channels {
 		hasChannel := false
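
Note: the skip condition introduced in checkReplica reduces to "if any channel of the replica currently has more than one delegator, return without creating segment load/release tasks". Below is a minimal, standalone sketch of that check; the channelDist type and the hasMultipleDelegators helper are invented here for illustration and are not part of the Milvus codebase.

package main

import "fmt"

// channelDist mirrors the shape returned by GetChannelDistByReplica:
// channel name -> nodes that currently host a delegator for that channel.
type channelDist map[string][]int64

// hasMultipleDelegators reports whether any channel is served by more than
// one delegator, i.e. a channel balance is still in progress and segment
// load/release should be skipped for this replica in the current round.
func hasMultipleDelegators(dist channelDist) (string, bool) {
	for ch, nodes := range dist {
		if len(nodes) > 1 {
			return ch, true
		}
	}
	return "", false
}

func main() {
	dist := channelDist{
		"test-channel1": {11, 22}, // two delegators: balance in progress
		"test-channel2": {33},
	}
	if ch, skip := hasMultipleDelegators(dist); skip {
		fmt.Printf("skip segment check, channel %s has more than one delegator\n", ch)
	}
}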