diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index ea27820485..de398fad32 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -88,20 +88,20 @@ func (b *BalanceChecker) readyToCheck(ctx context.Context, collectionID int64) b } func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context) []int64 { - ids := b.meta.GetAll(ctx) + hasUnbalancedCollection := false + defer func() { + if !hasUnbalancedCollection { + b.stoppingBalanceCollectionsCurrentRound.Clear() + log.RatedDebug(10, "BalanceChecker has triggered stopping balance for all "+ + "collections in one round, clear collectionIDs for this round") + } + }() + ids := b.meta.GetAll(ctx) // Sort collections using the configured sort order ids = b.sortCollections(ctx, ids) if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() { - hasUnbalancedCollection := false - defer func() { - if !hasUnbalancedCollection { - b.stoppingBalanceCollectionsCurrentRound.Clear() - log.RatedDebug(10, "BalanceChecker has triggered stopping balance for all "+ - "collections in one round, clear collectionIDs for this round") - } - }() for _, cid := range ids { // if target and meta isn't ready, skip balance this collection if !b.readyToCheck(ctx, cid) { @@ -127,18 +127,30 @@ func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context) []int } } + // finish current round for stopping balance if no unbalanced collection + hasUnbalancedCollection = false return nil } func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64 { + hasUnbalancedCollection := false + defer func() { + if !hasUnbalancedCollection { + b.normalBalanceCollectionsCurrentRound.Clear() + log.RatedDebug(10, "BalanceChecker has triggered normal balance for all "+ + "collections in one round, clear collectionIDs for this round") + } + }() + // 1. no stopping balance and auto balance is disabled, return empty collections for balance // 2. when balancer isn't active, skip auto balance if !Params.QueryCoordCfg.AutoBalance.GetAsBool() || !b.IsActive() { + // finish current round for normal balance if normal balance isn't triggered + hasUnbalancedCollection = false return nil } ids := b.meta.GetAll(ctx) - // all replicas belonging to loading collection will be skipped loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool { collection := b.meta.GetCollection(ctx, cid) @@ -152,6 +164,8 @@ func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64 return !b.targetMgr.IsCurrentTargetReady(ctx, cid) }) if len(notReadyCollections) > 0 { + // finish current round for normal balance if any collection isn't ready + hasUnbalancedCollection = false log.RatedInfo(10, "skip normal balance, cause collection not ready for balance", zap.Int64s("collectionIDs", notReadyCollections)) return nil } @@ -161,7 +175,6 @@ func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64 // iterator one normal collection in one round normalReplicasToBalance := make([]int64, 0) - hasUnbalancedCollection := false for _, cid := range loadedCollections { if b.normalBalanceCollectionsCurrentRound.Contain(cid) { log.RatedDebug(10, "BalanceChecker is balancing this collection, skip balancing in this round", @@ -175,12 +188,6 @@ func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64 } break } - - if !hasUnbalancedCollection { - b.normalBalanceCollectionsCurrentRound.Clear() - log.RatedDebug(10, "BalanceChecker has triggered normal balance for all "+ - "collections in one round, clear collectionIDs for this round") - } return normalReplicasToBalance } diff --git a/internal/querycoordv2/checkers/balance_checker_test.go b/internal/querycoordv2/checkers/balance_checker_test.go index 088d97059e..729f9760a6 100644 --- a/internal/querycoordv2/checkers/balance_checker_test.go +++ b/internal/querycoordv2/checkers/balance_checker_test.go @@ -758,6 +758,98 @@ func (suite *BalanceCheckerTestSuite) TestBalanceTriggerOrder() { suite.Contains(replicas, replicaID1, "Stopping balance should prioritize collection with lowest ID") } +func (suite *BalanceCheckerTestSuite) TestHasUnbalancedCollectionFlag() { + ctx := context.Background() + + // Set up nodes + nodeID1, nodeID2 := int64(1), int64(2) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: nodeID1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: nodeID2, + Address: "localhost", + Hostname: "localhost", + })) + suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID1) + suite.checker.meta.ResourceManager.HandleNodeUp(ctx, nodeID2) + + // Create collection + cid1, replicaID1 := int64(1), int64(101) + collection1 := utils.CreateTestCollection(cid1, int32(replicaID1)) + collection1.Status = querypb.LoadStatus_Loaded + replica1 := utils.CreateTestReplica(replicaID1, cid1, []int64{nodeID1, nodeID2}) + suite.checker.meta.CollectionManager.PutCollection(ctx, collection1) + suite.checker.meta.ReplicaManager.Put(ctx, replica1) + + // Mock the target manager + mockTargetManager := meta.NewMockTargetManager(suite.T()) + suite.checker.targetMgr = mockTargetManager + mockTargetManager.EXPECT().GetCollectionRowCount(mock.Anything, mock.Anything, mock.Anything).Return(int64(100)).Maybe() + + // 1. Test normal balance with auto balance disabled + paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "false") + + // The collections set should be initially empty + suite.checker.normalBalanceCollectionsCurrentRound.Clear() + suite.Equal(0, suite.checker.normalBalanceCollectionsCurrentRound.Len()) + + // Get replicas - should return nil and keep the set empty + replicas := suite.checker.getReplicaForNormalBalance(ctx) + suite.Empty(replicas) + suite.Equal(0, suite.checker.normalBalanceCollectionsCurrentRound.Len(), + "normalBalanceCollectionsCurrentRound should remain empty when auto balance is disabled") + + // 2. Test normal balance when targetMgr.IsCurrentTargetReady returns false + paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true") + mockTargetManager.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe() + mockTargetManager.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(false).Maybe() + + // The collections set should be initially empty + suite.checker.normalBalanceCollectionsCurrentRound.Clear() + suite.Equal(0, suite.checker.normalBalanceCollectionsCurrentRound.Len()) + + // Get replicas - should return nil and keep the set empty because of not ready targets + replicas = suite.checker.getReplicaForNormalBalance(ctx) + suite.Empty(replicas) + suite.Equal(0, suite.checker.normalBalanceCollectionsCurrentRound.Len(), + "normalBalanceCollectionsCurrentRound should remain empty when targets are not ready") + + // 3. Test stopping balance when there are no RO nodes + paramtable.Get().Save(Params.QueryCoordCfg.EnableStoppingBalance.Key, "true") + mockTargetManager.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true).Maybe() + mockTargetManager.EXPECT().IsCurrentTargetExist(mock.Anything, mock.Anything, mock.Anything).Return(true).Maybe() + + // The collections set should be initially empty + suite.checker.stoppingBalanceCollectionsCurrentRound.Clear() + suite.Equal(0, suite.checker.stoppingBalanceCollectionsCurrentRound.Len()) + + // Get replicas - should return nil and keep the set empty because there are no RO nodes + replicas = suite.checker.getReplicaForStoppingBalance(ctx) + suite.Empty(replicas) + suite.Equal(0, suite.checker.stoppingBalanceCollectionsCurrentRound.Len(), + "stoppingBalanceCollectionsCurrentRound should remain empty when there are no RO nodes") + + // 4. Test stopping balance with RO nodes + // Add a RO node to the replica + mr1 := replica1.CopyForWrite() + mr1.AddRONode(nodeID1) + suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica()) + + // The collections set should be initially empty + suite.checker.stoppingBalanceCollectionsCurrentRound.Clear() + suite.Equal(0, suite.checker.stoppingBalanceCollectionsCurrentRound.Len()) + + // Get replicas - should return the replica ID and add the collection to the set + replicas = suite.checker.getReplicaForStoppingBalance(ctx) + suite.Equal([]int64{replicaID1}, replicas) + suite.Equal(1, suite.checker.stoppingBalanceCollectionsCurrentRound.Len()) + suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(cid1), + "stoppingBalanceCollectionsCurrentRound should contain the collection when it has RO nodes") +} + func TestBalanceCheckerSuite(t *testing.T) { suite.Run(t, new(BalanceCheckerTestSuite)) }