mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: balance checker may enter infinite normal balance loop after balance suspension (#41195)
issue: #41194

- Refactor hasUnbalancedCollection flag handling to function scope
- Ensure tracking sets are cleared when no balance is needed
- Add deferred cleanup for both normal/stopping balance paths
- Add unit tests for collection tracking scenarios

The changes ensure the tracking sets (normalBalanceCollectionsCurrentRound and stoppingBalanceCollectionsCurrentRound) are properly cleared when:

- All collections in the current round are balanced
- Balance checks return early due to unready targets
- Balance feature flags are disabled

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
3bc24c264f
commit
a839d94c9e
@ -88,20 +88,20 @@ func (b *BalanceChecker) readyToCheck(ctx context.Context, collectionID int64) b
|
||||
}
|
||||
|
||||
func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context) []int64 {
|
||||
ids := b.meta.GetAll(ctx)
|
||||
hasUnbalancedCollection := false
|
||||
defer func() {
|
||||
if !hasUnbalancedCollection {
|
||||
b.stoppingBalanceCollectionsCurrentRound.Clear()
|
||||
log.RatedDebug(10, "BalanceChecker has triggered stopping balance for all "+
|
||||
"collections in one round, clear collectionIDs for this round")
|
||||
}
|
||||
}()
|
||||
|
||||
ids := b.meta.GetAll(ctx)
|
||||
// Sort collections using the configured sort order
|
||||
ids = b.sortCollections(ctx, ids)
|
||||
|
||||
if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
|
||||
hasUnbalancedCollection := false
|
||||
defer func() {
|
||||
if !hasUnbalancedCollection {
|
||||
b.stoppingBalanceCollectionsCurrentRound.Clear()
|
||||
log.RatedDebug(10, "BalanceChecker has triggered stopping balance for all "+
|
||||
"collections in one round, clear collectionIDs for this round")
|
||||
}
|
||||
}()
|
||||
for _, cid := range ids {
|
||||
// if target and meta isn't ready, skip balance this collection
|
||||
if !b.readyToCheck(ctx, cid) {
|
||||
@ -127,18 +127,30 @@ func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context) []int
|
||||
}
|
||||
}
|
||||
|
||||
// finish current round for stopping balance if no unbalanced collection
|
||||
hasUnbalancedCollection = false
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64 {
|
||||
hasUnbalancedCollection := false
|
||||
defer func() {
|
||||
if !hasUnbalancedCollection {
|
||||
b.normalBalanceCollectionsCurrentRound.Clear()
|
||||
log.RatedDebug(10, "BalanceChecker has triggered normal balance for all "+
|
||||
"collections in one round, clear collectionIDs for this round")
|
||||
}
|
||||
}()
|
||||
|
||||
// 1. no stopping balance and auto balance is disabled, return empty collections for balance
|
||||
// 2. when balancer isn't active, skip auto balance
|
||||
if !Params.QueryCoordCfg.AutoBalance.GetAsBool() || !b.IsActive() {
|
||||
// finish current round for normal balance if normal balance isn't triggered
|
||||
hasUnbalancedCollection = false
|
||||
return nil
|
||||
}
|
||||
|
||||
ids := b.meta.GetAll(ctx)
|
||||
|
||||
// all replicas belonging to loading collection will be skipped
|
||||
loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool {
|
||||
collection := b.meta.GetCollection(ctx, cid)
|
||||
@ -152,6 +164,8 @@ func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64
|
||||
return !b.targetMgr.IsCurrentTargetReady(ctx, cid)
|
||||
})
|
||||
if len(notReadyCollections) > 0 {
|
||||
// finish current round for normal balance if any collection isn't ready
|
||||
hasUnbalancedCollection = false
|
||||
log.RatedInfo(10, "skip normal balance, cause collection not ready for balance", zap.Int64s("collectionIDs", notReadyCollections))
|
||||
return nil
|
||||
}
|
||||
@ -161,7 +175,6 @@ func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64
|
||||
|
||||
// iterator one normal collection in one round
|
||||
normalReplicasToBalance := make([]int64, 0)
|
||||
hasUnbalancedCollection := false
|
||||
for _, cid := range loadedCollections {
|
||||
if b.normalBalanceCollectionsCurrentRound.Contain(cid) {
|
||||
log.RatedDebug(10, "BalanceChecker is balancing this collection, skip balancing in this round",
|
||||
@ -175,12 +188,6 @@ func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
if !hasUnbalancedCollection {
|
||||
b.normalBalanceCollectionsCurrentRound.Clear()
|
||||
log.RatedDebug(10, "BalanceChecker has triggered normal balance for all "+
|
||||
"collections in one round, clear collectionIDs for this round")
|
||||
}
|
||||
return normalReplicasToBalance
|
||||
}
|
||||
|
||||
|
||||
@ -758,6 +758,98 @@ func (suite *BalanceCheckerTestSuite) TestBalanceTriggerOrder() {
|
||||
suite.Contains(replicas, replicaID1, "Stopping balance should prioritize collection with lowest ID")
|
||||
}
|
||||
|
||||
func (suite *BalanceCheckerTestSuite) TestHasUnbalancedCollectionFlag() {
|
||||
ctx := context.Background()
|
||||
|
||||
// Set up nodes
|
||||
nodeID1, nodeID2 := int64(1), int64(2)
|
||||
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
||||
NodeID: nodeID1,
|
||||
Address: "localhost",
|
||||
Hostname: "localhost",
|
||||
}))
|
||||
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
||||
NodeID: nodeID2,
|
||||
Address: "localhost",
|
||||
Hostname: "localhost",
|
||||
}))
|
||||
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID1)
|
||||
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID2)
|
||||
|
||||
// Create collection
|
||||
cid1, replicaID1 := int64(1), int64(101)
|
||||
collection1 := utils.CreateTestCollection(cid1, int32(replicaID1))
|
||||
collection1.Status = querypb.LoadStatus_Loaded
|
||||
replica1 := utils.CreateTestReplica(replicaID1, cid1, []int64{nodeID1, nodeID2})
|
||||
suite.checker.meta.CollectionManager.PutCollection(ctx, collection1)
|
||||
suite.checker.meta.ReplicaManager.Put(ctx, replica1)
|
||||
|
||||
// Mock the target manager
|
||||
mockTargetManager := meta.NewMockTargetManager(suite.T())
|
||||
suite.checker.targetMgr = mockTargetManager
|
||||
mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, mock.Anything, mock.Anything).Return(int64(100)).Maybe()
|
||||
|
||||
// 1. Test normal balance with auto balance disabled
|
||||
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "false")
|
||||
|
||||
// The collections set should be initially empty
|
||||
suite.checker.normalBalanceCollectionsCurrentRound.Clear()
|
||||
suite.Equal(0, suite.checker.normalBalanceCollectionsCurrentRound.Len())
|
||||
|
||||
// Get replicas - should return nil and keep the set empty
|
||||
replicas := suite.checker.getReplicaForNormalBalance(ctx)
|
||||
suite.Empty(replicas)
|
||||
suite.Equal(0, suite.checker.normalBalanceCollectionsCurrentRound.Len(),
|
||||
"normalBalanceCollectionsCurrentRound should remain empty when auto balance is disabled")
|
||||
|
||||
// 2. Test normal balance when targetMgr.IsCurrentTargetReady returns false
|
||||
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
|
||||
mockTargetManager.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe()
|
||||
mockTargetManager.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(false).Maybe()
|
||||
|
||||
// The collections set should be initially empty
|
||||
suite.checker.normalBalanceCollectionsCurrentRound.Clear()
|
||||
suite.Equal(0, suite.checker.normalBalanceCollectionsCurrentRound.Len())
|
||||
|
||||
// Get replicas - should return nil and keep the set empty because of not ready targets
|
||||
replicas = suite.checker.getReplicaForNormalBalance(ctx)
|
||||
suite.Empty(replicas)
|
||||
suite.Equal(0, suite.checker.normalBalanceCollectionsCurrentRound.Len(),
|
||||
"normalBalanceCollectionsCurrentRound should remain empty when targets are not ready")
|
||||
|
||||
// 3. Test stopping balance when there are no RO nodes
|
||||
paramtable.Get().Save(Params.QueryCoordCfg.EnableStoppingBalance.Key, "true")
|
||||
mockTargetManager.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe()
|
||||
mockTargetManager.EXPECT().IsCurrentTargetExist(mock.Anything, mock.Anything, mock.Anything).Return(true).Maybe()
|
||||
|
||||
// The collections set should be initially empty
|
||||
suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
|
||||
suite.Equal(0, suite.checker.stoppingBalanceCollectionsCurrentRound.Len())
|
||||
|
||||
// Get replicas - should return nil and keep the set empty because there are no RO nodes
|
||||
replicas = suite.checker.getReplicaForStoppingBalance(ctx)
|
||||
suite.Empty(replicas)
|
||||
suite.Equal(0, suite.checker.stoppingBalanceCollectionsCurrentRound.Len(),
|
||||
"stoppingBalanceCollectionsCurrentRound should remain empty when there are no RO nodes")
|
||||
|
||||
// 4. Test stopping balance with RO nodes
|
||||
// Add a RO node to the replica
|
||||
mr1 := replica1.CopyForWrite()
|
||||
mr1.AddRONode(nodeID1)
|
||||
suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
|
||||
|
||||
// The collections set should be initially empty
|
||||
suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
|
||||
suite.Equal(0, suite.checker.stoppingBalanceCollectionsCurrentRound.Len())
|
||||
|
||||
// Get replicas - should return the replica ID and add the collection to the set
|
||||
replicas = suite.checker.getReplicaForStoppingBalance(ctx)
|
||||
suite.Equal([]int64{replicaID1}, replicas)
|
||||
suite.Equal(1, suite.checker.stoppingBalanceCollectionsCurrentRound.Len())
|
||||
suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(cid1),
|
||||
"stoppingBalanceCollectionsCurrentRound should contain the collection when it has RO nodes")
|
||||
}
|
||||
|
||||
func TestBalanceCheckerSuite(t *testing.T) {
|
||||
suite.Run(t, new(BalanceCheckerTestSuite))
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user