From 336fce058258698f28392b530431e3ea174dd2fd Mon Sep 17 00:00:00 2001
From: wei liu
Date: Thu, 4 Jan 2024 11:10:44 +0800
Subject: [PATCH] enhance: Rewrite gen segment plan based on assign segment
 (#29574)

issue: #29582

This PR rewrites the gen segment plan logic based on assign segment in
`score_based_balancer`.

Signed-off-by: Wei Liu
---
 .../balance/rowcount_based_balancer.go   |   3 -
 .../balance/score_based_balancer.go      | 270 +++++++++---------
 .../balance/score_based_balancer_test.go |   2 +
 3 files changed, 132 insertions(+), 143 deletions(-)

diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go
index b0112a176c..b84b1a1243 100644
--- a/internal/querycoordv2/balance/rowcount_based_balancer.go
+++ b/internal/querycoordv2/balance/rowcount_based_balancer.go
@@ -179,9 +179,6 @@ func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]Segment
 	segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
 	if len(offlineNodes) != 0 {
 		log.Info("Handle stopping nodes",
-			zap.Int64("collection", replica.CollectionID),
-			zap.Int64("replica id", replica.Replica.GetID()),
-			zap.String("replica group", replica.Replica.GetResourceGroup()),
 			zap.Any("stopping nodes", offlineNodes),
 			zap.Any("available nodes", onlineNodes),
 		)
diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go
index 7792682584..dd74ad2d07 100644
--- a/internal/querycoordv2/balance/score_based_balancer.go
+++ b/internal/querycoordv2/balance/score_based_balancer.go
@@ -17,11 +17,11 @@
 package balance

 import (
+	"math"
 	"sort"

 	"github.com/samber/lo"
 	"go.uber.org/zap"
-	"golang.org/x/exp/maps"

 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@@ -30,9 +30,10 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/task"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
-	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )

+// The score based balancer uses (collection_row_count + global_row_count * factor) as a node's score,
+// and tries to make every node's score almost equal by balancing segments.
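// Illustrative sketch of the score formula described in the comment above.
// collectionRows, globalRows, and globalRowCountFactor are hypothetical
// stand-ins for a node's per-collection row count, its total row count, and
// QueryCoordCfg.GlobalRowCountFactor; this is not the balancer's actual code.
package main

import "fmt"

// nodeScore computes score = collection_row_count + global_row_count * factor.
func nodeScore(collectionRows, globalRows int64, globalRowCountFactor float64) int {
	return int(collectionRows) + int(float64(globalRows)*globalRowCountFactor)
}

func main() {
	// a node holding 1000 rows of this collection and 4000 rows overall,
	// with an assumed factor of 0.1: 1000 + 4000*0.1 = 1400
	fmt.Println(nodeScore(1000, 4000, 0.1))
}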
 type ScoreBasedBalancer struct {
 	*RowCountBasedBalancer
 }
@@ -48,52 +49,93 @@ func NewScoreBasedBalancer(scheduler task.Scheduler,
 	}
 }

-// TODO assign channel need to think of global channels
+// AssignSegment takes a list of segments and tries to assign each segment to the node with the lowest score
 func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64) []SegmentAssignPlan {
+	// calculate each node's score
 	nodeItems := b.convertToNodeItems(collectionID, nodes)
 	if len(nodeItems) == 0 {
 		return nil
 	}
+
+	nodeItemsMap := lo.SliceToMap(nodeItems, func(item *nodeItem) (int64, *nodeItem) { return item.nodeID, item })
 	queue := newPriorityQueue()
 	for _, item := range nodeItems {
 		queue.push(item)
 	}

+	// sort segments by row count; if two segments have the same row count, sort by the source node's score
 	sort.Slice(segments, func(i, j int) bool {
+		if segments[i].GetNumOfRows() == segments[j].GetNumOfRows() {
+			node1 := nodeItemsMap[segments[i].Node]
+			node2 := nodeItemsMap[segments[j].Node]
+			if node1 != nil && node2 != nil {
+				return node1.getPriority() > node2.getPriority()
+			}
+		}
 		return segments[i].GetNumOfRows() > segments[j].GetNumOfRows()
 	})

 	plans := make([]SegmentAssignPlan, 0, len(segments))
 	for _, s := range segments {
-		// pick the node with the least row count and allocate to it.
-		ni := queue.pop().(*nodeItem)
+		// for each segment, pick the node with the lowest score
+		targetNode := queue.pop().(*nodeItem)
+		priorityChange := b.calculateSegmentScore(s)
+
+		sourceNode := nodeItemsMap[s.Node]
+		// if the segment's source node exists, the segment comes from the balancer, so weigh the benefit:
+		// if the reassignment doesn't bring enough benefit, skip it
+		if sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, priorityChange) {
+			continue
+		}
+
 		plan := SegmentAssignPlan{
 			From:    -1,
-			To:      ni.nodeID,
+			To:      targetNode.nodeID,
 			Segment: s,
 		}
 		plans = append(plans, plan)
-		// change node's priority and push back, should count for both collection factor and local factor
-		p := ni.getPriority()
-		ni.setPriority(p + int(s.GetNumOfRows()) +
-			int(float64(s.GetNumOfRows())*params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()))
-		queue.push(ni)
+
+		// update the targetNode's score
+		targetNode.setPriority(targetNode.getPriority() + priorityChange)
+		queue.push(targetNode)
 	}
 	return plans
 }

+func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode *nodeItem, priorityChange int) bool {
+	// if the score diff between sourceNode and targetNode is lower than the unbalance toleration factor, there is no need to assign the segment to targetNode
+	oldScoreDiff := math.Abs(float64(sourceNode.getPriority()) - float64(targetNode.getPriority()))
+	if oldScoreDiff < float64(targetNode.getPriority())*params.Params.QueryCoordCfg.ScoreUnbalanceTolerationFactor.GetAsFloat() {
+		return false
+	}
+
+	newSourceScore := sourceNode.getPriority() - priorityChange
+	newTargetScore := targetNode.getPriority() + priorityChange
+	if newTargetScore > newSourceScore {
+		// if the score diff would be reverted after the reassignment, weigh the benefit:
+		// only trigger the reassignment when the resulting reverted score diff
+		// is far smaller than the original score diff
+		newScoreDiff := math.Abs(float64(newSourceScore) - float64(newTargetScore))
+		if newScoreDiff*params.Params.QueryCoordCfg.ReverseUnbalanceTolerationFactor.GetAsFloat() >= oldScoreDiff {
+			return false
+		}
+	}
+
+	return true
+}
+
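// Self-contained sketch of the benefit check above, with assumed toleration
// values standing in for ScoreUnbalanceTolerationFactor and
// ReverseUnbalanceTolerationFactor; it mirrors hasEnoughBenefit for
// illustration only and is not the balancer's actual code path.
package main

import (
	"fmt"
	"math"
)

// hasEnoughBenefit reports whether moving a segment with the given score from
// source to target is worthwhile: the nodes must be unbalanced beyond the
// toleration threshold, and a move that flips the imbalance is accepted only
// if the flipped gap is far smaller than the original one.
func hasEnoughBenefit(sourceScore, targetScore, segmentScore int, unbalanceToleration, reverseToleration float64) bool {
	oldDiff := math.Abs(float64(sourceScore) - float64(targetScore))
	if oldDiff < float64(targetScore)*unbalanceToleration {
		return false // already balanced within toleration
	}
	newSource := sourceScore - segmentScore
	newTarget := targetScore + segmentScore
	if newTarget > newSource {
		newDiff := math.Abs(float64(newSource) - float64(newTarget))
		if newDiff*reverseToleration >= oldDiff {
			return false // the move would mostly just flip the imbalance
		}
	}
	return true
}

func main() {
	// gap 800 shrinks to 200 without flipping: worth moving
	fmt.Println(hasEnoughBenefit(1000, 200, 300, 0.05, 1.3)) // true
	// gap 200 flips to 400: 400*1.3 >= 200, so the move is rejected
	fmt.Println(hasEnoughBenefit(1000, 800, 300, 0.05, 1.3)) // false
}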
 func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []int64) []*nodeItem {
 	ret := make([]*nodeItem, 0, len(nodeIDs))
 	for _, nodeInfo := range b.getNodes(nodeIDs) {
 		node := nodeInfo.ID()
-		priority := b.calculatePriority(collectionID, node)
+		priority := b.calculateScore(collectionID, node)
 		nodeItem := newNodeItem(priority, node)
 		ret = append(ret, &nodeItem)
 	}
 	return ret
 }

-func (b *ScoreBasedBalancer) calculatePriority(collectionID, nodeID int64) int {
+func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
 	rowCount := 0
 	// calculate global sealed segment row count
 	globalSegments := b.dist.SegmentDistManager.GetByNode(nodeID)
@@ -123,93 +165,67 @@ func (b *ScoreBasedBalancer) calculatePriority(collectionID, nodeID int64) int {
 		params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat())
 }

+// calculateSegmentScore calculates the score that the segment represents
+func (b *ScoreBasedBalancer) calculateSegmentScore(s *meta.Segment) int {
+	return int(float64(s.GetNumOfRows()) * (1 + params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()))
+}
+
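// Worked sketch of calculateSegmentScore above: a moved segment shifts a
// node's score by rows * (1 + GlobalRowCountFactor). The factor value 0.1 is
// an assumption for illustration, not the configured default.
package main

import "fmt"

func segmentScore(numRows int64, globalRowCountFactor float64) int {
	return int(float64(numRows) * (1 + globalRowCountFactor))
}

func main() {
	fmt.Println(segmentScore(2000, 0.1)) // 2000 * 1.1 = 2200
}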
 func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
+	log := log.With(
+		zap.Int64("collection", replica.CollectionID),
+		zap.Int64("replica id", replica.Replica.GetID()),
+		zap.String("replica group", replica.Replica.GetResourceGroup()),
+	)
 	nodes := replica.GetNodes()
 	if len(nodes) == 0 {
 		return nil, nil
 	}
-	nodesSegments := make(map[int64][]*meta.Segment)
-	stoppingNodesSegments := make(map[int64][]*meta.Segment)
 	outboundNodes := b.meta.ResourceManager.CheckOutboundNodes(replica)
-
-	// calculate stopping nodes and available nodes.
+	onlineNodes := make([]int64, 0)
+	offlineNodes := make([]int64, 0)
 	for _, nid := range nodes {
-		segments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nid)
-		// Only balance segments in targets
-		segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool {
-			return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
-				b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
-				segment.GetLevel() != datapb.SegmentLevel_L0
-		})
-
 		if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil {
-			log.Info("not existed node", zap.Int64("nid", nid), zap.Any("segments", segments), zap.Error(err))
+			log.Info("not existed node", zap.Int64("nid", nid), zap.Error(err))
 			continue
 		} else if isStopping {
-			stoppingNodesSegments[nid] = segments
+			offlineNodes = append(offlineNodes, nid)
 		} else if outboundNodes.Contain(nid) {
 			// if node is stop or transfer to other rg
-			log.RatedInfo(10, "meet outbound node, try to move out all segment/channel",
-				zap.Int64("collectionID", replica.GetCollectionID()),
-				zap.Int64("replicaID", replica.GetCollectionID()),
-				zap.Int64("node", nid),
-			)
-			stoppingNodesSegments[nid] = segments
+			log.RatedInfo(10, "meet outbound node, try to move out all segment/channel", zap.Int64("node", nid))
+			offlineNodes = append(offlineNodes, nid)
 		} else {
-			nodesSegments[nid] = segments
+			onlineNodes = append(onlineNodes, nid)
 		}
 	}

-	if len(nodes) == len(stoppingNodesSegments) {
+	if len(nodes) == len(offlineNodes) || len(onlineNodes) == 0 {
 		// no available nodes to balance
-		log.Warn("All nodes is under stopping mode or outbound, skip balance replica",
-			zap.Int64("collection", replica.CollectionID),
-			zap.Int64("replica id", replica.Replica.GetID()),
-			zap.String("replica group", replica.Replica.GetResourceGroup()),
-			zap.Int64s("nodes", replica.Replica.GetNodes()),
-		)
 		return nil, nil
 	}

-	if len(nodesSegments) <= 0 {
-		log.Warn("No nodes is available in resource group, skip balance replica",
-			zap.Int64("collection", replica.CollectionID),
-			zap.Int64("replica id", replica.Replica.GetID()),
-			zap.String("replica group", replica.Replica.GetResourceGroup()),
-			zap.Int64s("nodes", replica.Replica.GetNodes()),
-		)
-		return nil, nil
-	}
-
 	// print current distribution before generating plans
 	segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
-	if len(stoppingNodesSegments) != 0 {
+	if len(offlineNodes) != 0 {
 		log.Info("Handle stopping nodes",
-			zap.Int64("collection", replica.CollectionID),
-			zap.Int64("replica id", replica.Replica.GetID()),
-			zap.String("replica group", replica.Replica.GetResourceGroup()),
-			zap.Any("stopping nodes", maps.Keys(stoppingNodesSegments)),
-			zap.Any("available nodes", maps.Keys(nodesSegments)),
+			zap.Any("stopping nodes", offlineNodes),
+			zap.Any("available nodes", onlineNodes),
 		)
 		// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
-		channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...)
+		channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, onlineNodes, offlineNodes)...)
 		if len(channelPlans) == 0 {
-			segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...)
+			segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(replica, onlineNodes, offlineNodes)...)
 		}
 	} else {
 		if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
-			channelPlans = append(channelPlans, b.genChannelPlan(replica, lo.Keys(nodesSegments))...)
+			channelPlans = append(channelPlans, b.genChannelPlan(replica, onlineNodes)...)
 		}
 		if len(channelPlans) == 0 {
-			segmentPlans = append(segmentPlans, b.genSegmentPlan(replica, nodesSegments)...)
+			segmentPlans = append(segmentPlans, b.genSegmentPlan(replica, onlineNodes)...)
 		}
 	}

-	if len(segmentPlans) != 0 || len(channelPlans) != 0 {
-		PrintCurrentReplicaDist(replica, stoppingNodesSegments, nodesSegments, b.dist.ChannelDistManager, b.dist.SegmentDistManager)
-	}
-
 	return segmentPlans, channelPlans
 }

@@ -232,91 +248,65 @@ func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlin
 	return segmentPlans
 }

-func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, nodesSegments map[int64][]*meta.Segment) []SegmentAssignPlan {
-	segmentPlans := make([]SegmentAssignPlan, 0)
+func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes []int64) []SegmentAssignPlan {
+	segmentDist := make(map[int64][]*meta.Segment)
+	nodeScore := make(map[int64]int, 0)
+	totalScore := 0

-	// generate candidates
-	nodeItems := b.convertToNodeItems(replica.GetCollectionID(), lo.Keys(nodesSegments))
-	lastIdx := len(nodeItems) - 1
-	havingMovedSegments := typeutil.NewUniqueSet()
-
-	for {
-		sort.Slice(nodeItems, func(i, j int) bool {
-			return nodeItems[i].priority <= nodeItems[j].priority
+	// list all segments which could be balanced, and calculate each node's score
+	for _, node := range onlineNodes {
+		dist := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), node)
+		segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
+			return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
+				b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
+				segment.GetLevel() != datapb.SegmentLevel_L0
 		})
-		toNode := nodeItems[0]
-		fromNode := nodeItems[lastIdx]
+		segmentDist[node] = segments

-		fromPriority := fromNode.priority
-		toPriority := toNode.priority
-		unbalance := float64(fromPriority - toPriority)
-		if unbalance < float64(toPriority)*params.Params.QueryCoordCfg.ScoreUnbalanceTolerationFactor.GetAsFloat() {
-			break
+		rowCount := b.calculateScore(replica.CollectionID, node)
+		totalScore += rowCount
+		nodeScore[node] = rowCount
+	}
+
+	if totalScore == 0 {
+		return nil
+	}
+
+	// pick segments from nodes whose score is higher than the average
+	segmentsToMove := make([]*meta.Segment, 0)
+	average := totalScore / len(onlineNodes)
+	for node, segments := range segmentDist {
+		leftScore := nodeScore[node]
+		if leftScore <= average {
+			continue
 		}

-		// sort the segments in asc order, try to mitigate to-from-unbalance
-		// TODO: segment infos inside dist manager may change in the process of making balance plan
-		fromSegments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.CollectionID, fromNode.nodeID)
-		fromSegments = lo.Filter(fromSegments, func(segment *meta.Segment, _ int) bool {
-			return segment.GetLevel() != datapb.SegmentLevel_L0
+		sort.Slice(segments, func(i, j int) bool {
+			return segments[i].GetNumOfRows() < segments[j].GetNumOfRows()
 		})
-		sort.Slice(fromSegments, func(i, j int) bool {
-			return fromSegments[i].GetNumOfRows() < fromSegments[j].GetNumOfRows()
-		})
-		var targetSegmentToMove *meta.Segment
-		for _, segment := range fromSegments {
-			targetSegmentToMove = segment
-			if havingMovedSegments.Contain(targetSegmentToMove.GetID()) {
-				targetSegmentToMove = nil
-				continue
-			}
-			break
-		}
-		if targetSegmentToMove == nil {
-			// the node with the highest score doesn't have any segments suitable for balancing, stop balancing this round
-			break
-		}
-
-		nextFromPriority := fromPriority - int(targetSegmentToMove.GetNumOfRows()) - int(float64(targetSegmentToMove.GetNumOfRows())*
-			params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat())
-		nextToPriority := toPriority + int(targetSegmentToMove.GetNumOfRows()) + int(float64(targetSegmentToMove.GetNumOfRows())*
-			params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat())
-
-		// still unbalanced after this balance plan is executed
-		if nextToPriority <= nextFromPriority {
-			plan := SegmentAssignPlan{
-				ReplicaID: replica.GetID(),
-				From:      fromNode.nodeID,
-				To:        toNode.nodeID,
-				Segment:   targetSegmentToMove,
-			}
-			segmentPlans = append(segmentPlans, plan)
-		} else {
-			// if unbalance reverted after balance action, we will consider the benefit
-			// only trigger following balance when the generated reverted balance
-			// is far smaller than the original unbalance
-			nextUnbalance := nextToPriority - nextFromPriority
-			if float64(nextUnbalance)*params.Params.QueryCoordCfg.ReverseUnbalanceTolerationFactor.GetAsFloat() < unbalance {
-				plan := SegmentAssignPlan{
-					ReplicaID: replica.GetID(),
-					From:      fromNode.nodeID,
-					To:        toNode.nodeID,
-					Segment:   targetSegmentToMove,
-				}
-				segmentPlans = append(segmentPlans, plan)
-			} else {
-				// if the tiniest segment movement between the highest scored node and lowest scored node will
-				// not provide sufficient balance benefit, we will seize balancing in this round
+		for _, s := range segments {
+			segmentsToMove = append(segmentsToMove, s)
+			leftScore -= b.calculateSegmentScore(s)
+			if leftScore <= average {
 				break
 			}
 		}
-		havingMovedSegments.Insert(targetSegmentToMove.GetID())
-
-		// update node priority
-		toNode.setPriority(nextToPriority)
-		fromNode.setPriority(nextFromPriority)
-		// if toNode and fromNode can not find segment to balance, break, else try to balance the next round
-		// TODO swap segment between toNode and fromNode, see if the cluster becomes more balance
 	}
+
+	// if a segment is redundant (it has copies on more than one node), skip balancing it for now
+	segmentsToMove = lo.Filter(segmentsToMove, func(s *meta.Segment, _ int) bool {
+		return len(b.dist.SegmentDistManager.Get(s.GetID())) == 1
+	})
+
+	if len(segmentsToMove) == 0 {
+		return nil
+	}
+
+	segmentPlans := b.AssignSegment(replica.CollectionID, segmentsToMove, onlineNodes)
+	for i := range segmentPlans {
+		segmentPlans[i].From = segmentPlans[i].Segment.Node
+		segmentPlans[i].ReplicaID = replica.ID
+	}
+
 	return segmentPlans
 }
diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go
index 9839d89338..598e01b3d8 100644
--- a/internal/querycoordv2/balance/score_based_balancer_test.go
+++ b/internal/querycoordv2/balance/score_based_balancer_test.go
@@ -353,6 +353,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() {
 		balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
 		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
 		balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
+		balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)

 		// 2. set up target for distribution for multi collections
 		for node, s := range c.distributions {
@@ -463,6 +464,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() {
 				append(balanceCase.nodes, balanceCase.notExistedNodes...)))
 		balancer.targetMgr.UpdateCollectionNextTarget(balanceCase.collectionIDs[i])
 		balancer.targetMgr.UpdateCollectionCurrentTarget(balanceCase.collectionIDs[i])
+		balancer.targetMgr.UpdateCollectionNextTarget(balanceCase.collectionIDs[i])
 	}

 	// 2. set up target for distribution for multi collections
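// Simplified sketch of the selection loop in the rewritten genSegmentPlan:
// nodes whose score exceeds the replica-wide average shed their smallest
// segments until they drop to the average, and the collected segments are
// then handed to AssignSegment. The segment type and the row-count-as-score
// accounting below are assumptions for illustration, not the actual structs.
package main

import (
	"fmt"
	"sort"
)

type segment struct {
	id   int64
	rows int
}

// pickSegmentsToMove drains nodes whose score exceeds the average, taking the
// smallest segments first, as the selection loop in genSegmentPlan does.
func pickSegmentsToMove(nodeScore map[int64]int, dist map[int64][]segment, average int) []segment {
	toMove := make([]segment, 0)
	for node, segments := range dist {
		left := nodeScore[node]
		if left <= average {
			continue
		}
		sort.Slice(segments, func(i, j int) bool { return segments[i].rows < segments[j].rows })
		for _, s := range segments {
			toMove = append(toMove, s)
			left -= s.rows
			if left <= average {
				break
			}
		}
	}
	return toMove
}

func main() {
	nodeScore := map[int64]int{1: 900, 2: 100} // average = (900+100)/2 = 500
	dist := map[int64][]segment{
		1: {{id: 10, rows: 500}, {id: 11, rows: 400}},
		2: {{id: 12, rows: 100}},
	}
	// node 1 sheds its smallest segment (id 11, 400 rows) and lands exactly
	// on the average, so only that segment is picked
	fmt.Println(pickSegmentsToMove(nodeScore, dist, 500)) // [{11 400}]
}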