enhance: Rewrite gen segment plan based on assign segment (#29574)

issue: #29582
This PR rewrites the gen segment plan logic based on assign segment in
`score_based_balancer`.
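In rough outline (an illustrative condensation of the diff below, not additional code in this PR): `genSegmentPlan` now scores every online node with `calculateScore`, collects segments from nodes whose score sits above the replica average, and hands them to `AssignSegment`, which already picks the lowest-score node for each segment and drops moves that `hasEnoughBenefit` rejects. The helper name in the first line is hypothetical; the tail mirrors the diff:

    segmentsToMove := pickFromNodesAboveAverage(nodeScore, segmentDist) // hypothetical helper, for illustration only
    plans := b.AssignSegment(replica.CollectionID, segmentsToMove, onlineNodes)
    for i := range plans {
        plans[i].From = plans[i].Segment.Node
        plans[i].ReplicaID = replica.ID
    }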

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2024-01-04 11:10:44 +08:00 committed by GitHub
parent acbbabf043
commit 336fce0582
3 changed files with 132 additions and 143 deletions


@@ -179,9 +179,6 @@ func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]Segment
segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
if len(offlineNodes) != 0 {
log.Info("Handle stopping nodes",
zap.Int64("collection", replica.CollectionID),
zap.Int64("replica id", replica.Replica.GetID()),
zap.String("replica group", replica.Replica.GetResourceGroup()),
zap.Any("stopping nodes", offlineNodes),
zap.Any("available nodes", onlineNodes),
)


@@ -17,11 +17,11 @@
package balance
import (
"math"
"sort"
"github.com/samber/lo"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@@ -30,9 +30,10 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// ScoreBasedBalancer uses (collection_row_count + global_row_count * factor) as each node's score
// and tries to make all nodes have nearly the same score by balancing segments.
type ScoreBasedBalancer struct {
*RowCountBasedBalancer
}
@@ -48,52 +49,93 @@ func NewScoreBasedBalancer(scheduler task.Scheduler,
}
}
// TODO: assigning channels also needs to consider global channels
// AssignSegment takes a list of segments and tries to assign each segment to the node with the lowest score
func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64) []SegmentAssignPlan {
// calculate each node's score
nodeItems := b.convertToNodeItems(collectionID, nodes)
if len(nodeItems) == 0 {
return nil
}
nodeItemsMap := lo.SliceToMap(nodeItems, func(item *nodeItem) (int64, *nodeItem) { return item.nodeID, item })
queue := newPriorityQueue()
for _, item := range nodeItems {
queue.push(item)
}
// sort segments by row count; if two segments have the same row count, sort by their node's score
sort.Slice(segments, func(i, j int) bool {
if segments[i].GetNumOfRows() == segments[j].GetNumOfRows() {
node1 := nodeItemsMap[segments[i].Node]
node2 := nodeItemsMap[segments[j].Node]
if node1 != nil && node2 != nil {
return node1.getPriority() > node2.getPriority()
}
}
return segments[i].GetNumOfRows() > segments[j].GetNumOfRows()
})
plans := make([]SegmentAssignPlan, 0, len(segments))
for _, s := range segments {
// pick the node with the least row count and allocate to it.
ni := queue.pop().(*nodeItem)
// for each segment, pick the node with the least score
targetNode := queue.pop().(*nodeItem)
priorityChange := b.calculateSegmentScore(s)
sourceNode := nodeItemsMap[s.Node]
// if the segment already has a source node, it comes from the balancer, so we should weigh the benefit;
// if the reassignment doesn't bring enough benefit, skip it
if sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, priorityChange) {
continue
}
plan := SegmentAssignPlan{
From: -1,
To: ni.nodeID,
To: targetNode.nodeID,
Segment: s,
}
plans = append(plans, plan)
// change the node's priority and push it back; account for both the collection row count and the global row count factor
p := ni.getPriority()
ni.setPriority(p + int(s.GetNumOfRows()) +
int(float64(s.GetNumOfRows())*params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()))
queue.push(ni)
// update the targetNode's score
targetNode.setPriority(targetNode.getPriority() + priorityChange)
queue.push(targetNode)
}
return plans
}
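// Illustrative walk-through of the assignment loop above (all numbers are made up, and
// GlobalRowCountFactor = 0.1 is only an assumed value):
//   nodes A and B start with scores 100 and 60; segments s1 (3000 rows) and s2 (1000 rows)
//   arrive sorted by row count in descending order.
//   s1 goes to B (lowest score); B's score grows by 3000*(1+0.1) = 3300, to 3360.
//   s2 then goes to A (now the lowest score); A's score grows by 1000*1.1 = 1100, to 1200.
//   If a segment already carries a source node (the balance path), hasEnoughBenefit below may
//   veto the move before any plan is emitted.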
func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode *nodeItem, priorityChange int) bool {
// if the score diff between sourceNode and targetNode is lower than the unbalance toleration factor, there is no need to assign the segment to targetNode
oldScoreDiff := math.Abs(float64(sourceNode.getPriority()) - float64(targetNode.getPriority()))
if oldScoreDiff < float64(targetNode.getPriority())*params.Params.QueryCoordCfg.ScoreUnbalanceTolerationFactor.GetAsFloat() {
return false
}
newSourceScore := sourceNode.getPriority() - priorityChange
newTargetScore := targetNode.getPriority() + priorityChange
if newTargetScore > newSourceScore {
// if the score diff is reverted after the segment reassignment, weigh the benefit:
// only trigger the reassignment when the reverted score diff
// is far smaller than the original score diff
newScoreDiff := math.Abs(float64(newSourceScore) - float64(newTargetScore))
if newScoreDiff*params.Params.QueryCoordCfg.ReverseUnbalanceTolerationFactor.GetAsFloat() >= oldScoreDiff {
return false
}
}
return true
}
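// Numeric illustration of the two checks above (the factor values are assumptions for the
// example, not values taken from this PR):
//   with ScoreUnbalanceTolerationFactor = 0.05 and ReverseUnbalanceTolerationFactor = 1.3,
//   source = 1000, target = 980: old diff = 20 < 980*0.05 = 49, so the move is rejected.
//   source = 1000, target = 600, segment score = 330: old diff = 400 >= 600*0.05 = 30;
//   new source = 670, new target = 930, reverted diff = 260; 260*1.3 = 338 < 400,
//   so the reverted unbalance is small enough and the move is still accepted.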
func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []int64) []*nodeItem {
ret := make([]*nodeItem, 0, len(nodeIDs))
for _, nodeInfo := range b.getNodes(nodeIDs) {
node := nodeInfo.ID()
priority := b.calculatePriority(collectionID, node)
priority := b.calculateScore(collectionID, node)
nodeItem := newNodeItem(priority, node)
ret = append(ret, &nodeItem)
}
return ret
}
func (b *ScoreBasedBalancer) calculatePriority(collectionID, nodeID int64) int {
func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
rowCount := 0
// calculate global sealed segment row count
globalSegments := b.dist.SegmentDistManager.GetByNode(nodeID)
@@ -123,93 +165,67 @@ func (b *ScoreBasedBalancer) calculatePriority(collectionID, nodeID int64) int {
params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat())
}
// calculateSegmentScore calculates the score that the segment represents
func (b *ScoreBasedBalancer) calculateSegmentScore(s *meta.Segment) int {
return int(float64(s.GetNumOfRows()) * (1 + params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()))
}
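// How the two scores relate, with GlobalRowCountFactor = 0.1 assumed purely for illustration:
//   a node holding 5000 rows of this collection and 20000 rows across all collections scores
//   roughly 5000 + 20000*0.1 = 7000, matching the formula in the ScoreBasedBalancer comment;
//   a 2000-row segment contributes 2000*(1+0.1) = 2200 to whichever node receives it, which is
//   exactly the increment AssignSegment applies when it pushes the target node back into the queue.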
func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
log := log.With(
zap.Int64("collection", replica.CollectionID),
zap.Int64("replica id", replica.Replica.GetID()),
zap.String("replica group", replica.Replica.GetResourceGroup()),
)
nodes := replica.GetNodes()
if len(nodes) == 0 {
return nil, nil
}
nodesSegments := make(map[int64][]*meta.Segment)
stoppingNodesSegments := make(map[int64][]*meta.Segment)
outboundNodes := b.meta.ResourceManager.CheckOutboundNodes(replica)
// calculate stopping nodes and available nodes.
onlineNodes := make([]int64, 0)
offlineNodes := make([]int64, 0)
for _, nid := range nodes {
segments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nid)
// Only balance segments in targets
segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool {
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
segment.GetLevel() != datapb.SegmentLevel_L0
})
if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil {
log.Info("not existed node", zap.Int64("nid", nid), zap.Any("segments", segments), zap.Error(err))
log.Info("not existed node", zap.Int64("nid", nid), zap.Error(err))
continue
} else if isStopping {
stoppingNodesSegments[nid] = segments
offlineNodes = append(offlineNodes, nid)
} else if outboundNodes.Contain(nid) {
// the node is stopping or has been transferred to another resource group
log.RatedInfo(10, "meet outbound node, try to move out all segment/channel",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetCollectionID()),
zap.Int64("node", nid),
)
stoppingNodesSegments[nid] = segments
log.RatedInfo(10, "meet outbound node, try to move out all segment/channel", zap.Int64("node", nid))
offlineNodes = append(offlineNodes, nid)
} else {
nodesSegments[nid] = segments
onlineNodes = append(onlineNodes, nid)
}
}
if len(nodes) == len(stoppingNodesSegments) {
if len(nodes) == len(offlineNodes) || len(onlineNodes) == 0 {
// no available nodes to balance
log.Warn("All nodes is under stopping mode or outbound, skip balance replica",
zap.Int64("collection", replica.CollectionID),
zap.Int64("replica id", replica.Replica.GetID()),
zap.String("replica group", replica.Replica.GetResourceGroup()),
zap.Int64s("nodes", replica.Replica.GetNodes()),
)
return nil, nil
}
if len(nodesSegments) <= 0 {
log.Warn("No nodes is available in resource group, skip balance replica",
zap.Int64("collection", replica.CollectionID),
zap.Int64("replica id", replica.Replica.GetID()),
zap.String("replica group", replica.Replica.GetResourceGroup()),
zap.Int64s("nodes", replica.Replica.GetNodes()),
)
return nil, nil
}
// print current distribution before generating plans
segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
if len(stoppingNodesSegments) != 0 {
if len(offlineNodes) != 0 {
log.Info("Handle stopping nodes",
zap.Int64("collection", replica.CollectionID),
zap.Int64("replica id", replica.Replica.GetID()),
zap.String("replica group", replica.Replica.GetResourceGroup()),
zap.Any("stopping nodes", maps.Keys(stoppingNodesSegments)),
zap.Any("available nodes", maps.Keys(nodesSegments)),
zap.Any("stopping nodes", offlineNodes),
zap.Any("available nodes", onlineNodes),
)
// handle stopping nodes here: assign segments on stopping nodes to the nodes with the smallest scores
channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...)
channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, onlineNodes, offlineNodes)...)
if len(channelPlans) == 0 {
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...)
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(replica, onlineNodes, offlineNodes)...)
}
} else {
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
channelPlans = append(channelPlans, b.genChannelPlan(replica, lo.Keys(nodesSegments))...)
channelPlans = append(channelPlans, b.genChannelPlan(replica, onlineNodes)...)
}
if len(channelPlans) == 0 {
segmentPlans = append(segmentPlans, b.genSegmentPlan(replica, nodesSegments)...)
segmentPlans = append(segmentPlans, b.genSegmentPlan(replica, onlineNodes)...)
}
}
if len(segmentPlans) != 0 || len(channelPlans) != 0 {
PrintCurrentReplicaDist(replica, stoppingNodesSegments, nodesSegments, b.dist.ChannelDistManager, b.dist.SegmentDistManager)
}
return segmentPlans, channelPlans
}
@@ -232,91 +248,65 @@ func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlin
return segmentPlans
}
func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, nodesSegments map[int64][]*meta.Segment) []SegmentAssignPlan {
segmentPlans := make([]SegmentAssignPlan, 0)
func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes []int64) []SegmentAssignPlan {
segmentDist := make(map[int64][]*meta.Segment)
nodeScore := make(map[int64]int, 0)
totalScore := 0
// generate candidates
nodeItems := b.convertToNodeItems(replica.GetCollectionID(), lo.Keys(nodesSegments))
lastIdx := len(nodeItems) - 1
havingMovedSegments := typeutil.NewUniqueSet()
for {
sort.Slice(nodeItems, func(i, j int) bool {
return nodeItems[i].priority <= nodeItems[j].priority
// list all segments that can be balanced, and calculate each node's score
for _, node := range onlineNodes {
dist := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.GetCollectionID(), node)
segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
segment.GetLevel() != datapb.SegmentLevel_L0
})
toNode := nodeItems[0]
fromNode := nodeItems[lastIdx]
segmentDist[node] = segments
fromPriority := fromNode.priority
toPriority := toNode.priority
unbalance := float64(fromPriority - toPriority)
if unbalance < float64(toPriority)*params.Params.QueryCoordCfg.ScoreUnbalanceTolerationFactor.GetAsFloat() {
break
rowCount := b.calculateScore(replica.CollectionID, node)
totalScore += rowCount
nodeScore[node] = rowCount
}
if totalScore == 0 {
return nil
}
// find segments on the nodes whose score is higher than the average
segmentsToMove := make([]*meta.Segment, 0)
average := totalScore / len(onlineNodes)
for node, segments := range segmentDist {
leftScore := nodeScore[node]
if leftScore <= average {
continue
}
// sort the segments in asc order, try to mitigate to-from-unbalance
// TODO: segment infos inside dist manager may change in the process of making balance plan
fromSegments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.CollectionID, fromNode.nodeID)
fromSegments = lo.Filter(fromSegments, func(segment *meta.Segment, _ int) bool {
return segment.GetLevel() != datapb.SegmentLevel_L0
sort.Slice(segments, func(i, j int) bool {
return segments[i].GetNumOfRows() < segments[j].GetNumOfRows()
})
sort.Slice(fromSegments, func(i, j int) bool {
return fromSegments[i].GetNumOfRows() < fromSegments[j].GetNumOfRows()
})
var targetSegmentToMove *meta.Segment
for _, segment := range fromSegments {
targetSegmentToMove = segment
if havingMovedSegments.Contain(targetSegmentToMove.GetID()) {
targetSegmentToMove = nil
continue
}
break
}
if targetSegmentToMove == nil {
// the node with the highest score doesn't have any segments suitable for balancing, stop balancing this round
break
}
nextFromPriority := fromPriority - int(targetSegmentToMove.GetNumOfRows()) - int(float64(targetSegmentToMove.GetNumOfRows())*
params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat())
nextToPriority := toPriority + int(targetSegmentToMove.GetNumOfRows()) + int(float64(targetSegmentToMove.GetNumOfRows())*
params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat())
// still unbalanced after this balance plan is executed
if nextToPriority <= nextFromPriority {
plan := SegmentAssignPlan{
ReplicaID: replica.GetID(),
From: fromNode.nodeID,
To: toNode.nodeID,
Segment: targetSegmentToMove,
}
segmentPlans = append(segmentPlans, plan)
} else {
// if the unbalance is reverted after the balance action, we weigh the benefit:
// only trigger the following balance when the generated reverted unbalance
// is far smaller than the original unbalance
nextUnbalance := nextToPriority - nextFromPriority
if float64(nextUnbalance)*params.Params.QueryCoordCfg.ReverseUnbalanceTolerationFactor.GetAsFloat() < unbalance {
plan := SegmentAssignPlan{
ReplicaID: replica.GetID(),
From: fromNode.nodeID,
To: toNode.nodeID,
Segment: targetSegmentToMove,
}
segmentPlans = append(segmentPlans, plan)
} else {
// if the tiniest segment movement between the highest-scored node and the lowest-scored node will
// not provide sufficient balance benefit, we cease balancing in this round
for _, s := range segments {
segmentsToMove = append(segmentsToMove, s)
leftScore -= b.calculateSegmentScore(s)
if leftScore <= average {
break
}
}
havingMovedSegments.Insert(targetSegmentToMove.GetID())
// update node priority
toNode.setPriority(nextToPriority)
fromNode.setPriority(nextFromPriority)
// if toNode and fromNode cannot find a segment to balance, break; otherwise try to balance in the next round
// TODO swap segments between toNode and fromNode and see if the cluster becomes more balanced
}
// if the segment is redundant, skip its balance for now
segmentsToMove = lo.Filter(segmentsToMove, func(s *meta.Segment, _ int) bool {
return len(b.dist.SegmentDistManager.Get(s.GetID())) == 1
})
if len(segmentsToMove) == 0 {
return nil
}
segmentPlans := b.AssignSegment(replica.CollectionID, segmentsToMove, onlineNodes)
for i := range segmentPlans {
segmentPlans[i].From = segmentPlans[i].Segment.Node
segmentPlans[i].ReplicaID = replica.ID
}
return segmentPlans
}
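To make the new above-average selection concrete, here is a minimal, self-contained sketch (example values only; it is not the balancer code itself and uses a hard-coded factor in place of GlobalRowCountFactor):

    package main

    import (
    	"fmt"
    	"sort"
    )

    func main() {
    	scores := map[int64]int{1: 900, 2: 450, 3: 150} // per-node scores, as calculateScore would report
    	segRows := map[int64][]int{1: {800, 100, 300}}  // row counts of balanceable segments on node 1
    	factor := 0.1                                   // assumed GlobalRowCountFactor

    	total := 0
    	for _, s := range scores {
    		total += s
    	}
    	average := total / len(scores) // 500 in this example

    	for node, rows := range segRows {
    		left := scores[node]
    		if left <= average {
    			continue // only nodes above the average give up segments
    		}
    		sort.Ints(rows) // smallest segments move first
    		for _, r := range rows {
    			left -= int(float64(r) * (1 + factor)) // same shape as calculateSegmentScore
    			fmt.Printf("move a %d-row segment off node %d, remaining score %d\n", r, node, left)
    			if left <= average {
    				break
    			}
    		}
    	}
    }

With these numbers the 100-row segment (remaining score 790) and the 300-row segment (remaining score 460) would be handed to AssignSegment, while the 800-row segment stays put.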


@@ -353,6 +353,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() {
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
// 2. set up target for distribution for multi collections
for node, s := range c.distributions {
@@ -463,6 +464,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() {
append(balanceCase.nodes, balanceCase.notExistedNodes...)))
balancer.targetMgr.UpdateCollectionNextTarget(balanceCase.collectionIDs[i])
balancer.targetMgr.UpdateCollectionCurrentTarget(balanceCase.collectionIDs[i])
balancer.targetMgr.UpdateCollectionNextTarget(balanceCase.collectionIDs[i])
}
// 2. set up target for distribution for multi collections