diff --git a/internal/querycoordv2/balance/balance.go b/internal/querycoordv2/balance/balance.go index 97eb504d3c..ca03c74b18 100644 --- a/internal/querycoordv2/balance/balance.go +++ b/internal/querycoordv2/balance/balance.go @@ -39,7 +39,7 @@ type SegmentAssignPlan struct { SegmentScore int64 } -func (segPlan *SegmentAssignPlan) ToString() string { +func (segPlan SegmentAssignPlan) String() string { return fmt.Sprintf("SegmentPlan:[collectionID: %d, replicaID: %d, segmentID: %d, from: %d, to: %d, fromScore: %d, toScore: %d, segmentScore: %d]\n", segPlan.Segment.CollectionID, segPlan.Replica.GetID(), segPlan.Segment.ID, segPlan.From, segPlan.To, segPlan.FromScore, segPlan.ToScore, segPlan.SegmentScore) } @@ -51,7 +51,7 @@ type ChannelAssignPlan struct { To int64 } -func (chanPlan *ChannelAssignPlan) ToString() string { +func (chanPlan ChannelAssignPlan) String() string { return fmt.Sprintf("ChannelPlan:[collectionID: %d, channel: %s, replicaID: %d, from: %d, to: %d]\n", chanPlan.Channel.CollectionID, chanPlan.Channel.ChannelName, chanPlan.Replica.GetID(), chanPlan.From, chanPlan.To) } diff --git a/internal/querycoordv2/balance/channel_level_score_balancer.go b/internal/querycoordv2/balance/channel_level_score_balancer.go index 7bf1dc5c1f..1a225bdd5a 100644 --- a/internal/querycoordv2/balance/channel_level_score_balancer.go +++ b/internal/querycoordv2/balance/channel_level_score_balancer.go @@ -17,6 +17,7 @@ package balance import ( + "fmt" "math" "sort" @@ -48,13 +49,23 @@ func NewChannelLevelScoreBalancer(scheduler task.Scheduler, } } -func (b *ChannelLevelScoreBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) { +func (b *ChannelLevelScoreBalancer) BalanceReplica(replica *meta.Replica) (segmentPlans []SegmentAssignPlan, channelPlans []ChannelAssignPlan) { log := log.With( zap.Int64("collection", replica.GetCollectionID()), zap.Int64("replica id", replica.GetID()), zap.String("replica group", replica.GetResourceGroup()), ) + br := NewBalanceReport() + defer func() { + if len(segmentPlans) == 0 && len(channelPlans) == 0 { + log.WithRateGroup(fmt.Sprintf("scorebasedbalance-noplan-%d", replica.GetID()), 1, 60). + RatedDebug(60, "no plan generated, balance report", zap.Stringers("records", br.detailRecords)) + } else { + log.Info("balance plan generated", zap.Stringers("report details", br.records)) + } + }() + exclusiveMode := true channels := b.targetMgr.GetDmChannelsByCollection(replica.GetCollectionID(), meta.CurrentTarget) for channelName := range channels { @@ -69,8 +80,8 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(replica *meta.Replica) ([]Seg return b.ScoreBasedBalancer.BalanceReplica(replica) } - channelPlans := make([]ChannelAssignPlan, 0) - segmentPlans := make([]SegmentAssignPlan, 0) + channelPlans = make([]ChannelAssignPlan, 0) + segmentPlans = make([]SegmentAssignPlan, 0) for channelName := range channels { if replica.NodesCount() == 0 { return nil, nil } @@ -120,7 +131,7 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(replica *meta.Replica) ([]Seg } if len(channelPlans) == 0 { - segmentPlans = append(segmentPlans, b.genSegmentPlan(replica, channelName, rwNodes)...) + segmentPlans = append(segmentPlans, b.genSegmentPlan(br, replica, channelName, rwNodes)...)
} } } @@ -159,9 +170,9 @@ func (b *ChannelLevelScoreBalancer) genStoppingSegmentPlan(replica *meta.Replica return segmentPlans } -func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channelName string, onlineNodes []int64) []SegmentAssignPlan { +func (b *ChannelLevelScoreBalancer) genSegmentPlan(br *balanceReport, replica *meta.Replica, channelName string, onlineNodes []int64) []SegmentAssignPlan { segmentDist := make(map[int64][]*meta.Segment) - nodeItemsMap := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes) + nodeItemsMap := b.convertToNodeItems(br, replica.GetCollectionID(), onlineNodes) if len(nodeItemsMap) == 0 { return nil } diff --git a/internal/querycoordv2/balance/multi_target_balance.go b/internal/querycoordv2/balance/multi_target_balance.go index 34ea48dab5..da0135b6bf 100644 --- a/internal/querycoordv2/balance/multi_target_balance.go +++ b/internal/querycoordv2/balance/multi_target_balance.go @@ -1,6 +1,7 @@ package balance import ( + "fmt" "math" "math/rand" "sort" @@ -467,12 +468,22 @@ type MultiTargetBalancer struct { targetMgr meta.TargetManagerInterface } -func (b *MultiTargetBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) { +func (b *MultiTargetBalancer) BalanceReplica(replica *meta.Replica) (segmentPlans []SegmentAssignPlan, channelPlans []ChannelAssignPlan) { log := log.With( zap.Int64("collection", replica.GetCollectionID()), zap.Int64("replica id", replica.GetID()), zap.String("replica group", replica.GetResourceGroup()), ) + br := NewBalanceReport() + defer func() { + if len(segmentPlans) == 0 && len(channelPlans) == 0 { + log.WithRateGroup(fmt.Sprintf("scorebasedbalance-noplan-%d", replica.GetID()), 1, 60). + RatedDebug(60, "no plan generated, balance report", zap.Stringers("records", br.detailRecords)) + } else { + log.Info("balance plan generated", zap.Stringers("report details", br.records)) + } + }() + if replica.NodesCount() == 0 { return nil, nil } @@ -486,7 +497,7 @@ func (b *MultiTargetBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAs } // print current distribution before generating plans - segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) + segmentPlans, channelPlans = make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) if len(roNodes) != 0 { if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() { log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes)) @@ -504,7 +515,7 @@ func (b *MultiTargetBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAs } } else { if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() { - channelPlans = append(channelPlans, b.genChannelPlan(replica, rwNodes)...) + channelPlans = append(channelPlans, b.genChannelPlan(br, replica, rwNodes)...) } if len(channelPlans) == 0 { diff --git a/internal/querycoordv2/balance/report.go b/internal/querycoordv2/balance/report.go new file mode 100644 index 0000000000..ce3f2c1cf7 --- /dev/null +++ b/internal/querycoordv2/balance/report.go @@ -0,0 +1,104 @@ +package balance + +import ( + "fmt" + + "github.com/samber/lo" +) + +// balanceReport is the struct to store balance plan generation details. +type balanceReport struct { + // node score information + // no mutex protection, concurrency safety is not guaranteed + // it is safe for now since a balanceReport is used within only one `Check` lifetime.
+ nodeItems map[int64]*nodeItemInfo + + // plain stringer records; String() is deferred, utilizing the zap.Stringer/Stringers feature + records []fmt.Stringer + detailRecords []fmt.Stringer +} + +// NewBalanceReport returns an initialized balanceReport instance +func NewBalanceReport() *balanceReport { + return &balanceReport{ + nodeItems: make(map[int64]*nodeItemInfo), + } +} + +func (br *balanceReport) AddRecord(record fmt.Stringer) { + br.records = append(br.records, record) + br.detailRecords = append(br.detailRecords, record) +} + +func (br *balanceReport) AddDetailRecord(record fmt.Stringer) { + br.detailRecords = append(br.detailRecords, record) +} + +func (br *balanceReport) AddSegmentPlan() { +} + +func (br *balanceReport) AddNodeItem(item *nodeItem) { + _, ok := br.nodeItems[item.nodeID] + if !ok { + nodeItem := &nodeItemInfo{ + nodeItem: item, + memoryFactor: 1, + } + br.nodeItems[item.nodeID] = nodeItem + } +} + +func (br *balanceReport) SetMemoryFactor(node int64, memoryFactor float64) { + nodeItem, ok := br.nodeItems[node] + if ok { + nodeItem.memoryFactor = memoryFactor + } +} + +func (br *balanceReport) SetDelegatorScore(node int64, delegatorScore float64) { + nodeItem, ok := br.nodeItems[node] + if ok { + nodeItem.delegatorScore = delegatorScore + } +} + +func (br *balanceReport) NodesInfo() []fmt.Stringer { + return lo.Map(lo.Values(br.nodeItems), func(item *nodeItemInfo, _ int) fmt.Stringer { + return item + }) +} + +type nodeItemInfo struct { + nodeItem *nodeItem + delegatorScore float64 + memoryFactor float64 +} + +func (info *nodeItemInfo) String() string { + return fmt.Sprintf("NodeItemInfo %s, memory factor %f, delegator score: %f", info.nodeItem, info.memoryFactor, info.delegatorScore) +} + +// strRecord implements fmt.Stringer with a simple string. +type strRecord string + +func (str strRecord) String() string { + return string(str) +} + +func StrRecord(str string) strRecord { return strRecord(str) } + +type strRecordf struct { + format string + values []any +} + +func (f strRecordf) String() string { + return fmt.Sprintf(f.format, f.values...) +} + +func StrRecordf(format string, values ...any) strRecordf { + return strRecordf{ + format: format, + values: values, + } +} diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index 99e87a0156..90243e00f2 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -167,12 +167,21 @@ func (b *RowCountBasedBalancer) convertToNodeItemsByChannel(nodeIDs []int64) []* return ret } -func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) { +func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) (segmentPlans []SegmentAssignPlan, channelPlans []ChannelAssignPlan) { log := log.Ctx(context.TODO()).WithRateGroup("qcv2.RowCountBasedBalancer", 1, 60).With( zap.Int64("collectionID", replica.GetCollectionID()), zap.Int64("replicaID", replica.GetCollectionID()), zap.String("resourceGroup", replica.GetResourceGroup()), ) + br := NewBalanceReport() + defer func() { + if len(segmentPlans) == 0 && len(channelPlans) == 0 { + log.WithRateGroup(fmt.Sprintf("scorebasedbalance-noplan-%d", replica.GetID()), 1, 60).
+ RatedDebug(60, "no plan generated, balance report", zap.Stringers("records", br.detailRecords)) + } else { + log.Info("balance plan generated", zap.Stringers("report details", br.records)) + } + }() if replica.NodesCount() == 0 { return nil, nil } @@ -184,7 +193,7 @@ func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]Segment return nil, nil } - segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) + segmentPlans, channelPlans = make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) if len(roNodes) != 0 { if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() { log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes)) @@ -202,7 +211,7 @@ func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]Segment } } else { if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() { - channelPlans = append(channelPlans, b.genChannelPlan(replica, rwNodes)...) + channelPlans = append(channelPlans, b.genChannelPlan(br, replica, rwNodes)...) } if len(channelPlans) == 0 { @@ -309,7 +318,7 @@ func (b *RowCountBasedBalancer) genStoppingChannelPlan(replica *meta.Replica, rw return channelPlans } -func (b *RowCountBasedBalancer) genChannelPlan(replica *meta.Replica, rwNodes []int64) []ChannelAssignPlan { +func (b *RowCountBasedBalancer) genChannelPlan(br *balanceReport, replica *meta.Replica, rwNodes []int64) []ChannelAssignPlan { channelPlans := make([]ChannelAssignPlan, 0) if len(rwNodes) > 1 { // start to balance channels on all available nodes @@ -341,6 +350,7 @@ func (b *RowCountBasedBalancer) genChannelPlan(replica *meta.Replica, rwNodes [] for i := range channelPlans { channelPlans[i].From = channelPlans[i].Channel.Node channelPlans[i].Replica = replica + br.AddRecord(StrRecordf("add channel plan %s", channelPlans[i])) } return channelPlans diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go index f745721ab7..20aa630ed1 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go @@ -1182,7 +1182,7 @@ func assertSegmentAssignPlanElementMatch(suite *suite.Suite, left []SegmentAssig suite.Equal(len(left), len(right)) type comparablePlan struct { - Segment *meta.Segment + Segment int64 ReplicaID int64 From int64 To int64 @@ -1195,7 +1195,7 @@ func assertSegmentAssignPlanElementMatch(suite *suite.Suite, left []SegmentAssig replicaID = p.Replica.GetID() } leftPlan = append(leftPlan, comparablePlan{ - Segment: p.Segment, + Segment: p.Segment.ID, ReplicaID: replicaID, From: p.From, To: p.To, @@ -1209,7 +1209,7 @@ func assertSegmentAssignPlanElementMatch(suite *suite.Suite, left []SegmentAssig replicaID = p.Replica.GetID() } rightPlan = append(rightPlan, comparablePlan{ - Segment: p.Segment, + Segment: p.Segment.ID, ReplicaID: replicaID, From: p.From, To: p.To, @@ -1225,7 +1225,7 @@ func assertSegmentAssignPlanElementMatch(suite *suite.Suite, left []SegmentAssig // remove it after resource group enhancement. 
func assertChannelAssignPlanElementMatch(suite *suite.Suite, left []ChannelAssignPlan, right []ChannelAssignPlan, subset ...bool) { type comparablePlan struct { - Channel *meta.DmChannel + Channel string ReplicaID int64 From int64 To int64 } @@ -1238,7 +1238,7 @@ func assertChannelAssignPlanElementMatch(suite *suite.Suite, left []ChannelAssig replicaID = p.Replica.GetID() } leftPlan = append(leftPlan, comparablePlan{ - Channel: p.Channel, + Channel: p.Channel.GetChannelName(), ReplicaID: replicaID, From: p.From, To: p.To, @@ -1252,7 +1252,7 @@ func assertChannelAssignPlanElementMatch(suite *suite.Suite, left []ChannelAssig replicaID = p.Replica.GetID() } rightPlan = append(rightPlan, comparablePlan{ - Channel: p.Channel, + Channel: p.Channel.GetChannelName(), ReplicaID: replicaID, From: p.From, To: p.To, diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 28345d709a..a0d741cc70 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -17,6 +17,7 @@ package balance import ( + "fmt" "math" "sort" @@ -50,16 +51,25 @@ func NewScoreBasedBalancer(scheduler task.Scheduler, // AssignSegment got a segment list, and try to assign each segment to node's with lowest score func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan { + br := NewBalanceReport() + return b.assignSegment(br, collectionID, segments, nodes, manualBalance) +} + +func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan { // skip out suspend node and stopping node during assignment, but skip this check for manual balance if !manualBalance { nodes = lo.Filter(nodes, func(node int64, _ int) bool { info := b.nodeManager.Get(node) - return info != nil && info.GetState() == session.NodeStateNormal + normalNode := info != nil && info.GetState() == session.NodeStateNormal + if !normalNode { + br.AddRecord(StrRecordf("non-manual balance, skip abnormal node: %d", node)) + } + return normalNode }) } // calculate each node's score - nodeItemsMap := b.convertToNodeItems(collectionID, nodes) + nodeItemsMap := b.convertToNodeItems(br, collectionID, nodes) if len(nodeItemsMap) == 0 { return nil } @@ -97,6 +107,7 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta. // if the segment reassignment doesn't got enough benefit, we should skip this reassignment // notice: we should skip benefit check for manual balance if !manualBalance && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) { + br.AddRecord(StrRecordf("skip generating balance plan for segment %d since there is not enough benefit", s.ID)) return } @@ -115,6 +126,7 @@
ToScore: int64(targetNode.getPriority()), SegmentScore: int64(scoreChanges), } + br.AddRecord(StrRecordf("add segment plan %s", plan)) plans = append(plans, plan) // update the sourceNode and targetNode's score @@ -153,17 +165,18 @@ func (b *ScoreBasedBalancer) hasEnoughBenefit(sourceNode *nodeItem, targetNode * return true } -func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []int64) map[int64]*nodeItem { +func (b *ScoreBasedBalancer) convertToNodeItems(br *balanceReport, collectionID int64, nodeIDs []int64) map[int64]*nodeItem { totalScore := 0 nodeScoreMap := make(map[int64]*nodeItem) nodeMemMap := make(map[int64]float64) totalMemCapacity := float64(0) allNodeHasMemInfo := true for _, node := range nodeIDs { - score := b.calculateScore(collectionID, node) + score := b.calculateScore(br, collectionID, node) nodeItem := newNodeItem(score, node) nodeScoreMap[node] = &nodeItem totalScore += score + br.AddNodeItem(nodeScoreMap[node]) // set memory default to 1.0, will multiply average value to compute assigned score nodeInfo := b.nodeManager.Get(node) @@ -191,19 +204,22 @@ func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []in for _, node := range nodeIDs { if allNodeHasMemInfo { nodeScoreMap[node].setAssignedScore(nodeMemMap[node] * average) + br.SetMemoryFactor(node, nodeMemMap[node]) } else { nodeScoreMap[node].setAssignedScore(average) } // use assignedScore * delegatorOverloadFactor * delegator_num, to preserve fixed memory size for delegator collectionViews := b.dist.LeaderViewManager.GetByFilter(meta.WithCollectionID2LeaderView(collectionID), meta.WithNodeID2LeaderView(node)) if len(collectionViews) > 0 { - nodeScoreMap[node].AddCurrentScoreDelta(nodeScoreMap[node].getAssignedScore() * delegatorOverloadFactor * float64(len(collectionViews))) + delegatorDelta := nodeScoreMap[node].getAssignedScore() * delegatorOverloadFactor * float64(len(collectionViews)) + nodeScoreMap[node].AddCurrentScoreDelta(delegatorDelta) + br.SetDelegatorScore(node, delegatorDelta) } } return nodeScoreMap } -func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int { +func (b *ScoreBasedBalancer) calculateScore(br *balanceReport, collectionID, nodeID int64) int { nodeRowCount := 0 // calculate global sealed segment row count globalSegments := b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID)) @@ -236,6 +252,9 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int { // calculate executing task cost in scheduler collectionRowCount += b.scheduler.GetSegmentTaskDelta(nodeID, collectionID) + br.AddDetailRecord(StrRecordf("Calculate score for collection %d on node %d, global row count: %d, collection row count: %d", + collectionID, nodeID, nodeRowCount, collectionRowCount)) + return collectionRowCount + int(float64(nodeRowCount)* params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()) } @@ -245,13 +264,23 @@ func (b *ScoreBasedBalancer) calculateSegmentScore(s *meta.Segment) float64 { return float64(s.GetNumOfRows()) * (1 + params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()) } -func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) { +func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) (segmentPlans []SegmentAssignPlan, channelPlans []ChannelAssignPlan) { log := log.With( zap.Int64("collection", replica.GetCollectionID()), zap.Int64("replica id", replica.GetID()), zap.String("replica group", replica.GetResourceGroup()), ) +
br := NewBalanceReport() + defer func() { + if len(segmentPlans) == 0 && len(channelPlans) == 0 { + log.WithRateGroup(fmt.Sprintf("scorebasedbalance-noplan-%d", replica.GetID()), 1, 60). + RatedDebug(60, "no plan generated, balance report", zap.Stringers("nodesInfo", br.NodesInfo()), zap.Stringers("records", br.detailRecords)) + } else { + log.Info("balance plan generated", zap.Stringers("nodesInfo", br.NodesInfo()), zap.Stringers("report details", br.records)) + } + }() if replica.NodesCount() == 0 { + br.AddRecord(StrRecord("replica has no querynode")) return nil, nil } @@ -260,14 +289,16 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss if len(rwNodes) == 0 { // no available nodes to balance + br.AddRecord(StrRecord("no rwNodes to balance")) return nil, nil } // print current distribution before generating plans - segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) + segmentPlans, channelPlans = make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) if len(roNodes) != 0 { if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() { log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes)) + br.AddRecord(StrRecord("stopping balance is disabled")) return nil, nil } @@ -275,6 +306,7 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss zap.Any("stopping nodes", roNodes), zap.Any("available nodes", rwNodes), ) + br.AddRecord(StrRecordf("executing stopping balance: %v", roNodes)) // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, rwNodes, roNodes)...) if len(channelPlans) == 0 { @@ -282,11 +314,11 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss } } else { if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() { - channelPlans = append(channelPlans, b.genChannelPlan(replica, rwNodes)...) + channelPlans = append(channelPlans, b.genChannelPlan(br, replica, rwNodes)...) } if len(channelPlans) == 0 { - segmentPlans = append(segmentPlans, b.genSegmentPlan(replica, rwNodes)...) + segmentPlans = append(segmentPlans, b.genSegmentPlan(br, replica, rwNodes)...) 
} } @@ -310,9 +342,9 @@ func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlin return segmentPlans } -func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes []int64) []SegmentAssignPlan { +func (b *ScoreBasedBalancer) genSegmentPlan(br *balanceReport, replica *meta.Replica, onlineNodes []int64) []SegmentAssignPlan { segmentDist := make(map[int64][]*meta.Segment) - nodeItemsMap := b.convertToNodeItems(replica.GetCollectionID(), onlineNodes) + nodeItemsMap := b.convertToNodeItems(br, replica.GetCollectionID(), onlineNodes) if len(nodeItemsMap) == 0 { return nil } @@ -334,6 +366,7 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [ currentScore := nodeItemsMap[node].getCurrentScore() assignedScore := nodeItemsMap[node].getAssignedScore() if currentScore <= assignedScore { + br.AddRecord(StrRecordf("node %d skips balance since current score (%f) is lower than assigned score (%f)", node, currentScore, assignedScore)) continue } @@ -341,13 +374,17 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [ return segments[i].GetNumOfRows() < segments[j].GetNumOfRows() }) for _, s := range segments { + segmentScore := b.calculateSegmentScore(s) + br.AddRecord(StrRecordf("pick segment %d with score %f from node %d", s.ID, segmentScore, node)) segmentsToMove = append(segmentsToMove, s) if len(segmentsToMove) >= balanceBatchSize { + br.AddRecord(StrRecordf("stop adding segment candidates since plan count reached batch max (%d)", balanceBatchSize)) break } - currentScore -= b.calculateSegmentScore(s) + currentScore -= segmentScore if currentScore <= assignedScore { + br.AddRecord(StrRecordf("stop adding segment candidates since node %d current score (%f) is below assigned score (%f)", node, currentScore, assignedScore)) break } } @@ -355,14 +392,19 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [ // if the segment are redundant, skip it's balance for now segmentsToMove = lo.Filter(segmentsToMove, func(s *meta.Segment, _ int) bool { - return len(b.dist.SegmentDistManager.GetByFilter(meta.WithReplica(replica), meta.WithSegmentID(s.GetID()))) == 1 + times := len(b.dist.SegmentDistManager.GetByFilter(meta.WithReplica(replica), meta.WithSegmentID(s.GetID()))) + segmentUnique := times == 1 + if !segmentUnique { + br.AddRecord(StrRecordf("abort balancing segment %d since it appears multiple times (%d) in the distribution", s.ID, times)) + } + return segmentUnique }) if len(segmentsToMove) == 0 { return nil } - segmentPlans := b.AssignSegment(replica.GetCollectionID(), segmentsToMove, onlineNodes, false) + segmentPlans := b.assignSegment(br, replica.GetCollectionID(), segmentsToMove, onlineNodes, false) for i := range segmentPlans { segmentPlans[i].From = segmentPlans[i].Segment.Node segmentPlans[i].Replica = replica diff --git a/internal/querycoordv2/balance/utils.go b/internal/querycoordv2/balance/utils.go index 791b373458..9ea0665303 100644 --- a/internal/querycoordv2/balance/utils.go +++ b/internal/querycoordv2/balance/utils.go @@ -129,10 +129,10 @@ func PrintNewBalancePlans(collectionID int64, replicaID int64, segmentPlans []Se ) { balanceInfo := fmt.Sprintf("%s new plans:{collectionID:%d, replicaID:%d, ", PlanInfoPrefix, collectionID, replicaID) for _, segmentPlan := range segmentPlans { - balanceInfo += segmentPlan.ToString() + balanceInfo += segmentPlan.String() } for _, channelPlan := range channelPlans { - balanceInfo += channelPlan.ToString() + balanceInfo += channelPlan.String() }
balanceInfo += "}" log.Info(balanceInfo)
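
Reviewer note: the `BalanceReplica` signature changes above (anonymous results renamed to `segmentPlans`/`channelPlans`) are what allow the deferred report logging to inspect the final plans, since named results are captured by reference in the deferred closure. A minimal, self-contained sketch of the idiom (toy types, not the actual balancer):

```go
package main

import "fmt"

// plans mimics the BalanceReplica change: named results are visible to the
// deferred closure, so it observes whatever the function finally returns,
// on every return path.
func plans(empty bool) (segmentPlans, channelPlans []string) {
	defer func() {
		if len(segmentPlans) == 0 && len(channelPlans) == 0 {
			fmt.Println("no plan generated, balance report")
		} else {
			fmt.Println("balance plan generated:", segmentPlans, channelPlans)
		}
	}()
	if empty {
		return nil, nil // the defer still runs and sees the empty results
	}
	segmentPlans = append(segmentPlans, "segment-plan-1")
	return segmentPlans, channelPlans
}

func main() {
	plans(true)
	plans(false)
}
```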
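The report records deliberately implement `fmt.Stringer` rather than storing pre-formatted strings, so formatting cost is only paid if the rate-limited `RatedDebug` entry actually survives and `zap.Stringers` invokes `String()`. A stdlib-only sketch of the same lazy-formatting pattern, mirroring `strRecordf` from report.go:

```go
package main

import "fmt"

// strRecordf captures a format string and its arguments, but defers the
// fmt.Sprintf call until String() is invoked — e.g. by zap.Stringers when
// the log entry is actually emitted.
type strRecordf struct {
	format string
	values []any
}

func (f strRecordf) String() string {
	return fmt.Sprintf(f.format, f.values...)
}

func main() {
	records := []fmt.Stringer{
		strRecordf{format: "pick segment %d with score %f from node %d", values: []any{100, 75.5, 2}},
		strRecordf{format: "add channel plan %s", values: []any{"by-dev-dml-ch-0"}},
	}
	// If the rate limiter drops the entry, String() is never called and no
	// formatting work is done. Here we force formatting for demonstration.
	for _, r := range records {
		fmt.Println(r.String())
	}
}
```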
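For context on the numbers the report now surfaces (`memoryFactor`, delegator score): a back-of-the-envelope sketch of the arithmetic in `calculateScore`/`convertToNodeItems`. All values below are made up for illustration; the real `GlobalRowCountFactor` and `delegatorOverloadFactor` come from configuration and the live distribution:

```go
package main

import "fmt"

func main() {
	// Assumed illustrative values, standing in for the configured factors.
	const globalRowCountFactor = 0.1
	const delegatorOverloadFactor = 0.3

	// calculateScore: rows of this collection plus a discounted share of all
	// rows hosted on the node.
	nodeRowCount, collectionRowCount := 10000, 4000
	score := collectionRowCount + int(float64(nodeRowCount)*globalRowCountFactor)
	fmt.Println("node score:", score) // 5000

	// convertToNodeItems: when all nodes report memory info, the assigned
	// score is the cluster average weighted by the node's memory share
	// (the memoryFactor recorded in the report).
	average, memoryFactor := 5000.0, 1.2
	assigned := average * memoryFactor

	// Each delegator (leader view) on the node reserves extra headroom,
	// recorded in the report as the delegator score.
	numDelegators := 2
	delegatorDelta := assigned * delegatorOverloadFactor * float64(numDelegators)
	fmt.Printf("assigned score: %.0f, delegator delta: %.0f\n", assigned, delegatorDelta)
}
```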