From 42e538b683b42e537e380efe94c88ddb7fa6c5fe Mon Sep 17 00:00:00 2001 From: wei liu Date: Mon, 11 Dec 2023 14:18:37 +0800 Subject: [PATCH] enhance: enable balance channel in querycoord (#28469) issue: #23726 /kind improvement 1. enable auto balance channel between nodes in querycoord 2. make `genSegmentPlan` reuse the `AssignSegment` logic 3. make `genChannelPlan` reuse the `AssignChannel` logic --------- Signed-off-by: Wei Liu --- .../balance/rowcount_based_balancer.go | 305 ++++++++++-------- .../balance/rowcount_based_balancer_test.go | 101 +++--- .../balance/score_based_balancer.go | 4 +- .../balance/score_based_balancer_test.go | 2 +- 4 files changed, 223 insertions(+), 189 deletions(-) diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index 9d44cfa4bc..23c180621c 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -17,6 +17,8 @@ package balance import ( + "context" + "math" "sort" "github.com/samber/lo" @@ -35,8 +37,10 @@ type RowCountBasedBalancer struct { targetMgr *meta.TargetManager } +// AssignSegment, when row count based balancer assign segments, it will assign segment to node with least global row count. +// try to make every query node has same row count. func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64) []SegmentAssignPlan { - nodeItems := b.convertToNodeItems(nodes) + nodeItems := b.convertToNodeItemsBySegment(nodes) if len(nodeItems) == 0 { return nil } @@ -67,7 +71,37 @@ func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*me return plans } -func (b *RowCountBasedBalancer) convertToNodeItems(nodeIDs []int64) []*nodeItem { +// AssignSegment, when row count based balancer assign segments, it will assign channel to node with least global channel count. +// try to make every query node has channel count +func (b *RowCountBasedBalancer) AssignChannel(channels []*meta.DmChannel, nodes []int64) []ChannelAssignPlan { + nodeItems := b.convertToNodeItemsByChannel(nodes) + if len(nodeItems) == 0 { + return nil + } + queue := newPriorityQueue() + for _, item := range nodeItems { + queue.push(item) + } + + plans := make([]ChannelAssignPlan, 0, len(channels)) + for _, c := range channels { + // pick the node with the least channel num and allocate to it. + ni := queue.pop().(*nodeItem) + plan := ChannelAssignPlan{ + From: -1, + To: ni.nodeID, + Channel: c, + } + plans = append(plans, plan) + // change node's priority and push back + p := ni.getPriority() + ni.setPriority(p + 1) + queue.push(ni) + } + return plans +} + +func (b *RowCountBasedBalancer) convertToNodeItemsBySegment(nodeIDs []int64) []*nodeItem { ret := make([]*nodeItem, 0, len(nodeIDs)) for _, nodeInfo := range b.getNodes(nodeIDs) { node := nodeInfo.ID() @@ -92,76 +126,128 @@ func (b *RowCountBasedBalancer) convertToNodeItems(nodeIDs []int64) []*nodeItem return ret } +func (b *RowCountBasedBalancer) convertToNodeItemsByChannel(nodeIDs []int64) []*nodeItem { + ret := make([]*nodeItem, 0, len(nodeIDs)) + for _, nodeInfo := range b.getNodes(nodeIDs) { + node := nodeInfo.ID() + channels := b.dist.ChannelDistManager.GetByNode(node) + + // more channel num, less priority + nodeItem := newNodeItem(len(channels), node) + ret = append(ret, &nodeItem) + } + return ret +} + func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) { + log := log.Ctx(context.TODO()).WithRateGroup("qcv2.RowCountBasedBalancer", 1, 60).With( + zap.Int64("collectionID", replica.GetCollectionID()), + zap.Int64("replicaID", replica.GetCollectionID()), + zap.String("resourceGroup", replica.Replica.GetResourceGroup()), + ) nodes := replica.GetNodes() if len(nodes) < 2 { return nil, nil } - onlineNodesSegments := make(map[int64][]*meta.Segment) - stoppingNodesSegments := make(map[int64][]*meta.Segment) outboundNodes := b.meta.ResourceManager.CheckOutboundNodes(replica) - totalCnt := 0 - for _, nid := range nodes { - segments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nid) - // Only balance segments in targets - segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool { - return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil && - b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil - }) + onlineNodes := make([]int64, 0) + offlineNodes := make([]int64, 0) + for _, nid := range nodes { if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil { - log.Info("not existed node", zap.Int64("nid", nid), zap.Any("segments", segments), zap.Error(err)) + log.Info("not existed node", zap.Int64("nid", nid), zap.Error(err)) continue } else if isStopping { - stoppingNodesSegments[nid] = segments + offlineNodes = append(offlineNodes, nid) } else if outboundNodes.Contain(nid) { // if node is stop or transfer to other rg - log.RatedInfo(10, "meet outbound node, try to move out all segment/channel", - zap.Int64("collectionID", replica.GetCollectionID()), - zap.Int64("replicaID", replica.GetCollectionID()), - zap.Int64("node", nid), - ) - stoppingNodesSegments[nid] = segments + log.RatedInfo(10, "meet outbound node, try to move out all segment/channel", zap.Int64("node", nid)) + offlineNodes = append(offlineNodes, nid) } else { - onlineNodesSegments[nid] = segments - } - - for _, s := range segments { - totalCnt += int(s.GetNumOfRows()) + onlineNodes = append(onlineNodes, nid) } } - if len(nodes) == len(stoppingNodesSegments) || len(onlineNodesSegments) == 0 { + if len(nodes) == len(offlineNodes) || len(onlineNodes) == 0 { // no available nodes to balance return nil, nil } - segmentsToMove := make([]*meta.Segment, 0) - for _, stopSegments := range stoppingNodesSegments { - segmentsToMove = append(segmentsToMove, stopSegments...) + if len(offlineNodes) > 0 { + log.Info("Balance for stopping nodes", + zap.Any("stoppingNodes", offlineNodes), + zap.Any("onlineNodes", onlineNodes), + ) + return b.genStoppingSegmentPlan(replica, onlineNodes, offlineNodes), b.genStoppingChannelPlan(replica, onlineNodes, offlineNodes) } - // find nodes with less row count than average - nodesWithLessRow := newPriorityQueue() - average := totalCnt / len(onlineNodesSegments) - for node, segments := range onlineNodesSegments { - sort.Slice(segments, func(i, j int) bool { - return segments[i].GetNumOfRows() > segments[j].GetNumOfRows() - }) + return b.genSegmentPlan(replica, onlineNodes), b.genChannelPlan(replica, onlineNodes) +} +func (b *RowCountBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []SegmentAssignPlan { + segmentPlans := make([]SegmentAssignPlan, 0) + for _, nodeID := range offlineNodes { + dist := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID) + segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool { + return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil && + b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil + }) + plans := b.AssignSegment(replica.CollectionID, segments, onlineNodes) + for i := range plans { + plans[i].From = nodeID + plans[i].ReplicaID = replica.ID + } + segmentPlans = append(segmentPlans, plans...) + } + return segmentPlans +} + +func (b *RowCountBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes []int64) []SegmentAssignPlan { + segmentsToMove := make([]*meta.Segment, 0) + + nodeRowCount := make(map[int64]int, 0) + segmentDist := make(map[int64][]*meta.Segment) + totalRowCount := 0 + for _, node := range onlineNodes { + dist := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), node) + segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool { + return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil && + b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil + }) rowCount := 0 for _, s := range segments { rowCount += int(s.GetNumOfRows()) - if rowCount <= average { - continue - } - - segmentsToMove = append(segmentsToMove, s) } - if rowCount < average { - item := newNodeItem(rowCount, node) - nodesWithLessRow.push(&item) + totalRowCount += rowCount + segmentDist[node] = segments + nodeRowCount[node] = rowCount + } + + if totalRowCount == 0 { + return nil + } + + // find nodes with less row count than average + average := totalRowCount / len(onlineNodes) + nodesWithLessRow := make([]int64, 0) + for node, segments := range segmentDist { + sort.Slice(segments, func(i, j int) bool { + return segments[i].GetNumOfRows() < segments[j].GetNumOfRows() + }) + + leftRowCount := nodeRowCount[node] + if leftRowCount < average { + nodesWithLessRow = append(nodesWithLessRow, node) + continue + } + + for _, s := range segments { + leftRowCount -= int(s.GetNumOfRows()) + if leftRowCount < average { + break + } + segmentsToMove = append(segmentsToMove, s) } } @@ -170,49 +256,20 @@ func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]Segment return len(b.dist.SegmentDistManager.Get(s.GetID())) == 1 }) - return b.genSegmentPlan(replica, nodesWithLessRow, segmentsToMove, average), b.genChannelPlan(replica, lo.Keys(onlineNodesSegments), lo.Keys(stoppingNodesSegments)) -} - -func (b *RowCountBasedBalancer) genSegmentPlan(replica *meta.Replica, nodesWithLessRowCount priorityQueue, segmentsToMove []*meta.Segment, average int) []SegmentAssignPlan { - if nodesWithLessRowCount.Len() == 0 || len(segmentsToMove) == 0 { + if len(nodesWithLessRow) == 0 || len(segmentsToMove) == 0 { return nil } - sort.Slice(segmentsToMove, func(i, j int) bool { - return segmentsToMove[i].GetNumOfRows() < segmentsToMove[j].GetNumOfRows() - }) - - // allocate segments to those nodes with row cnt less than average - plans := make([]SegmentAssignPlan, 0) - for _, s := range segmentsToMove { - if nodesWithLessRowCount.Len() <= 0 { - break - } - - node := nodesWithLessRowCount.pop().(*nodeItem) - newPriority := node.getPriority() + int(s.GetNumOfRows()) - if newPriority > average { - nodesWithLessRowCount.push(node) - continue - } - - plan := SegmentAssignPlan{ - ReplicaID: replica.GetID(), - From: s.Node, - To: node.nodeID, - Segment: s, - } - plans = append(plans, plan) - node.setPriority(newPriority) - nodesWithLessRowCount.push(node) + segmentPlans := b.AssignSegment(replica.CollectionID, segmentsToMove, nodesWithLessRow) + for i := range segmentPlans { + segmentPlans[i].From = segmentPlans[i].Segment.Node + segmentPlans[i].ReplicaID = replica.ID } - return plans + + return segmentPlans } -func (b *RowCountBasedBalancer) genChannelPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []ChannelAssignPlan { - log.Info("balance channel", - zap.Int64s("online nodes", onlineNodes), - zap.Int64s("offline nodes", offlineNodes)) +func (b *RowCountBasedBalancer) genStoppingChannelPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []ChannelAssignPlan { channelPlans := make([]ChannelAssignPlan, 0) for _, nodeID := range offlineNodes { dmChannels := b.dist.ChannelDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID) @@ -223,67 +280,45 @@ func (b *RowCountBasedBalancer) genChannelPlan(replica *meta.Replica, onlineNode } channelPlans = append(channelPlans, plans...) } + return channelPlans +} - // if len(channelPlans) == 0 && len(onlineNodes) > 1 { - // // start to balance channels on all available nodes - // channels := b.dist.ChannelDistManager.GetByCollection(replica.CollectionID) - // channelsOnNode := lo.GroupBy(channels, func(channel *meta.DmChannel) int64 { return channel.Node }) +func (b *RowCountBasedBalancer) genChannelPlan(replica *meta.Replica, onlineNodes []int64) []ChannelAssignPlan { + channelPlans := make([]ChannelAssignPlan, 0) + if len(onlineNodes) > 1 { + // start to balance channels on all available nodes + channelDist := b.dist.ChannelDistManager.GetByCollection(replica.CollectionID) + if len(channelDist) == 0 { + return nil + } + average := int(math.Ceil(float64(len(channelDist)) / float64(len(onlineNodes)))) - // nodes := replica.GetNodes() - // getChannelNum := func(node int64) int { - // if channelsOnNode[node] == nil { - // return 0 - // } - // return len(channelsOnNode[node]) - // } - // sort.Slice(nodes, func(i, j int) bool { return getChannelNum(nodes[i]) < getChannelNum(nodes[j]) }) + // find nodes with less channel count than average + nodeWithLessChannel := make([]int64, 0) + channelsToMove := make([]*meta.DmChannel, 0) + for _, node := range onlineNodes { + channels := b.dist.ChannelDistManager.GetByCollectionAndNode(replica.GetCollectionID(), node) - // start := int64(0) - // end := int64(len(nodes) - 1) + if len(channels) <= average { + nodeWithLessChannel = append(nodeWithLessChannel, node) + continue + } - // averageChannel := int(math.Ceil(float64(len(channels)) / float64(len(onlineNodes)))) - // if averageChannel == 0 || getChannelNum(nodes[start]) >= getChannelNum(nodes[end]) { - // return channelPlans - // } + channelsToMove = append(channelsToMove, channels[average:]...) + } - // for start < end { - // // segment to move in - // targetNode := nodes[start] - // // segment to move out - // sourceNode := nodes[end] + if len(nodeWithLessChannel) == 0 || len(channelsToMove) == 0 { + return nil + } - // if len(channelsOnNode[sourceNode])-1 < averageChannel { - // break - // } + channelPlans := b.AssignChannel(channelsToMove, nodeWithLessChannel) + for i := range channelPlans { + channelPlans[i].From = channelPlans[i].Channel.Node + channelPlans[i].ReplicaID = replica.ID + } - // // remove channel from end node - // selectChannel := channelsOnNode[sourceNode][0] - // channelsOnNode[sourceNode] = channelsOnNode[sourceNode][1:] - - // // add channel to start node - // if channelsOnNode[targetNode] == nil { - // channelsOnNode[targetNode] = make([]*meta.DmChannel, 0) - // } - // channelsOnNode[targetNode] = append(channelsOnNode[targetNode], selectChannel) - - // // generate channel plan - // plan := ChannelAssignPlan{ - // Channel: selectChannel, - // From: sourceNode, - // To: targetNode, - // ReplicaID: replica.ID, - // } - // channelPlans = append(channelPlans, plan) - // for end > 0 && getChannelNum(nodes[end]) <= averageChannel { - // end-- - // } - - // for start < end && getChannelNum(nodes[start]) >= averageChannel { - // start++ - // } - // } - - // } + return channelPlans + } return channelPlans } diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go index a5795fd486..b308358838 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go @@ -248,34 +248,25 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1}, }, }, - // { - // name: "balance channel", - // nodes: []int64{2, 3}, - // segmentCnts: []int{2, 2}, - // states: []session.State{session.NodeStateNormal, session.NodeStateNormal}, - // shouldMock: true, - // distributions: map[int64][]*meta.Segment{ - // 2: { - // {SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2}, - // {SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2}, - // }, - // 3: { - // {SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, - // {SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, - // }, - // }, - // distributionChannels: map[int64][]*meta.DmChannel{ - // 2: { - // {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2}, - // {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 2}, - // }, - // 3: {}, - // }, - // expectPlans: []SegmentAssignPlan{}, - // expectChannelPlans: []ChannelAssignPlan{ - // {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2}, From: 2, To: 3, ReplicaID: 1}, - // }, - // }, + { + name: "balance channel", + nodes: []int64{2, 3}, + segmentCnts: []int{2, 2}, + states: []session.State{session.NodeStateNormal, session.NodeStateNormal}, + shouldMock: true, + distributions: map[int64][]*meta.Segment{}, + distributionChannels: map[int64][]*meta.DmChannel{ + 2: { + {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2}, + {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 2}, + }, + 3: {}, + }, + expectPlans: []SegmentAssignPlan{}, + expectChannelPlans: []ChannelAssignPlan{ + {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 2}, From: 2, To: 3, ReplicaID: 1}, + }, + }, { name: "unbalance stable view", nodes: []int64{1, 2, 3}, @@ -298,26 +289,26 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { expectPlans: []SegmentAssignPlan{}, expectChannelPlans: []ChannelAssignPlan{}, }, - // { - // name: "balance unstable view", - // nodes: []int64{1, 2, 3}, - // segmentCnts: []int{0, 0, 0}, - // states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal}, - // shouldMock: true, - // distributions: map[int64][]*meta.Segment{}, - // distributionChannels: map[int64][]*meta.DmChannel{ - // 1: { - // {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v1"}, Node: 1}, - // {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 1}, - // }, - // 2: {}, - // 3: {}, - // }, - // expectPlans: []SegmentAssignPlan{}, - // expectChannelPlans: []ChannelAssignPlan{ - // {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v1"}, Node: 1}, From: 1, To: 2, ReplicaID: 1}, - // }, - // }, + { + name: "balance unstable view", + nodes: []int64{1, 2, 3}, + segmentCnts: []int{0, 0, 0}, + states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal}, + shouldMock: true, + distributions: map[int64][]*meta.Segment{}, + distributionChannels: map[int64][]*meta.DmChannel{ + 1: { + {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v1"}, Node: 1}, + {VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 1}, + }, + 2: {}, + 3: {}, + }, + expectPlans: []SegmentAssignPlan{}, + expectChannelPlans: []ChannelAssignPlan{ + {Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 1}, From: 1, To: 2, ReplicaID: 1}, + }, + }, { name: "already balanced", nodes: []int64{11, 22}, @@ -377,7 +368,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { balancer.targetMgr.UpdateCollectionNextTarget(int64(1)) balancer.targetMgr.UpdateCollectionCurrentTarget(1) balancer.targetMgr.UpdateCollectionNextTarget(int64(1)) - suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0) + suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe() for node, s := range c.distributions { balancer.dist.SegmentDistManager.Update(node, s...) } @@ -396,6 +387,14 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1) suite.ElementsMatch(c.expectChannelPlans, channelPlans) suite.ElementsMatch(c.expectPlans, segmentPlans) + + // clear distribution + for node := range c.distributions { + balancer.dist.SegmentDistManager.Update(node) + } + for node := range c.distributionChannels { + balancer.dist.ChannelDistManager.Update(node) + } }) } } @@ -576,7 +575,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() { suite.broker.ExpectedCalls = nil suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(nil, c.segmentInNext, nil) balancer.targetMgr.UpdateCollectionNextTarget(int64(1)) - suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0) + suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe() for node, s := range c.distributions { balancer.dist.SegmentDistManager.Update(node, s...) } @@ -646,7 +645,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() { }, } - suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0) + suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe() for _, c := range cases { suite.Run(c.name, func() { suite.SetupSuite() diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index b0c55dd988..7351e8ae27 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -189,11 +189,11 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss ) // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score segmentPlans = append(segmentPlans, b.getStoppedSegmentPlan(replica, nodesSegments, stoppingNodesSegments)...) - channelPlans = append(channelPlans, b.genChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...) + channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...) } else { // normal balance, find segments from largest score nodes and transfer to smallest score nodes. segmentPlans = append(segmentPlans, b.getNormalSegmentPlan(replica, nodesSegments)...) - channelPlans = append(channelPlans, b.genChannelPlan(replica, lo.Keys(nodesSegments), nil)...) + channelPlans = append(channelPlans, b.genChannelPlan(replica, lo.Keys(nodesSegments))...) } if len(segmentPlans) != 0 || len(channelPlans) != 0 { PrintCurrentReplicaDist(replica, stoppingNodesSegments, nodesSegments, b.dist.ChannelDistManager, b.dist.SegmentDistManager) diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index db2135b649..38efd181cf 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -586,7 +586,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { for i, c := range cases { suite.Run(c.name, func() { if i == 0 { - suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0) + suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe() } suite.SetupSuite() defer suite.TearDownTest()