From 6653e2c3b0eaca9c3fe348aaea5be2b53a561fd3 Mon Sep 17 00:00:00 2001 From: wei liu Date: Tue, 25 Apr 2023 10:22:37 +0800 Subject: [PATCH] fix balance channel (#23631) Signed-off-by: Wei Liu --- .../balance/rowcount_based_balancer.go | 24 ++++++++++++------- .../balance/rowcount_based_balancer_test.go | 22 +++++++++++++++++ 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index bbf23b2a9b..3bfb94482b 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -267,24 +267,30 @@ func (b *RowCountBasedBalancer) genChannelPlan(replica *meta.Replica, onlineNode } for start < end { - sourceNode := nodes[start] - targetNode := nodes[end] + // segment to move in + targetNode := nodes[start] + // segment to move out + sourceNode := nodes[end] + + if len(channelsOnNode[targetNode]) >= averageChannel { + break + } // remove channel from end node - selectChannel := channelsOnNode[targetNode][0] - channelsOnNode[targetNode] = channelsOnNode[targetNode][1:] + selectChannel := channelsOnNode[sourceNode][0] + channelsOnNode[sourceNode] = channelsOnNode[sourceNode][1:] // add channel to start node - if channelsOnNode[sourceNode] == nil { - channelsOnNode[sourceNode] = make([]*meta.DmChannel, 0) + if channelsOnNode[targetNode] == nil { + channelsOnNode[targetNode] = make([]*meta.DmChannel, 0) } - channelsOnNode[sourceNode] = append(channelsOnNode[sourceNode], selectChannel) + channelsOnNode[targetNode] = append(channelsOnNode[targetNode], selectChannel) // generate channel plan plan := ChannelAssignPlan{ Channel: selectChannel, - From: targetNode, - To: sourceNode, + From: sourceNode, + To: targetNode, ReplicaID: replica.ID, } channelPlans = append(channelPlans, plan) diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go index 3cee1cf475..e4639ab2e8 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go @@ -272,6 +272,28 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2}, From: 2, To: 3, ReplicaID: 1}, }, }, + { + name: "unbalance stable view", + nodes: []int64{1, 2, 3}, + segmentCnts: []int{0, 0, 0}, + states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal}, + shouldMock: true, + distributions: map[int64][]*meta.Segment{}, + distributionChannels: map[int64][]*meta.DmChannel{ + 1: { + {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v1"}, Node: 2}, + {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2}, + }, + 2: { + {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 2}, + }, + 3: { + {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v4"}, Node: 2}, + }, + }, + expectPlans: []SegmentAssignPlan{}, + expectChannelPlans: []ChannelAssignPlan{}, + }, { name: "already balanced", nodes: []int64{11, 22},