diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go
index ba3a6ff4b8..c1f1f6f178 100644
--- a/internal/querycoordv2/balance/rowcount_based_balancer.go
+++ b/internal/querycoordv2/balance/rowcount_based_balancer.go
@@ -180,10 +180,21 @@ func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]Segment
 			zap.Any("stoppingNodes", offlineNodes),
 			zap.Any("onlineNodes", onlineNodes),
 		)
-		return b.genStoppingSegmentPlan(replica, onlineNodes, offlineNodes), b.genStoppingChannelPlan(replica, onlineNodes, offlineNodes)
+
+		channelPlans := b.genStoppingChannelPlan(replica, onlineNodes, offlineNodes)
+		if len(channelPlans) == 0 {
+			return b.genStoppingSegmentPlan(replica, onlineNodes, offlineNodes), nil
+		}
+		return nil, channelPlans
 	}
 
-	return b.genSegmentPlan(replica, onlineNodes), b.genChannelPlan(replica, onlineNodes)
+	// segment balance counts the growing rows in the delegator, so it's better to balance channels first
+	// to avoid balancing segments again right after a channel balance
+	channelPlans := b.genChannelPlan(replica, onlineNodes)
+	if len(channelPlans) == 0 {
+		return b.genSegmentPlan(replica, onlineNodes), nil
+	}
+	return nil, channelPlans
 }
 
 func (b *RowCountBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []SegmentAssignPlan {
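Review note: with this change `BalanceReplica` returns either channel plans or segment plans in one round, never both. The delegator counts a shard's growing rows against the node that hosts the channel, so moving a channel shifts the row counts that a concurrently computed segment plan would be based on. Below is a minimal sketch of that gating, with simplified stand-ins for the plan types and generator functions (not the actual Milvus signatures):

```go
package main

import "fmt"

// Simplified stand-ins for the real plan types.
type SegmentAssignPlan struct{ SegmentID, From, To int64 }

type ChannelAssignPlan struct {
	Channel  string
	From, To int64
}

// balanceReplica mirrors the control flow above: channel plans win the round,
// and segment plans are only generated when no channel needs to move, since a
// channel move also moves the delegator's growing rows and would stale any
// segment plan computed against the old distribution.
func balanceReplica(
	genChannelPlan func() []ChannelAssignPlan,
	genSegmentPlan func() []SegmentAssignPlan,
) ([]SegmentAssignPlan, []ChannelAssignPlan) {
	channelPlans := genChannelPlan()
	if len(channelPlans) == 0 {
		return genSegmentPlan(), nil
	}
	return nil, channelPlans
}

func main() {
	segPlans, chPlans := balanceReplica(
		func() []ChannelAssignPlan {
			return []ChannelAssignPlan{{Channel: "v3", From: 3, To: 1}}
		},
		func() []SegmentAssignPlan {
			return []SegmentAssignPlan{{SegmentID: 4, From: 3, To: 1}}
		},
	)
	fmt.Println(segPlans, chPlans) // [] [{v3 3 1}]; segments wait for the next round
}
```

Deferring segments costs at most one extra round: once the channel has moved, the next round finds no channel plans and falls through to segment balancing.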
diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go
index 506713cd9d..92aa83acb2 100644
--- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go
+++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go
@@ -217,7 +217,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
 			expectChannelPlans: []ChannelAssignPlan{},
 		},
 		{
-			name:        "part stopping balance",
+			name:        "part stopping balance channel",
 			nodes:       []int64{1, 2, 3},
 			segmentCnts: []int{1, 2, 2},
 			states:      []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateStopping},
@@ -241,13 +241,41 @@
 					{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3},
 				},
 			},
+			expectPlans: []SegmentAssignPlan{},
+			expectChannelPlans: []ChannelAssignPlan{
+				{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
+			},
+		},
+		{
+			name:        "part stopping balance segment",
+			nodes:       []int64{1, 2, 3},
+			segmentCnts: []int{1, 2, 2},
+			states:      []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateStopping},
+			shouldMock:  true,
+			distributions: map[int64][]*meta.Segment{
+				1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}},
+				2: {
+					{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2},
+					{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
+				},
+				3: {
+					{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3},
+					{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3},
+				},
+			},
+			distributionChannels: map[int64][]*meta.DmChannel{
+				2: {
+					{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2},
+				},
+				1: {
+					{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 1},
+				},
+			},
 			expectPlans: []SegmentAssignPlan{
 				{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
 				{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
 			},
-			expectChannelPlans: []ChannelAssignPlan{
-				{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
-			},
+			expectChannelPlans: []ChannelAssignPlan{},
 		},
 		{
 			name: "balance channel",
@@ -488,17 +516,12 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() {
 				2: {
 					{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2},
 				},
-				3: {
-					{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3},
-				},
 			},
 			expectPlans: []SegmentAssignPlan{
 				{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
 				{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
 			},
-			expectChannelPlans: []ChannelAssignPlan{
-				{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
-			},
+			expectChannelPlans: []ChannelAssignPlan{},
 		},
 		{
 			name: "not exist in next target",
@@ -619,7 +642,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
 		expectChannelPlans []ChannelAssignPlan
 	}{
 		{
-			name:        "balance out bound nodes",
+			name:        "balance channel with outbound nodes",
 			nodes:       []int64{1, 2, 3},
 			segmentCnts: []int{1, 2, 2},
 			states:      []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
@@ -643,13 +666,41 @@
 					{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3},
 				},
 			},
+			expectPlans: []SegmentAssignPlan{},
+			expectChannelPlans: []ChannelAssignPlan{
+				{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
+			},
+		},
+		{
+			name:        "balance segment with outbound node",
+			nodes:       []int64{1, 2, 3},
+			segmentCnts: []int{1, 2, 2},
+			states:      []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
+			shouldMock:  true,
+			distributions: map[int64][]*meta.Segment{
+				1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}},
+				2: {
+					{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2},
+					{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
+				},
+				3: {
+					{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3},
+					{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3},
+				},
+			},
+			distributionChannels: map[int64][]*meta.DmChannel{
+				2: {
+					{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v2"}, Node: 2},
+				},
+				1: {
+					{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 1},
+				},
+			},
 			expectPlans: []SegmentAssignPlan{
 				{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
 				{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
 			},
-			expectChannelPlans: []ChannelAssignPlan{
-				{Channel: &meta.DmChannel{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "v3"}, Node: 3}, From: 3, To: 1, ReplicaID: 1},
-			},
+			expectChannelPlans: []ChannelAssignPlan{},
 		},
 	}
 
@@ -714,6 +765,12 @@
 			segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1)
 			suite.ElementsMatch(c.expectChannelPlans, channelPlans)
 			suite.ElementsMatch(c.expectPlans, segmentPlans)
+
+			// clean up distribution for next test
+			for node := range c.distributions {
+				balancer.dist.SegmentDistManager.Update(node)
+				balancer.dist.ChannelDistManager.Update(node)
+			}
 		})
 	}
 }
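Note on the cleanup loop added at the end of this suite: it appears to rely on `SegmentDistManager.Update` / `ChannelDistManager.Update` replacing a node's recorded distribution wholesale, so calling them with only a node ID empties that node between table-driven cases. A toy model of the reset pattern (a generic stand-in, not the actual manager API):

```go
package main

import "fmt"

// distManager is a toy stand-in for SegmentDistManager/ChannelDistManager,
// assuming Update(node, items...) overwrites the node's records wholesale.
type distManager[T any] struct{ dist map[int64][]T }

// Update replaces everything recorded for node; with no items it clears it.
func (m *distManager[T]) Update(node int64, items ...T) { m.dist[node] = items }

func main() {
	m := &distManager[string]{dist: map[int64][]string{}}
	m.Update(3, "segment-4", "segment-5") // distribution set up by one test case
	m.Update(3)                           // cleanup so the next case starts clean
	fmt.Println(len(m.dist[3]))           // 0
}
```

Without the reset, the distribution from one case would leak into the next and skew its expected plans.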
diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go
index 7fd03fa422..9bfd497768 100644
--- a/internal/querycoordv2/balance/score_based_balancer.go
+++ b/internal/querycoordv2/balance/score_based_balancer.go
@@ -191,13 +191,18 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss
 			zap.Any("available nodes", maps.Keys(nodesSegments)),
 		)
 		// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
-		segmentPlans = append(segmentPlans, b.getStoppedSegmentPlan(replica, nodesSegments, stoppingNodesSegments)...)
 		channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...)
+		if len(channelPlans) == 0 {
+			segmentPlans = append(segmentPlans, b.getStoppedSegmentPlan(replica, nodesSegments, stoppingNodesSegments)...)
+		}
 	} else {
 		// normal balance, find segments from largest score nodes and transfer to smallest score nodes.
-		segmentPlans = append(segmentPlans, b.getNormalSegmentPlan(replica, nodesSegments)...)
 		channelPlans = append(channelPlans, b.genChannelPlan(replica, lo.Keys(nodesSegments))...)
+		if len(channelPlans) == 0 {
+			segmentPlans = append(segmentPlans, b.getNormalSegmentPlan(replica, nodesSegments)...)
+		}
 	}
+
 	if len(segmentPlans) != 0 || len(channelPlans) != 0 {
 		PrintCurrentReplicaDist(replica, stoppingNodesSegments, nodesSegments, b.dist.ChannelDistManager, b.dist.SegmentDistManager)
 	}
diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go
index 58ac1f2f70..c0f6257c53 100644
--- a/internal/querycoordv2/meta/resource_manager.go
+++ b/internal/querycoordv2/meta/resource_manager.go
@@ -71,7 +71,8 @@ func NewResourceGroup(capacity int) *ResourceGroup {
 // assign node to resource group
 func (rg *ResourceGroup) assignNode(id int64, deltaCapacity int) error {
 	if rg.containsNode(id) {
-		return ErrNodeAlreadyAssign
+		// adding a node to the same rg more than once should be tolerated
+		return nil
 	}
 
 	rg.nodes.Insert(id)
@@ -213,8 +214,12 @@ func (rm *ResourceManager) assignNode(rgName string, node int64) error {
 	}
 
 	rm.checkRGNodeStatus(rgName)
-	if rm.checkNodeAssigned(node) {
-		return ErrNodeAlreadyAssign
+
+	for name, group := range rm.groups {
+		// check whether the node has already been assigned to another rg
+		if name != rgName && group.containsNode(node) {
+			return ErrNodeAlreadyAssign
+		}
 	}
 
 	newNodes := rm.groups[rgName].GetNodes()
@@ -238,10 +243,7 @@ func (rm *ResourceManager) assignNode(rgName string, node int64) error {
 		return err
 	}
 
-	err = rm.groups[rgName].assignNode(node, deltaCapacity)
-	if err != nil {
-		return err
-	}
+	rm.groups[rgName].assignNode(node, deltaCapacity)
 
 	log.Info("add node to resource group",
 		zap.String("rgName", rgName),
@@ -251,16 +253,6 @@
 	return nil
 }
 
-func (rm *ResourceManager) checkNodeAssigned(node int64) bool {
-	for _, group := range rm.groups {
-		if group.containsNode(node) {
-			return true
-		}
-	}
-
-	return false
-}
-
 func (rm *ResourceManager) UnassignNode(rgName string, node int64) error {
 	rm.rwmutex.Lock()
 	defer rm.rwmutex.Unlock()
@@ -542,17 +534,8 @@ func (rm *ResourceManager) TransferNode(from string, to string, numNode int) ([]
 	}
 
 	for _, node := range movedNodes {
-		err := rm.groups[from].unassignNode(node, deltaFromCapacity)
-		if err != nil {
-			// interrupt transfer, unreachable logic path
-			return nil, err
-		}
-
-		err = rm.groups[to].assignNode(node, deltaToCapacity)
-		if err != nil {
-			// interrupt transfer, unreachable logic path
-			return nil, err
-		}
+		rm.groups[from].unassignNode(node, deltaFromCapacity)
+		rm.groups[to].assignNode(node, deltaToCapacity)
 
 		log.Info("transfer node",
 			zap.String("sourceRG", from),
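Note on the resource-manager changes: `ResourceGroup.assignNode` is now idempotent, and `ResourceManager.assignNode` only rejects a node that already belongs to a different group; with membership validated up front, the per-node error paths in `TransferNode` were unreachable and are dropped. A minimal model of just the membership rule, using hypothetical simplified types (capacity bookkeeping and metastore writes omitted):

```go
package main

import (
	"errors"
	"fmt"
)

var ErrNodeAlreadyAssign = errors.New("node already assigned to another resource group")

// resourceManager models only the membership rule of the patch.
type resourceManager struct {
	groups map[string]map[int64]struct{} // rg name -> node set
}

func (rm *resourceManager) assignNode(rgName string, node int64) error {
	// Reject only when the node already belongs to a different group.
	for name, nodes := range rm.groups {
		if _, ok := nodes[node]; ok && name != rgName {
			return ErrNodeAlreadyAssign
		}
	}
	// Re-inserting into the same group is a harmless no-op, so a retried
	// assign request succeeds instead of failing with ErrNodeAlreadyAssign.
	rm.groups[rgName][node] = struct{}{}
	return nil
}

func main() {
	rm := &resourceManager{groups: map[string]map[int64]struct{}{
		"rg1": {},
		"rg2": {},
	}}
	fmt.Println(rm.assignNode("rg1", 7)) // <nil>
	fmt.Println(rm.assignNode("rg1", 7)) // <nil>: duplicate assign is tolerated
	fmt.Println(rm.assignNode("rg2", 7)) // node already assigned to another resource group
}
```

This is also what makes ignoring the return values in `TransferNode` safe: once `movedNodes` has been computed from the source group's membership, `unassignNode`/`assignNode` can no longer fail on membership grounds.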