enhance: Refactor balance checker with priority queue (#43992)

issue: #43858
Refactor the balance checker implementation to use priority queues for
managing collection balance operations, improving processing efficiency
and order control.

Changes include:
- Export priority queue interfaces (Item, BaseItem, PriorityQueue)
- Replace collection round-robin with priority-based queue system
- Add BalanceCheckCollectionMaxCount configuration parameter
- Optimize balance task generation with batch processing limits
- Unify normal and stopping balance in a shared processBalanceQueue method parameterized by strategy
- Enhance test coverage with comprehensive unit tests

The new priority queue system processes collections based on row count
or collection ID order, providing better control over balance operation
priorities and resource utilization.
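
For reference, the trigger order collapses into a single integer priority per collection that feeds a min-heap, where the smallest value pops first. The sketch below is only an illustration of that mapping; the helper name collectionBalancePriority is hypothetical, while the rule itself mirrors newCollectionBalanceItem in the checker diff further down.

// Illustration only; mirrors the priority rule of newCollectionBalanceItem below.
// "bycollectionid": smaller collection IDs pop first.
// default ("byrowcount"): larger row counts pop first, so the count is negated
// to fit the min-heap ordering.
func collectionBalancePriority(sortOrder string, collectionID int64, rowCount int) int {
	if sortOrder == "bycollectionid" {
		return int(collectionID)
	}
	return -rowCount
}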

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2025-09-19 17:46:01 +08:00 committed by GitHub
parent bed94fc061
commit 6d4961b978
8 changed files with 1366 additions and 1152 deletions

View File

@@ -20,24 +20,24 @@ import (
"container/heap"
)
type item interface {
type Item interface {
getPriority() int
setPriority(priority int)
}
type baseItem struct {
type BaseItem struct {
priority int
}
func (b *baseItem) getPriority() int {
func (b *BaseItem) getPriority() int {
return b.priority
}
func (b *baseItem) setPriority(priority int) {
func (b *BaseItem) setPriority(priority int) {
b.priority = priority
}
type heapQueue []item
type heapQueue []Item
func (hq heapQueue) Len() int {
return len(hq)
@@ -52,7 +52,7 @@ func (hq heapQueue) Swap(i, j int) {
}
func (hq *heapQueue) Push(x any) {
i := x.(item)
i := x.(Item)
*hq = append(*hq, i)
}
@@ -64,22 +64,30 @@ func (hq *heapQueue) Pop() any {
return ret
}
type priorityQueue struct {
type PriorityQueue struct {
heapQueue
}
func newPriorityQueue() priorityQueue {
func NewPriorityQueue() PriorityQueue {
hq := make(heapQueue, 0)
heap.Init(&hq)
return priorityQueue{
return PriorityQueue{
heapQueue: hq,
}
}
func (pq *priorityQueue) push(item item) {
func NewPriorityQueuePtr() *PriorityQueue {
hq := make(heapQueue, 0)
heap.Init(&hq)
return &PriorityQueue{
heapQueue: hq,
}
}
func (pq *PriorityQueue) Push(item Item) {
heap.Push(&pq.heapQueue, item)
}
func (pq *priorityQueue) pop() item {
return heap.Pop(&pq.heapQueue).(item)
func (pq *PriorityQueue) Pop() Item {
return heap.Pop(&pq.heapQueue).(Item)
}
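
Since the queue type is now exported, a short usage sketch clarifies its semantics: it is a min-heap on getPriority(), so the item with the smallest priority value pops first, and callers that want largest-first ordering push negated values (as TestMaxPriorityQueue below does). The function name is hypothetical and the code assumes it lives inside the balance package, since Item's methods are unexported; note that *BaseItem already satisfies Item.

// Hypothetical sketch, placed inside the balance package for illustration.
func examplePriorityQueue() int {
	pq := NewPriorityQueue()
	for _, p := range []int{3, 1, 2} {
		it := &BaseItem{}
		it.setPriority(p)
		pq.Push(it)
	}
	// The smallest priority value is returned first, so this yields 1.
	return pq.Pop().getPriority()
}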

View File

@@ -23,74 +23,74 @@ import (
)
func TestMinPriorityQueue(t *testing.T) {
pq := newPriorityQueue()
pq := NewPriorityQueue()
for i := 0; i < 5; i++ {
priority := i % 3
nodeItem := newNodeItem(priority, int64(i))
pq.push(&nodeItem)
pq.Push(&nodeItem)
}
item := pq.pop()
item := pq.Pop()
assert.Equal(t, item.getPriority(), 0)
assert.Equal(t, item.(*nodeItem).nodeID, int64(0))
item = pq.pop()
item = pq.Pop()
assert.Equal(t, item.getPriority(), 0)
assert.Equal(t, item.(*nodeItem).nodeID, int64(3))
item = pq.pop()
item = pq.Pop()
assert.Equal(t, item.getPriority(), 1)
assert.Equal(t, item.(*nodeItem).nodeID, int64(1))
item = pq.pop()
item = pq.Pop()
assert.Equal(t, item.getPriority(), 1)
assert.Equal(t, item.(*nodeItem).nodeID, int64(4))
item = pq.pop()
item = pq.Pop()
assert.Equal(t, item.getPriority(), 2)
println(item.getPriority())
assert.Equal(t, item.(*nodeItem).nodeID, int64(2))
}
func TestPopPriorityQueue(t *testing.T) {
pq := newPriorityQueue()
pq := NewPriorityQueue()
for i := 0; i < 1; i++ {
priority := 1
nodeItem := newNodeItem(priority, int64(i))
pq.push(&nodeItem)
pq.Push(&nodeItem)
}
item := pq.pop()
item := pq.Pop()
assert.Equal(t, item.getPriority(), 1)
assert.Equal(t, item.(*nodeItem).nodeID, int64(0))
pq.push(item)
pq.Push(item)
// verify priority ordering rather than round-robin: the re-pushed item is popped again
item = pq.pop()
item = pq.Pop()
assert.Equal(t, item.getPriority(), 1)
assert.Equal(t, item.(*nodeItem).nodeID, int64(0))
}
func TestMaxPriorityQueue(t *testing.T) {
pq := newPriorityQueue()
pq := NewPriorityQueue()
for i := 0; i < 5; i++ {
priority := i % 3
nodeItem := newNodeItem(-priority, int64(i))
pq.push(&nodeItem)
pq.Push(&nodeItem)
}
item := pq.pop()
item := pq.Pop()
assert.Equal(t, item.getPriority(), -2)
assert.Equal(t, item.(*nodeItem).nodeID, int64(2))
item = pq.pop()
item = pq.Pop()
assert.Equal(t, item.getPriority(), -1)
assert.Equal(t, item.(*nodeItem).nodeID, int64(4))
item = pq.pop()
item = pq.Pop()
assert.Equal(t, item.getPriority(), -1)
assert.Equal(t, item.(*nodeItem).nodeID, int64(1))
item = pq.pop()
item = pq.Pop()
assert.Equal(t, item.getPriority(), 0)
assert.Equal(t, item.(*nodeItem).nodeID, int64(3))
item = pq.pop()
item = pq.Pop()
assert.Equal(t, item.getPriority(), 0)
assert.Equal(t, item.(*nodeItem).nodeID, int64(0))
}

View File

@@ -57,9 +57,9 @@ func (b *RowCountBasedBalancer) AssignSegment(ctx context.Context, collectionID
if len(nodeItems) == 0 {
return nil
}
queue := newPriorityQueue()
queue := NewPriorityQueue()
for _, item := range nodeItems {
queue.push(item)
queue.Push(item)
}
sort.Slice(segments, func(i, j int) bool {
@@ -70,7 +70,7 @@ func (b *RowCountBasedBalancer) AssignSegment(ctx context.Context, collectionID
plans := make([]SegmentAssignPlan, 0, len(segments))
for _, s := range segments {
// pick the node with the least row count and allocate to it.
ni := queue.pop().(*nodeItem)
ni := queue.Pop().(*nodeItem)
plan := SegmentAssignPlan{
From: -1,
To: ni.nodeID,
@@ -82,7 +82,7 @@ func (b *RowCountBasedBalancer) AssignSegment(ctx context.Context, collectionID
}
// change node's score and push back
ni.AddCurrentScoreDelta(float64(s.GetNumOfRows()))
queue.push(ni)
queue.Push(ni)
}
return plans
}
@@ -108,9 +108,9 @@ func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, collectionID
return nil
}
queue := newPriorityQueue()
queue := NewPriorityQueue()
for _, item := range nodeItems {
queue.push(item)
queue.Push(item)
}
plans := make([]ChannelAssignPlan, 0)
@@ -125,7 +125,7 @@ func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, collectionID
}
if ni == nil {
// pick the node with the least channel num and allocate to it.
ni = queue.pop().(*nodeItem)
ni = queue.Pop().(*nodeItem)
}
plan := ChannelAssignPlan{
From: -1,
@@ -135,7 +135,7 @@ func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, collectionID
plans = append(plans, plan)
// change node's score and push back
ni.AddCurrentScoreDelta(1)
queue.push(ni)
queue.Push(ni)
}
return plans
}
@@ -408,7 +408,7 @@ func NewRowCountBasedBalancer(
}
type nodeItem struct {
baseItem
BaseItem
fmt.Stringer
nodeID int64
assignedScore float64
@@ -417,7 +417,7 @@ type nodeItem struct {
func newNodeItem(currentScore int, nodeID int64) nodeItem {
return nodeItem{
baseItem: baseItem{},
BaseItem: BaseItem{},
nodeID: nodeID,
currentScore: float64(currentScore),
}

View File

@@ -79,9 +79,9 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64
return nil
}
queue := newPriorityQueue()
queue := NewPriorityQueue()
for _, item := range nodeItemsMap {
queue.push(item)
queue.Push(item)
}
// sort segments by segment row count, if segment has same row count, sort by node's score
@@ -100,9 +100,9 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64
for _, s := range segments {
func(s *meta.Segment) {
// for each segment, pick the node with the least score
targetNode := queue.pop().(*nodeItem)
targetNode := queue.Pop().(*nodeItem)
// make sure the candidate is always pushed back
defer queue.push(targetNode)
defer queue.Push(targetNode)
scoreChanges := b.calculateSegmentScore(s)
sourceNode := nodeItemsMap[s.Node]
@@ -173,9 +173,9 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
return nil
}
queue := newPriorityQueue()
queue := NewPriorityQueue()
for _, item := range nodeItemsMap {
queue.push(item)
queue.Push(item)
}
plans := make([]ChannelAssignPlan, 0, len(channels))
for _, ch := range channels {
@@ -193,10 +193,10 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
}
// for each channel, pick the node with the least score
if targetNode == nil {
targetNode = queue.pop().(*nodeItem)
targetNode = queue.Pop().(*nodeItem)
}
// make sure candidate is always push back
defer queue.push(targetNode)
defer queue.Push(targetNode)
scoreChanges := b.calculateChannelScore(ch, collectionID)
sourceNode := nodeItemsMap[ch.Node]

View File

@@ -18,7 +18,6 @@ package checkers
import (
"context"
"sort"
"strings"
"time"
@@ -36,22 +35,111 @@ import (
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// balanceConfig holds all configuration parameters for balance operations.
// This configuration controls how balance tasks are generated and executed.
type balanceConfig struct {
// segmentBatchSize specifies the maximum number of segment balance tasks to generate in one round
segmentBatchSize int
// channelBatchSize specifies the maximum number of channel balance tasks to generate in one round
channelBatchSize int
// balanceOnMultipleCollections determines whether to balance multiple collections in one round.
// If false, only balance one collection at a time to avoid resource contention
balanceOnMultipleCollections bool
// maxCheckCollectionCount limits the maximum number of collections to check in one round
// to prevent long-running balance operations
maxCheckCollectionCount int
// autoBalanceInterval controls how frequently automatic balance operations are triggered
autoBalanceInterval time.Duration
// segmentTaskTimeout specifies the timeout for segment balance tasks
segmentTaskTimeout time.Duration
// channelTaskTimeout specifies the timeout for channel balance tasks
channelTaskTimeout time.Duration
}
// loadBalanceConfig fetches all balance-related configuration parameters from the global
// parameter table and returns a balanceConfig struct for use in balance operations.
func (b *BalanceChecker) loadBalanceConfig() balanceConfig {
return balanceConfig{
segmentBatchSize: paramtable.Get().QueryCoordCfg.BalanceSegmentBatchSize.GetAsInt(),
channelBatchSize: paramtable.Get().QueryCoordCfg.BalanceChannelBatchSize.GetAsInt(),
balanceOnMultipleCollections: paramtable.Get().QueryCoordCfg.EnableBalanceOnMultipleCollections.GetAsBool(),
maxCheckCollectionCount: paramtable.Get().QueryCoordCfg.BalanceCheckCollectionMaxCount.GetAsInt(),
autoBalanceInterval: paramtable.Get().QueryCoordCfg.AutoBalanceInterval.GetAsDuration(time.Millisecond),
segmentTaskTimeout: paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
channelTaskTimeout: paramtable.Get().QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond),
}
}
// collectionBalanceItem represents a collection in the balance priority queue.
// Each item contains collection metadata and is used to determine the order
// in which collections should be processed for balance operations.
type collectionBalanceItem struct {
*balance.BaseItem
balancePriority int
// collectionID and rowCount are used to calculate the priority
collectionID int64
rowCount int
sortOrder string
}
// newCollectionBalanceItem creates a queue item whose priority is determined by the BalanceTriggerOrder configuration:
// - "byrowcount": Higher row count collections get higher priority (processed first)
// - "bycollectionid": Collections with smaller IDs get higher priority
func newCollectionBalanceItem(collectionID int64, rowCount int, sortOrder string) *collectionBalanceItem {
priority := 0
if sortOrder == "bycollectionid" {
priority = int(collectionID)
} else {
priority = -rowCount
}
return &collectionBalanceItem{
BaseItem: &balance.BaseItem{},
collectionID: collectionID,
rowCount: rowCount,
sortOrder: sortOrder,
balancePriority: priority,
}
}
func (c *collectionBalanceItem) getPriority() int {
return c.balancePriority
}
func (c *collectionBalanceItem) setPriority(priority int) {
c.balancePriority = priority
}
// BalanceChecker checks the cluster distribution and generates balance tasks.
// It is responsible for monitoring the load distribution across query nodes and
// generating segment/channel move tasks to maintain optimal balance.
//
// The BalanceChecker operates in two modes:
// 1. Stopping Balance: High-priority balance for nodes that are being stopped or read-only nodes
// 2. Normal Balance: Regular automatic balance operations to optimize cluster performance
//
// Both modes use priority queues to determine the order in which collections are processed.
type BalanceChecker struct {
*checkerActivation
meta *meta.Meta
nodeManager *session.NodeManager
scheduler task.Scheduler
targetMgr meta.TargetManagerInterface
// getBalancerFunc returns the appropriate balancer for generating balance plans
getBalancerFunc GetBalancerFunc
normalBalanceCollectionsCurrentRound typeutil.UniqueSet
stoppingBalanceCollectionsCurrentRound typeutil.UniqueSet
// normalBalanceQueue maintains collections pending normal balance operations,
// ordered by priority (row count or collection ID)
normalBalanceQueue *balance.PriorityQueue
// stoppingBalanceQueue maintains collections pending stopping balance operations,
// used when nodes are being gracefully stopped
stoppingBalanceQueue *balance.PriorityQueue
// record auto balance ts
// autoBalanceTs records the timestamp of the last auto balance operation
// to ensure balance operations don't happen too frequently
autoBalanceTs time.Time
}
@@ -66,8 +154,8 @@ func NewBalanceChecker(meta *meta.Meta,
meta: meta,
targetMgr: targetMgr,
nodeManager: nodeMgr,
normalBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(),
stoppingBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(),
normalBalanceQueue: balance.NewPriorityQueuePtr(),
stoppingBalanceQueue: balance.NewPriorityQueuePtr(),
scheduler: scheduler,
getBalancerFunc: getBalancerFunc,
}
@@ -81,6 +169,12 @@ func (b *BalanceChecker) Description() string {
return "BalanceChecker checks the cluster distribution and generates balance tasks"
}
// readyToCheck determines if a collection is ready for balance operations.
// A collection is considered ready if:
// 1. It exists in the metadata
// 2. It has either a current target or next target defined
//
// Returns true if the collection is ready for balance operations.
func (b *BalanceChecker) readyToCheck(ctx context.Context, collectionID int64) bool {
metaExist := (b.meta.GetCollection(ctx, collectionID) != nil)
targetExist := b.targetMgr.IsNextTargetExist(ctx, collectionID) || b.targetMgr.IsCurrentTargetExist(ctx, collectionID, common.AllPartitionsID)
@@ -88,117 +182,151 @@ func (b *BalanceChecker) readyToCheck(ctx context.Context, collectionID int64) b
return metaExist && targetExist
}
func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context) []int64 {
hasUnbalancedCollection := false
defer func() {
if !hasUnbalancedCollection {
b.stoppingBalanceCollectionsCurrentRound.Clear()
log.RatedDebug(10, "BalanceChecker has triggered stopping balance for all "+
"collections in one round, clear collectionIDs for this round")
}
}()
type ReadyForBalanceFilter func(ctx context.Context, collectionID int64) bool
// filterCollectionForBalance filters all collections using the provided filter functions.
// Only collections that pass ALL filter criteria will be included in the result.
// This is used to select collections eligible for balance operations based on
// various conditions like load status, target readiness, etc.
// Returns a slice of collection IDs that pass all filter criteria.
func (b *BalanceChecker) filterCollectionForBalance(ctx context.Context, filter ...ReadyForBalanceFilter) []int64 {
ids := b.meta.GetAll(ctx)
// Sort collections using the configured sort order
ids = b.sortCollections(ctx, ids)
if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
ret := make([]int64, 0)
for _, cid := range ids {
// if target and meta isn't ready, skip balance this collection
if !b.readyToCheck(ctx, cid) {
continue
shouldInclude := true
for _, f := range filter {
if !f(ctx, cid) {
shouldInclude = false
break
}
if b.stoppingBalanceCollectionsCurrentRound.Contain(cid) {
continue
}
if shouldInclude {
ret = append(ret, cid)
}
}
return ret
}
// constructStoppingBalanceQueue creates and populates the stopping balance priority queue.
// This queue contains collections that need balance operations due to nodes being stopped.
// Collections are ordered by priority (row count or collection ID based on configuration).
//
// Returns a new priority queue with all eligible collections for stopping balance.
// Note: because stopping balance needs to move all data off the stopping nodes, all collections must be checked.
func (b *BalanceChecker) constructStoppingBalanceQueue(ctx context.Context) *balance.PriorityQueue {
sortOrder := strings.ToLower(Params.QueryCoordCfg.BalanceTriggerOrder.GetValue())
if sortOrder == "" {
sortOrder = "byrowcount" // Default to ByRowCount
}
replicas := b.meta.ReplicaManager.GetByCollection(ctx, cid)
stoppingReplicas := make([]int64, 0)
for _, replica := range replicas {
// If there are some delegator work on query node, we need to balance channel to streamingnode forcely.
ret := b.filterCollectionForBalance(ctx, b.readyToCheck)
pq := balance.NewPriorityQueuePtr()
for _, cid := range ret {
rowCount := b.targetMgr.GetCollectionRowCount(ctx, cid, meta.CurrentTargetFirst)
item := newCollectionBalanceItem(cid, int(rowCount), sortOrder)
pq.Push(item)
}
b.stoppingBalanceQueue = pq
return pq
}
// constructNormalBalanceQueue creates and populates the normal balance priority queue.
// This queue contains loaded collections that are ready for regular balance operations.
// Collections must meet multiple criteria:
// 1. Be ready for balance operations (metadata and target exist)
// 2. Have loaded status (actively serving queries)
// 3. Have current target ready (consistent state)
//
// Returns a new priority queue with all eligible collections for normal balance.
func (b *BalanceChecker) constructNormalBalanceQueue(ctx context.Context) *balance.PriorityQueue {
filterLoadedCollections := func(ctx context.Context, cid int64) bool {
collection := b.meta.GetCollection(ctx, cid)
return collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded
}
filterTargetReadyCollections := func(ctx context.Context, cid int64) bool {
return b.targetMgr.IsCurrentTargetReady(ctx, cid)
}
sortOrder := strings.ToLower(Params.QueryCoordCfg.BalanceTriggerOrder.GetValue())
if sortOrder == "" {
sortOrder = "byrowcount" // Default to ByRowCount
}
ret := b.filterCollectionForBalance(ctx, b.readyToCheck, filterLoadedCollections, filterTargetReadyCollections)
pq := balance.NewPriorityQueuePtr()
for _, cid := range ret {
rowCount := b.targetMgr.GetCollectionRowCount(ctx, cid, meta.CurrentTargetFirst)
item := newCollectionBalanceItem(cid, int(rowCount), sortOrder)
pq.Push(item)
}
b.normalBalanceQueue = pq
return pq
}
// getReplicaForStoppingBalance returns replicas that need stopping balance operations.
// A replica needs stopping balance if it has:
// 1. Read-only (RO) nodes that need to be drained
// 2. Read-only streaming query (ROSQ) nodes that need to be drained
// 3. Channel read-only nodes when streaming service is enabled
//
// These replicas need immediate attention to move data off nodes that are being stopped.
//
// Returns a slice of replica IDs that need stopping balance operations.
func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context, collectionID int64) []int64 {
filterReplicaWithRONodes := func(replica *meta.Replica, _ int) bool {
channelRONodes := make([]int64, 0)
if streamingutil.IsStreamingServiceEnabled() {
_, channelRONodes = utils.GetChannelRWAndRONodesFor260(replica, b.nodeManager)
}
if replica.RONodesCount()+replica.ROSQNodesCount() > 0 || len(channelRONodes) > 0 {
stoppingReplicas = append(stoppingReplicas, replica.GetID())
}
}
if len(stoppingReplicas) > 0 {
hasUnbalancedCollection = true
b.stoppingBalanceCollectionsCurrentRound.Insert(cid)
return stoppingReplicas
}
}
return replica.RONodesCount()+replica.ROSQNodesCount() > 0 || len(channelRONodes) > 0
}
// finish current round for stopping balance if no unbalanced collection
hasUnbalancedCollection = false
return nil
// filter replicas with RONodes or channelRONodes
replicas := b.meta.ReplicaManager.GetByCollection(ctx, collectionID)
ret := make([]int64, 0)
for _, replica := range replicas {
if filterReplicaWithRONodes(replica, 0) {
ret = append(ret, replica.GetID())
}
}
return ret
}
func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64 {
hasUnbalancedCollection := false
defer func() {
if !hasUnbalancedCollection {
b.normalBalanceCollectionsCurrentRound.Clear()
log.RatedDebug(10, "BalanceChecker has triggered normal balance for all "+
"collections in one round, clear collectionIDs for this round")
}
}()
// 1. no stopping balance and auto balance is disabled, return empty collections for balance
// 2. when balancer isn't active, skip auto balance
if !Params.QueryCoordCfg.AutoBalance.GetAsBool() || !b.IsActive() {
// finish current round for normal balance if normal balance isn't triggered
hasUnbalancedCollection = false
return nil
}
ids := b.meta.GetAll(ctx)
// all replicas belonging to loading collection will be skipped
loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool {
collection := b.meta.GetCollection(ctx, cid)
return collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded
// getReplicaForNormalBalance returns all replicas for a collection for normal balance operations.
// Unlike stopping balance, normal balance considers all replicas regardless of their node status.
// This allows for comprehensive load balancing across the entire collection.
//
// Returns a slice of all replica IDs for the collection.
func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context, collectionID int64) []int64 {
replicas := b.meta.ReplicaManager.GetByCollection(ctx, collectionID)
return lo.Map(replicas, func(replica *meta.Replica, _ int) int64 {
return replica.GetID()
})
// Before performing balancing, check the CurrentTarget/LeaderView/Distribution for all collections.
// If any collection has unready info, skip the balance operation to avoid inconsistencies.
notReadyCollections := lo.Filter(loadedCollections, func(cid int64, _ int) bool {
// todo: should also check distribution and leader view in the future
return !b.targetMgr.IsCurrentTargetReady(ctx, cid)
})
if len(notReadyCollections) > 0 {
// finish current round for normal balance if any collection isn't ready
hasUnbalancedCollection = false
log.RatedInfo(10, "skip normal balance, cause collection not ready for balance", zap.Int64s("collectionIDs", notReadyCollections))
return nil
}
// Sort collections using the configured sort order
loadedCollections = b.sortCollections(ctx, loadedCollections)
// iterator one normal collection in one round
normalReplicasToBalance := make([]int64, 0)
for _, cid := range loadedCollections {
if b.normalBalanceCollectionsCurrentRound.Contain(cid) {
log.RatedDebug(10, "BalanceChecker is balancing this collection, skip balancing in this round",
zap.Int64("collectionID", cid))
continue
}
hasUnbalancedCollection = true
b.normalBalanceCollectionsCurrentRound.Insert(cid)
for _, replica := range b.meta.ReplicaManager.GetByCollection(ctx, cid) {
normalReplicasToBalance = append(normalReplicasToBalance, replica.GetID())
}
break
}
return normalReplicasToBalance
}
func (b *BalanceChecker) balanceReplicas(ctx context.Context, replicaIDs []int64) ([]balance.SegmentAssignPlan, []balance.ChannelAssignPlan) {
// generateBalanceTasksFromReplicas generates balance tasks for the given replicas.
// This method is the core of the balance operation that:
// 1. Uses the balancer to create segment and channel assignment plans
// 2. Converts these plans into executable tasks
// 3. Sets appropriate priorities and reasons for the tasks
//
// The process involves:
// - Getting balance plans from the configured balancer for each replica
// - Creating segment move tasks from segment assignment plans
// - Creating channel move tasks from channel assignment plans
// - Setting task metadata (priority, reason, timeout)
//
// Returns:
// - segmentTasks: tasks for moving segments between nodes
// - channelTasks: tasks for moving channels between nodes
func (b *BalanceChecker) generateBalanceTasksFromReplicas(ctx context.Context, replicas []int64, config balanceConfig) ([]task.Task, []task.Task) {
if len(replicas) == 0 {
return nil, nil
}
segmentPlans, channelPlans := make([]balance.SegmentAssignPlan, 0), make([]balance.ChannelAssignPlan, 0)
for _, rid := range replicaIDs {
for _, rid := range replicas {
replica := b.meta.ReplicaManager.Get(ctx, rid)
if replica == nil {
continue
@@ -210,62 +338,103 @@ func (b *BalanceChecker) balanceReplicas(ctx context.Context, replicaIDs []int64
balance.PrintNewBalancePlans(replica.GetCollectionID(), replica.GetID(), sPlans, cPlans)
}
}
return segmentPlans, channelPlans
}
// Notice: balance checker will generate tasks for multiple collections in one round,
// so generated tasks will be submitted to scheduler directly, and return nil
func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
segmentBatchSize := paramtable.Get().QueryCoordCfg.BalanceSegmentBatchSize.GetAsInt()
channelBatchSize := paramtable.Get().QueryCoordCfg.BalanceChannelBatchSize.GetAsInt()
balanceOnMultipleCollections := paramtable.Get().QueryCoordCfg.EnableBalanceOnMultipleCollections.GetAsBool()
segmentTasks := make([]task.Task, 0)
channelTasks := make([]task.Task, 0)
generateBalanceTaskForReplicas := func(replicas []int64) {
segmentPlans, channelPlans := b.balanceReplicas(ctx, replicas)
tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), segmentPlans)
// Create segment tasks when segment plans were generated
if len(segmentPlans) > 0 {
tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), config.segmentTaskTimeout, segmentPlans)
if len(tasks) > 0 {
task.SetPriority(task.TaskPriorityLow, tasks...)
task.SetReason("segment unbalanced", tasks...)
segmentTasks = append(segmentTasks, tasks...)
}
}
tasks = balance.CreateChannelTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond), channelPlans)
// Create channel tasks when channel plans were generated
if len(channelPlans) > 0 {
tasks := balance.CreateChannelTasksFromPlans(ctx, b.ID(), config.channelTaskTimeout, channelPlans)
if len(tasks) > 0 {
task.SetReason("channel unbalanced", tasks...)
channelTasks = append(channelTasks, tasks...)
}
stoppingReplicas := b.getReplicaForStoppingBalance(ctx)
if len(stoppingReplicas) > 0 {
// check for stopping balance first
generateBalanceTaskForReplicas(stoppingReplicas)
// iterate all collection to find a collection to balance
for len(segmentTasks) < segmentBatchSize && len(channelTasks) < channelBatchSize && b.stoppingBalanceCollectionsCurrentRound.Len() > 0 {
if !balanceOnMultipleCollections && (len(segmentTasks) > 0 || len(channelTasks) > 0) {
// if balance on multiple collections is disabled, and there are already some tasks, break
break
}
replicasToBalance := b.getReplicaForStoppingBalance(ctx)
generateBalanceTaskForReplicas(replicasToBalance)
}
} else {
// then check for auto balance
if time.Since(b.autoBalanceTs) > paramtable.Get().QueryCoordCfg.AutoBalanceInterval.GetAsDuration(time.Millisecond) {
b.autoBalanceTs = time.Now()
replicasToBalance := b.getReplicaForNormalBalance(ctx)
generateBalanceTaskForReplicas(replicasToBalance)
// iterate all collection to find a collection to balance
for len(segmentTasks) < segmentBatchSize && len(channelTasks) < channelBatchSize && b.normalBalanceCollectionsCurrentRound.Len() > 0 {
if !balanceOnMultipleCollections && (len(segmentTasks) > 0 || len(channelTasks) > 0) {
// if balance on multiple collections is disabled, and there are already some tasks, break
break
}
replicasToBalance := b.getReplicaForNormalBalance(ctx)
generateBalanceTaskForReplicas(replicasToBalance)
}
}
}
return segmentTasks, channelTasks
}
// processBalanceQueue processes balance queue with common logic for both normal and stopping balance.
// This is a template method that implements the core queue processing algorithm while allowing
// different balance types to provide their own specific logic through function parameters.
//
// The method implements several safeguards:
// 1. Batch size limits to prevent generating too many tasks at once
// 2. Collection count limits to prevent long-running operations
// 3. Multi-collection balance control to avoid resource contention
//
// Processing flow:
// 1. Get or construct the priority queue for collections
// 2. Pop collections from queue in priority order
// 3. Get replicas that need balance for the collection
// 4. Generate balance tasks for those replicas
// 5. Accumulate tasks until batch limits are reached
//
// Parameters:
// - ctx: context for the operation
// - getReplicasFunc: function to get replicas for a collection (normal vs stopping)
// - constructQueueFunc: function to construct a new priority queue if needed
// - getQueueFunc: function to get the existing priority queue
// - config: balance configuration with batch sizes and limits
//
// Returns:
// - generatedSegmentTaskNum: number of generated segment balance tasks
// - generatedChannelTaskNum: number of generated channel balance tasks
func (b *BalanceChecker) processBalanceQueue(
ctx context.Context,
getReplicasFunc func(context.Context, int64) []int64,
constructQueueFunc func(context.Context) *balance.PriorityQueue,
getQueueFunc func() *balance.PriorityQueue,
config balanceConfig,
) (int, int) {
checkCollectionCount := 0
pq := getQueueFunc()
if pq == nil || pq.Len() == 0 {
pq = constructQueueFunc(ctx)
}
generatedSegmentTaskNum := 0
generatedChannelTaskNum := 0
for generatedSegmentTaskNum < config.segmentBatchSize &&
generatedChannelTaskNum < config.channelBatchSize &&
checkCollectionCount < config.maxCheckCollectionCount &&
pq.Len() > 0 {
// Break if balanceOnMultipleCollections is disabled and we already have tasks
if !config.balanceOnMultipleCollections && (generatedSegmentTaskNum > 0 || generatedChannelTaskNum > 0) {
log.Debug("Balance on multiple collections disabled, stopping after first collection")
break
}
item := pq.Pop().(*collectionBalanceItem)
checkCollectionCount++
replicasToBalance := getReplicasFunc(ctx, item.collectionID)
if len(replicasToBalance) == 0 {
continue
}
newSegmentTasks, newChannelTasks := b.generateBalanceTasksFromReplicas(ctx, replicasToBalance, config)
generatedSegmentTaskNum += len(newSegmentTasks)
generatedChannelTaskNum += len(newChannelTasks)
b.submitTasks(newSegmentTasks, newChannelTasks)
}
return generatedSegmentTaskNum, generatedChannelTaskNum
}
// submitTasks submits the generated balance tasks to the scheduler for execution.
// This method handles the final step of the balance process by adding all
// generated tasks to the task scheduler, which will execute them asynchronously.
func (b *BalanceChecker) submitTasks(segmentTasks, channelTasks []task.Task) {
for _, task := range segmentTasks {
b.scheduler.Add(task)
}
@@ -273,45 +442,99 @@ func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
for _, task := range channelTasks {
b.scheduler.Add(task)
}
}
// Check is the main entry point for balance operations.
// This method implements a two-phase balance strategy with clear priorities:
//
// **Phase 1: Stopping Balance (Higher Priority)**
// - Handles nodes that are being gracefully stopped
// - Moves data off read-only nodes to active nodes
// - Critical for maintaining service availability during node shutdowns
// - Runs immediately when stopping nodes are detected
//
// **Phase 2: Normal Balance (Lower Priority)**
// - Performs regular load balancing to optimize cluster performance
// - Runs periodically based on autoBalanceInterval configuration
// - Considers all collections and distributes load evenly
// - Skipped if stopping balance tasks were generated
//
// **Key Design Decisions:**
// 1. Tasks are submitted directly to scheduler and nil is returned
// (unlike other checkers that return tasks to caller)
// 2. Stopping balance always takes precedence over normal balance
// 3. Performance monitoring alerts for operations > 100ms
// 4. Configuration is loaded fresh each time to respect dynamic updates
//
// **Return Value:**
// Always returns nil because tasks are submitted directly to the scheduler.
// This design allows the balance checker to handle multiple collections
// and large numbers of tasks efficiently.
//
// **Performance Monitoring:**
// The method tracks execution time and logs warnings for slow operations
// to help identify performance bottlenecks in large clusters.
func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
// Skip balance operations if the checker is not active
if !b.IsActive() {
return nil
}
// Performance monitoring: track execution time
start := time.Now()
defer func() {
duration := time.Since(start)
if duration > 100*time.Millisecond {
log.Info("Balance check too slow", zap.Duration("duration", duration))
}
}()
// Load current configuration to respect dynamic parameter changes
config := b.loadBalanceConfig()
// Phase 1: Process stopping balance first (higher priority)
// This handles nodes that are being gracefully stopped and need immediate attention
if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
generatedSegmentTaskNum, generatedChannelTaskNum := b.processBalanceQueue(ctx,
b.getReplicaForStoppingBalance,
b.constructStoppingBalanceQueue,
func() *balance.PriorityQueue { return b.stoppingBalanceQueue },
config)
if generatedSegmentTaskNum > 0 || generatedChannelTaskNum > 0 {
// clean up the normal balance queue when stopping balance generated tasks,
// so that the next time normal balance is triggered, a new round is started
b.normalBalanceQueue = nil
return nil
}
func (b *BalanceChecker) sortCollections(ctx context.Context, collections []int64) []int64 {
sortOrder := strings.ToLower(Params.QueryCoordCfg.BalanceTriggerOrder.GetValue())
if sortOrder == "" {
sortOrder = "byrowcount" // Default to ByRowCount
}
collectionRowCountMap := make(map[int64]int64)
for _, cid := range collections {
collectionRowCountMap[cid] = b.targetMgr.GetCollectionRowCount(ctx, cid, meta.CurrentTargetFirst)
}
// Define sorting functions
sortByRowCount := func(i, j int) bool {
rowCount1 := collectionRowCountMap[collections[i]]
rowCount2 := collectionRowCountMap[collections[j]]
return rowCount1 > rowCount2 || (rowCount1 == rowCount2 && collections[i] < collections[j])
}
sortByCollectionID := func(i, j int) bool {
return collections[i] < collections[j]
}
// Select the appropriate sorting function
var sortFunc func(i, j int) bool
switch sortOrder {
case "byrowcount":
sortFunc = sortByRowCount
case "bycollectionid":
sortFunc = sortByCollectionID
default:
log.Warn("Invalid balance sort order configuration, using default ByRowCount", zap.String("sortOrder", sortOrder))
sortFunc = sortByRowCount
}
// Sort the collections
sort.Slice(collections, sortFunc)
return collections
}
}
// Phase 2: Process normal balance if no stopping balance was needed
// This handles regular load balancing operations for cluster optimization
if paramtable.Get().QueryCoordCfg.AutoBalance.GetAsBool() {
// Respect the auto balance interval to prevent too frequent operations
if time.Since(b.autoBalanceTs) <= config.autoBalanceInterval {
return nil
}
generatedSegmentTaskNum, generatedChannelTaskNum := b.processBalanceQueue(ctx,
b.getReplicaForNormalBalance,
b.constructNormalBalanceQueue,
func() *balance.PriorityQueue { return b.normalBalanceQueue },
config)
// Tasks are submitted inside processBalanceQueue; if any were generated,
// update the auto balance timestamp to enforce the interval
if generatedSegmentTaskNum > 0 || generatedChannelTaskNum > 0 {
b.autoBalanceTs = time.Now()
// clean up the stopping balance queue when normal balance generated tasks,
// so that the next time stopping balance is triggered, a new round is started
b.stoppingBalanceQueue = nil
}
}
// Always return nil as tasks are submitted directly to scheduler
return nil
}

File diff suppressed because it is too large

View File

@@ -2347,6 +2347,8 @@ type queryCoordConfig struct {
// query node task parallelism factor
QueryNodeTaskParallelismFactor ParamItem `refreshable:"true"`
BalanceCheckCollectionMaxCount ParamItem `refreshable:"true"`
}
func (p *queryCoordConfig) init(base *BaseTable) {
@@ -2979,6 +2981,15 @@ If this parameter is set false, Milvus simply searches the growing segments with
Export: false,
}
p.QueryNodeTaskParallelismFactor.Init(base.mgr)
p.BalanceCheckCollectionMaxCount = ParamItem{
Key: "queryCoord.balanceCheckCollectionMaxCount",
Version: "2.6.2",
DefaultValue: "100",
Doc: "the maximum number of collections to check in each balance round",
Export: false,
}
p.BalanceCheckCollectionMaxCount.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////
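
Because BalanceCheckCollectionMaxCount is declared refreshable, it can be adjusted at runtime through the parameter table. The snippet below is a hypothetical illustration (package, function name, and the chosen value are made up); it follows the same save-then-read pattern the component-param test below uses for other queryCoord settings.

package example // hypothetical, for illustration only

import "github.com/milvus-io/milvus/pkg/v2/util/paramtable"

// exampleOverrideBalanceCheckLimit shows how the refreshable parameter could be
// overridden at runtime; it is not part of this diff.
func exampleOverrideBalanceCheckLimit() int {
	params := paramtable.Get()
	params.Save("queryCoord.balanceCheckCollectionMaxCount", "10")
	return params.QueryCoordCfg.BalanceCheckCollectionMaxCount.GetAsInt() // expected: 10
}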

View File

@@ -393,6 +393,8 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 1, Params.QueryNodeTaskParallelismFactor.GetAsInt())
params.Save("queryCoord.queryNodeTaskParallelismFactor", "2")
assert.Equal(t, 2, Params.QueryNodeTaskParallelismFactor.GetAsInt())
assert.Equal(t, 100, Params.BalanceCheckCollectionMaxCount.GetAsInt())
})
t.Run("test queryNodeConfig", func(t *testing.T) {