diff --git a/internal/querycoordv2/balance/channel_level_score_balancer.go b/internal/querycoordv2/balance/channel_level_score_balancer.go
index cb59eb67a1..332762ec10 100644
--- a/internal/querycoordv2/balance/channel_level_score_balancer.go
+++ b/internal/querycoordv2/balance/channel_level_score_balancer.go
@@ -164,7 +164,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingSegmentPlan(replica *meta.Replica
 func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channelName string, onlineNodes []int64) []SegmentAssignPlan {
 	segmentDist := make(map[int64][]*meta.Segment)
-	nodeScore := make(map[int64]int, 0)
+	nodeScore := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes)
 	totalScore := 0
 
 	// list all segment which could be balanced, and calculate node's score
 	for _, node := range onlineNodes {
@@ -176,10 +176,7 @@ func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channe
 			segment.GetLevel() != datapb.SegmentLevel_L0
 		})
 		segmentDist[node] = segments
-
-		rowCount := b.calculateScore(replica.GetCollectionID(), node)
-		totalScore += rowCount
-		nodeScore[node] = rowCount
+		totalScore += nodeScore[node].getPriority()
 	}
 
 	if totalScore == 0 {
@@ -190,7 +187,7 @@ func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channe
 	segmentsToMove := make([]*meta.Segment, 0)
 	average := totalScore / len(onlineNodes)
 	for node, segments := range segmentDist {
-		leftScore := nodeScore[node]
+		leftScore := nodeScore[node].getPriority()
 		if leftScore <= average {
 			continue
 		}
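The change above makes the channel-level balancer reuse the per-node scores computed once by convertToNodeItems (promoted from the embedded ScoreBasedBalancer, changed below) instead of recomputing calculateScore inside the loop, so the delegator memory reservation applied there also shapes this plan. A minimal standalone sketch of the average-based selection this score map feeds; pickOverloadedNodes is a hypothetical helper for illustration, not part of the patch:

	package main

	import "fmt"

	// pickOverloadedNodes mirrors genSegmentPlan's selection rule: a node is a
	// candidate to give up segments only if its score exceeds the cluster average.
	func pickOverloadedNodes(nodeScore map[int64]int) []int64 {
		if len(nodeScore) == 0 {
			return nil
		}
		total := 0
		for _, score := range nodeScore {
			total += score
		}
		if total == 0 {
			return nil
		}
		average := total / len(nodeScore)
		overloaded := make([]int64, 0)
		for node, score := range nodeScore {
			if score > average {
				overloaded = append(overloaded, node)
			}
		}
		return overloaded
	}

	func main() {
		// Node 2 holds 40 rows vs. node 1's 10; the average is 25, so only node 2 qualifies.
		fmt.Println(pickOverloadedNodes(map[int64]int{1: 10, 2: 40})) // [2]
	}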
diff --git a/internal/querycoordv2/balance/channel_level_score_balancer_test.go b/internal/querycoordv2/balance/channel_level_score_balancer_test.go
index c9a3ea0bb1..d0c3bc079c 100644
--- a/internal/querycoordv2/balance/channel_level_score_balancer_test.go
+++ b/internal/querycoordv2/balance/channel_level_score_balancer_test.go
@@ -84,15 +84,16 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TearDownTest() {
 
 func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegment() {
 	cases := []struct {
-		name          string
-		comment       string
-		distributions map[int64][]*meta.Segment
-		assignments   [][]*meta.Segment
-		nodes         []int64
-		collectionIDs []int64
-		segmentCnts   []int
-		states        []session.State
-		expectPlans   [][]SegmentAssignPlan
+		name               string
+		comment            string
+		distributions      map[int64][]*meta.Segment
+		assignments        [][]*meta.Segment
+		nodes              []int64
+		collectionIDs      []int64
+		segmentCnts        []int
+		states             []session.State
+		expectPlans        [][]SegmentAssignPlan
+		unstableAssignment bool
 	}{
 		{
 			name: "test empty cluster assigning one collection",
@@ -105,10 +106,11 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegment() {
 					{SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 15, CollectionID: 1}},
 				},
 			},
-			nodes:         []int64{1, 2, 3},
-			collectionIDs: []int64{0},
-			states:        []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
-			segmentCnts:   []int{0, 0, 0},
+			nodes:              []int64{1, 2, 3},
+			collectionIDs:      []int64{0},
+			states:             []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
+			segmentCnts:        []int{0, 0, 0},
+			unstableAssignment: true,
 			expectPlans: [][]SegmentAssignPlan{
 				{
 					// as assign segments is used while loading collection,
@@ -237,7 +239,11 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegment() {
 			}
 			for i := range c.collectionIDs {
 				plans := balancer.AssignSegment(c.collectionIDs[i], c.assignments[i], c.nodes, false)
-				assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans)
+				if c.unstableAssignment {
+					suite.Equal(len(plans), len(c.expectPlans[i]))
+				} else {
+					assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans)
+				}
 			}
 		})
 	}
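A note on unstableAssignment: convertToNodeItems now returns a map (see score_based_balancer.go below), and the node items are pushed into the priority queue by iterating that map, so ties between equally scored empty nodes are broken in Go's randomized map-iteration order. The flag presumably exists so that such cases assert only the plan count instead of an exact node-by-node match.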
diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go
index b1e2b23f6b..87d7c6b5e0 100644
--- a/internal/querycoordv2/balance/score_based_balancer.go
+++ b/internal/querycoordv2/balance/score_based_balancer.go
@@ -60,14 +60,13 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.
 	}
 
 	// calculate each node's score
-	nodeItems := b.convertToNodeItems(collectionID, nodes)
-	if len(nodeItems) == 0 {
+	nodeItemsMap := b.convertToNodeItems(collectionID, nodes)
+	if len(nodeItemsMap) == 0 {
 		return nil
 	}
 
-	nodeItemsMap := lo.SliceToMap(nodeItems, func(item *nodeItem) (int64, *nodeItem) { return item.nodeID, item })
 	queue := newPriorityQueue()
-	for _, item := range nodeItems {
+	for _, item := range nodeItemsMap {
 		queue.push(item)
 	}
 
@@ -139,33 +138,45 @@ func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode *
 	return true
 }
 
-func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []int64) []*nodeItem {
-	ret := make([]*nodeItem, 0, len(nodeIDs))
+func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []int64) map[int64]*nodeItem {
+	totalScore := 0
+	nodeScoreMap := make(map[int64]*nodeItem)
 	for _, node := range nodeIDs {
-		priority := b.calculateScore(collectionID, node)
-		nodeItem := newNodeItem(priority, node)
-		ret = append(ret, &nodeItem)
+		score := b.calculateScore(collectionID, node)
+		nodeItem := newNodeItem(score, node)
+		nodeScoreMap[node] = &nodeItem
+		totalScore += score
 	}
-	return ret
+
+	if totalScore == 0 {
+		return nodeScoreMap
+	}
+
+	average := totalScore / len(nodeIDs)
+	delegatorOverloadFactor := params.Params.QueryCoordCfg.DelegatorMemoryOverloadFactor.GetAsFloat()
+	// use average * delegatorOverloadFactor * delegator_num, to preserve fixed memory size for delegator
+	for _, node := range nodeIDs {
+		collectionViews := b.dist.LeaderViewManager.GetByFilter(meta.WithCollectionID2LeaderView(collectionID), meta.WithNodeID2LeaderView(node))
+		if len(collectionViews) > 0 {
+			newScore := nodeScoreMap[node].getPriority() + int(float64(average)*delegatorOverloadFactor)*len(collectionViews)
+			nodeScoreMap[node].setPriority(newScore)
+		}
+	}
+	return nodeScoreMap
 }
 
 func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
-	delegatorOverloadFactor := params.Params.QueryCoordCfg.DelegatorMemoryOverloadFactor.GetAsFloat()
-
 	nodeRowCount := 0
-	nodeCollectionRowCount := make(map[int64]int)
 	// calculate global sealed segment row count
 	globalSegments := b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID))
 	for _, s := range globalSegments {
 		nodeRowCount += int(s.GetNumOfRows())
-		nodeCollectionRowCount[s.CollectionID] += int(s.GetNumOfRows())
 	}
 
 	// calculate global growing segment row count
 	views := b.dist.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(nodeID))
 	for _, view := range views {
 		nodeRowCount += int(float64(view.NumOfGrowingRows))
-		nodeRowCount += int(float64(nodeCollectionRowCount[view.CollectionID]) * delegatorOverloadFactor)
 	}
 
 	// calculate executing task cost in scheduler
@@ -182,7 +193,6 @@
 	collectionViews := b.dist.LeaderViewManager.GetByFilter(meta.WithCollectionID2LeaderView(collectionID), meta.WithNodeID2LeaderView(nodeID))
 	for _, view := range collectionViews {
 		collectionRowCount += int(float64(view.NumOfGrowingRows))
-		collectionRowCount += int(float64(collectionRowCount) * delegatorOverloadFactor)
 	}
 
 	// calculate executing task cost in scheduler
@@ -266,7 +276,7 @@ func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlin
 func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes []int64) []SegmentAssignPlan {
 	segmentDist := make(map[int64][]*meta.Segment)
-	nodeScore := make(map[int64]int, 0)
+	nodeScore := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes)
 	totalScore := 0
 
 	// list all segment which could be balanced, and calculate node's score
 	for _, node := range onlineNodes {
@@ -278,10 +288,7 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [
 			segment.GetLevel() != datapb.SegmentLevel_L0
 		})
 		segmentDist[node] = segments
-
-		rowCount := b.calculateScore(replica.GetCollectionID(), node)
-		totalScore += rowCount
-		nodeScore[node] = rowCount
+		totalScore += nodeScore[node].getPriority()
 	}
 
 	if totalScore == 0 {
@@ -292,7 +299,7 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [
 	segmentsToMove := make([]*meta.Segment, 0)
 	average := totalScore / len(onlineNodes)
 	for node, segments := range segmentDist {
-		leftScore := nodeScore[node]
+		leftScore := nodeScore[node].getPriority()
 		if leftScore <= average {
 			continue
 		}
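The core of the patch is the second loop in convertToNodeItems: every node hosting a delegator (leader view) for the collection gets average * DelegatorMemoryOverloadFactor added to its score per delegator, so the balancer treats that node as fuller than its row count alone suggests and leaves memory headroom for the delegator. A self-contained sketch of the same arithmetic; reserveDelegatorMemory is a hypothetical stand-in, and the real inputs come from calculateScore and LeaderViewManager:

	package main

	import "fmt"

	// reserveDelegatorMemory bumps the score of every node that hosts delegators:
	// per delegator, add average*factor, matching the adjustment in convertToNodeItems.
	func reserveDelegatorMemory(scores map[int64]int, delegatorCnt map[int64]int, factor float64) {
		if len(scores) == 0 {
			return
		}
		total := 0
		for _, score := range scores {
			total += score
		}
		if total == 0 {
			return
		}
		average := total / len(scores)
		for node, cnt := range delegatorCnt {
			if cnt > 0 {
				scores[node] += int(float64(average)*factor) * cnt
			}
		}
	}

	func main() {
		scores := map[int64]int{1: 10, 2: 40} // row-count based scores from calculateScore
		delegatorCnt := map[int64]int{1: 1}   // node 1 hosts the collection's delegator
		reserveDelegatorMemory(scores, delegatorCnt, 2.0)
		fmt.Println(scores) // map[1:60 2:40]: node 1 now looks heavier, so segments flow toward node 2
	}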
diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go
index 27382dc7e0..b125a9ead9 100644
--- a/internal/querycoordv2/balance/score_based_balancer_test.go
+++ b/internal/querycoordv2/balance/score_based_balancer_test.go
@@ -84,15 +84,16 @@ func (suite *ScoreBasedBalancerTestSuite) TearDownTest() {
 
 func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() {
 	cases := []struct {
-		name          string
-		comment       string
-		distributions map[int64][]*meta.Segment
-		assignments   [][]*meta.Segment
-		nodes         []int64
-		collectionIDs []int64
-		segmentCnts   []int
-		states        []session.State
-		expectPlans   [][]SegmentAssignPlan
+		name               string
+		comment            string
+		distributions      map[int64][]*meta.Segment
+		assignments        [][]*meta.Segment
+		nodes              []int64
+		collectionIDs      []int64
+		segmentCnts        []int
+		states             []session.State
+		expectPlans        [][]SegmentAssignPlan
+		unstableAssignment bool
 	}{
 		{
 			name: "test empty cluster assigning one collection",
@@ -105,10 +106,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() {
 					{SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 15, CollectionID: 1}},
 				},
 			},
-			nodes:         []int64{1, 2, 3},
-			collectionIDs: []int64{0},
-			states:        []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
-			segmentCnts:   []int{0, 0, 0},
+			nodes:              []int64{1, 2, 3},
+			collectionIDs:      []int64{0},
+			states:             []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
+			segmentCnts:        []int{0, 0, 0},
+			unstableAssignment: true,
 			expectPlans: [][]SegmentAssignPlan{
 				{
 					// as assign segments is used while loading collection,
@@ -237,7 +239,11 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() {
 			}
 			for i := range c.collectionIDs {
 				plans := balancer.AssignSegment(c.collectionIDs[i], c.assignments[i], c.nodes, false)
-				assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans)
+				if c.unstableAssignment {
+					suite.Len(plans, len(c.expectPlans[i]))
+				} else {
+					assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans[i], plans)
+				}
 			}
 		})
 	}
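The new test below exercises this end to end with the same numbers as the sketch above: node 1 holds one 10-row segment plus the collection's delegator, node 2 holds four 10-row segments, giving raw scores of 10 and 40 (average 25). With DelegatorMemoryOverloadFactor at 0 nothing is reserved, node 2 is over the average, and one segment moves to node 1. At 1, node 1's adjusted score is 10 + 25 = 35 against node 2's 40, and no move clears the hasEnoughBenefit threshold shown earlier. At 2 it is 10 + 50 = 60, so node 1 becomes the overloaded side and a segment moves to node 2.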
@@ -461,6 +467,111 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() {
 	}
 }
 
+func (suite *ScoreBasedBalancerTestSuite) TestDelegatorPreserveMemory() {
+	cases := []struct {
+		name                 string
+		nodes                []int64
+		collectionID         int64
+		replicaID            int64
+		collectionsSegments  []*datapb.SegmentInfo
+		states               []session.State
+		shouldMock           bool
+		distributions        map[int64][]*meta.Segment
+		distributionChannels map[int64][]*meta.DmChannel
+		expectPlans          []SegmentAssignPlan
+		expectChannelPlans   []ChannelAssignPlan
+	}{
+		{
+			name:         "normal balance for one collection only",
+			nodes:        []int64{1, 2},
+			collectionID: 1,
+			replicaID:    1,
+			collectionsSegments: []*datapb.SegmentInfo{
+				{ID: 1, PartitionID: 1}, {ID: 2, PartitionID: 1}, {ID: 3, PartitionID: 1}, {ID: 4, PartitionID: 1}, {ID: 5, PartitionID: 1},
+			},
+			states: []session.State{session.NodeStateNormal, session.NodeStateNormal},
+			distributions: map[int64][]*meta.Segment{
+				1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}},
+				2: {
+					{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 10}, Node: 2},
+					{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 10}, Node: 2},
+					{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 10}, Node: 2},
+					{SegmentInfo: &datapb.SegmentInfo{ID: 5, CollectionID: 1, NumOfRows: 10}, Node: 2},
+				},
+			},
+			expectPlans:        []SegmentAssignPlan{},
+			expectChannelPlans: []ChannelAssignPlan{},
+		},
+	}
+
+	for _, c := range cases {
+		suite.Run(c.name, func() {
+			suite.SetupSuite()
+			defer suite.TearDownTest()
+			balancer := suite.balancer
+
+			// 1. set up target for multi collections
+			collection := utils.CreateTestCollection(c.collectionID, int32(c.replicaID))
+			suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, c.collectionID).Return(
+				nil, c.collectionsSegments, nil)
+			suite.broker.EXPECT().GetPartitions(mock.Anything, c.collectionID).Return([]int64{c.collectionID}, nil).Maybe()
+			collection.LoadPercentage = 100
+			collection.Status = querypb.LoadStatus_Loaded
+			balancer.meta.CollectionManager.PutCollection(collection)
+			balancer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(c.collectionID, c.collectionID))
+			balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
+			balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
+			balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
+			balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
+
+			// 2. set up target for distribution for multi collections
+			for node, s := range c.distributions {
+				balancer.dist.SegmentDistManager.Update(node, s...)
+			}
+			for node, v := range c.distributionChannels {
+				balancer.dist.ChannelDistManager.Update(node, v...)
+			}
+
+			leaderView := &meta.LeaderView{
+				ID:           1,
+				CollectionID: 1,
+			}
+			suite.balancer.dist.LeaderViewManager.Update(1, leaderView)
+
+			// 3. set up nodes info and resourceManager for balancer
+			for i := range c.nodes {
+				nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
+					NodeID:   c.nodes[i],
+					Address:  "127.0.0.1:0",
+					Hostname: "localhost",
+				})
+				nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
+				nodeInfo.SetState(c.states[i])
+				suite.balancer.nodeManager.Add(nodeInfo)
+				suite.balancer.meta.ResourceManager.HandleNodeUp(c.nodes[i])
+			}
+			utils.RecoverAllCollection(balancer.meta)
+
+			// disable delegator preserve memory
+			paramtable.Get().Save(paramtable.Get().QueryCoordCfg.DelegatorMemoryOverloadFactor.Key, "0")
+			segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, c.collectionID)
+			suite.Len(channelPlans, 0)
+			suite.Len(segmentPlans, 1)
+			suite.Equal(segmentPlans[0].To, int64(1))
+
+			paramtable.Get().Save(paramtable.Get().QueryCoordCfg.DelegatorMemoryOverloadFactor.Key, "1")
+			segmentPlans, channelPlans = suite.getCollectionBalancePlans(balancer, c.collectionID)
+			suite.Len(channelPlans, 0)
+			suite.Len(segmentPlans, 0)
+
+			paramtable.Get().Save(paramtable.Get().QueryCoordCfg.DelegatorMemoryOverloadFactor.Key, "2")
+			segmentPlans, channelPlans = suite.getCollectionBalancePlans(balancer, c.collectionID)
+			suite.Len(segmentPlans, 1)
+			suite.Equal(segmentPlans[0].To, int64(2))
+		})
+	}
+}
+
 func (suite *ScoreBasedBalancerTestSuite) TestBalanceWithExecutingTask() {
 	cases := []struct {
 		name string