diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index 72dbc980d9..1f48bb8388 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -475,11 +475,6 @@ func (b *BalanceChecker) submitTasks(segmentTasks, channelTasks []task.Task) { // The method tracks execution time and logs warnings for slow operations // to help identify performance bottlenecks in large clusters. func (b *BalanceChecker) Check(ctx context.Context) []task.Task { - // Skip balance operations if the checker is not active - if !b.IsActive() { - return nil - } - // Performance monitoring: track execution time start := time.Now() defer func() { @@ -510,6 +505,11 @@ func (b *BalanceChecker) Check(ctx context.Context) []task.Task { } } + // Only Skip normal balance operations if the checker is not active + if !b.IsActive() { + return nil + } + // Phase 2: Process normal balance if no stopping balance was needed // This handles regular load balancing operations for cluster optimization if paramtable.Get().QueryCoordCfg.AutoBalance.GetAsBool() { diff --git a/internal/querycoordv2/checkers/balance_checker_test.go b/internal/querycoordv2/checkers/balance_checker_test.go index cdda585f08..e82bab30aa 100644 --- a/internal/querycoordv2/checkers/balance_checker_test.go +++ b/internal/querycoordv2/checkers/balance_checker_test.go @@ -543,87 +543,352 @@ func TestBalanceChecker_SubmitTasks_EmptyTasks(t *testing.T) { // ============================================================================= func TestBalanceChecker_Check_InactiveChecker(t *testing.T) { - checker := createTestBalanceChecker() - ctx := context.Background() + t.Run("StoppingBalanceRunsWhenInactive", func(t *testing.T) { + checker := createTestBalanceChecker() + ctx := context.Background() - // Mock IsActive to return false - mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(false).Build() - defer mockIsActive.UnPatch() + // Mock IsActive to return false - checker is inactive + mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(false).Build() + defer mockIsActive.UnPatch() - result := checker.Check(ctx) - assert.Nil(t, result) + // Mock paramtable to enable stopping balance + mockParamGet := mockey.Mock(paramtable.Get).Return(¶mtable.ComponentParam{}).Build() + defer mockParamGet.UnPatch() + + // First call returns true for EnableStoppingBalance + mockGetAsBool := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(true).Build() + defer mockGetAsBool.UnPatch() + + // Mock loadBalanceConfig + config := balanceConfig{ + segmentBatchSize: 5, + channelBatchSize: 5, + } + mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build() + defer mockLoadConfig.UnPatch() + + // Track whether processBalanceQueue was called for stopping balance + stoppingBalanceCalled := false + mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To( + func(ctx context.Context, + getReplicasFunc func(context.Context, int64) []int64, + constructQueueFunc func(context.Context) *balance.PriorityQueue, + getQueueFunc func() *balance.PriorityQueue, config balanceConfig, + ) (int, int) { + stoppingBalanceCalled = true + return 1, 0 // Return some tasks generated + }).Build() + defer mockProcessQueue.UnPatch() + + result := checker.Check(ctx) + + // Verify stopping balance was executed even though checker is inactive + assert.True(t, stoppingBalanceCalled, "Stopping balance should run even when checker is inactive") + assert.Nil(t, result) + assert.Nil(t, checker.normalBalanceQueue, "Normal balance queue should be cleared when stopping balance generates tasks") + }) + + t.Run("NormalBalanceSkippedWhenInactive", func(t *testing.T) { + checker := createTestBalanceChecker() + ctx := context.Background() + + // Mock IsActive to return false - checker is inactive + mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(false).Build() + defer mockIsActive.UnPatch() + + // Mock paramtable + mockParamGet := mockey.Mock(paramtable.Get).Return(¶mtable.ComponentParam{}).Build() + defer mockParamGet.UnPatch() + + // First call returns false for EnableStoppingBalance, so we skip to normal balance check + mockGetAsBool := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(false).Build() + defer mockGetAsBool.UnPatch() + + // Mock loadBalanceConfig + config := balanceConfig{ + segmentBatchSize: 5, + channelBatchSize: 5, + } + mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build() + defer mockLoadConfig.UnPatch() + + // Track whether processBalanceQueue was called + processQueueCalled := false + mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To( + func(ctx context.Context, + getReplicasFunc func(context.Context, int64) []int64, + constructQueueFunc func(context.Context) *balance.PriorityQueue, + getQueueFunc func() *balance.PriorityQueue, config balanceConfig, + ) (int, int) { + processQueueCalled = true + return 0, 0 + }).Build() + defer mockProcessQueue.UnPatch() + + result := checker.Check(ctx) + + // Verify normal balance was NOT executed because checker is inactive + // processBalanceQueue should not be called at all since stopping balance is disabled + // and IsActive check blocks normal balance + assert.False(t, processQueueCalled, "Normal balance should not run when checker is inactive") + assert.Nil(t, result) + }) } func TestBalanceChecker_Check_StoppingBalanceEnabled(t *testing.T) { - checker := createTestBalanceChecker() - ctx := context.Background() + t.Run("StoppingBalanceGeneratesTasksAndClearsNormalQueue", func(t *testing.T) { + checker := createTestBalanceChecker() + ctx := context.Background() - // Mock IsActive to return true - mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build() - defer mockIsActive.UnPatch() + // Pre-populate normal balance queue to verify it gets cleared + checker.normalBalanceQueue = createMockPriorityQueue() + checker.normalBalanceQueue.Push(newCollectionBalanceItem(1, 100, "byrowcount")) - // Mock paramtable for enabling stopping balance - mockParamGet := mockey.Mock(paramtable.Get).Return(¶mtable.ComponentParam{}).Build() - defer mockParamGet.UnPatch() + // Mock IsActive to return true + mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build() + defer mockIsActive.UnPatch() - mockStoppingBalanceEnabled := mockey.Mock(mockey.GetMethod(¶mtable.ParamItem{}, "GetAsBool")).Return(true).Build() - defer mockStoppingBalanceEnabled.UnPatch() + // Mock paramtable for enabling stopping balance + mockParamGet := mockey.Mock(paramtable.Get).Return(¶mtable.ComponentParam{}).Build() + defer mockParamGet.UnPatch() - // Mock loadBalanceConfig - config := balanceConfig{ - segmentBatchSize: 5, - channelBatchSize: 5, - } - mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build() - defer mockLoadConfig.UnPatch() + mockStoppingBalanceEnabled := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(true).Build() + defer mockStoppingBalanceEnabled.UnPatch() - // Mock processBalanceQueue to return tasks - mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).Return( - 1, 0, // segment tasks, channel tasks - ).Build() - defer mockProcessQueue.UnPatch() + // Mock loadBalanceConfig + config := balanceConfig{ + segmentBatchSize: 5, + channelBatchSize: 5, + } + mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build() + defer mockLoadConfig.UnPatch() - result := checker.Check(ctx) - assert.Nil(t, result) // Always returns nil as tasks are submitted directly - assert.Nil(t, checker.normalBalanceQueue) + // Track which balance type was called + stoppingBalanceCalled := false + mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To( + func(ctx context.Context, + getReplicasFunc func(context.Context, int64) []int64, + constructQueueFunc func(context.Context) *balance.PriorityQueue, + getQueueFunc func() *balance.PriorityQueue, config balanceConfig, + ) (int, int) { + // Verify this is stopping balance by checking the function pointers + stoppingBalanceCalled = true + return 1, 0 // Generate stopping balance tasks + }).Build() + defer mockProcessQueue.UnPatch() + + result := checker.Check(ctx) + + // Verify stopping balance was executed + assert.True(t, stoppingBalanceCalled, "Stopping balance should have been called") + assert.Nil(t, result, "Check should always return nil") + assert.Nil(t, checker.normalBalanceQueue, "Normal balance queue should be cleared when stopping balance generates tasks") + }) + + t.Run("StoppingBalanceNoTasksAllowsNormalBalance", func(t *testing.T) { + checker := createTestBalanceChecker() + ctx := context.Background() + checker.autoBalanceTs = time.Time{} // Allow normal balance + + // Mock IsActive to return true + mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build() + defer mockIsActive.UnPatch() + + // Mock paramtable - stopping balance enabled, auto balance enabled + mockParamGet := mockey.Mock(paramtable.Get).Return(¶mtable.ComponentParam{}).Build() + defer mockParamGet.UnPatch() + + // Return true for both EnableStoppingBalance and AutoBalance + mockGetAsBool := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(true).Build() + defer mockGetAsBool.UnPatch() + + // Mock loadBalanceConfig + config := balanceConfig{ + segmentBatchSize: 5, + channelBatchSize: 5, + autoBalanceInterval: 1 * time.Second, + } + mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build() + defer mockLoadConfig.UnPatch() + + // Track how many times processBalanceQueue is called + callCount := 0 + mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To( + func(ctx context.Context, + getReplicasFunc func(context.Context, int64) []int64, + constructQueueFunc func(context.Context) *balance.PriorityQueue, + getQueueFunc func() *balance.PriorityQueue, config balanceConfig, + ) (int, int) { + callCount++ + if callCount == 1 { + // First call: stopping balance generates no tasks + return 0, 0 + } + // Second call: normal balance generates tasks + return 0, 1 + }).Build() + defer mockProcessQueue.UnPatch() + + result := checker.Check(ctx) + + // Verify both stopping and normal balance were attempted + assert.Equal(t, 2, callCount, "Both stopping balance and normal balance should be called") + assert.Nil(t, result) + assert.Nil(t, checker.stoppingBalanceQueue, "Stopping balance queue should be cleared when normal balance generates tasks") + }) } func TestBalanceChecker_Check_NormalBalanceEnabled(t *testing.T) { - checker := createTestBalanceChecker() - ctx := context.Background() + t.Run("NormalBalanceGeneratesTasks", func(t *testing.T) { + checker := createTestBalanceChecker() + ctx := context.Background() - // Set autoBalanceTs to allow normal balance - checker.autoBalanceTs = time.Time{} + // Set autoBalanceTs to allow normal balance + checker.autoBalanceTs = time.Time{} - // Mock IsActive to return true - mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build() - defer mockIsActive.UnPatch() + // Pre-populate stopping balance queue to verify it gets cleared + checker.stoppingBalanceQueue = createMockPriorityQueue() + checker.stoppingBalanceQueue.Push(newCollectionBalanceItem(1, 100, "byrowcount")) - // Mock paramtable - stopping balance disabled, auto balance enabled - mockParamGet := mockey.Mock(paramtable.Get).Return(¶mtable.ComponentParam{}).Build() - defer mockParamGet.UnPatch() + // Mock IsActive to return true + mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build() + defer mockIsActive.UnPatch() - // return false for stopping balance enabled, true for auto balance enabled - mockParams := mockey.Mock(mockey.GetMethod(¶mtable.ParamItem{}, "GetAsBool")).Return(mockey.Sequence(false).Times(1).Then(true)).Build() - defer mockParams.UnPatch() + // Mock paramtable - stopping balance disabled, auto balance enabled + mockParamGet := mockey.Mock(paramtable.Get).Return(¶mtable.ComponentParam{}).Build() + defer mockParamGet.UnPatch() - // Mock loadBalanceConfig - config := balanceConfig{ - segmentBatchSize: 5, - channelBatchSize: 5, - autoBalanceInterval: 1 * time.Second, - } - mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build() - defer mockLoadConfig.UnPatch() + // return false for stopping balance enabled, true for auto balance enabled + mockParams := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(mockey.Sequence(false).Times(1).Then(true)).Build() + defer mockParams.UnPatch() - // Mock processBalanceQueue to return tasks - mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).Return( - 0, 1, // segment tasks, channel tasks - ).Build() - defer mockProcessQueue.UnPatch() + // Mock loadBalanceConfig + config := balanceConfig{ + segmentBatchSize: 5, + channelBatchSize: 5, + autoBalanceInterval: 1 * time.Second, + } + mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build() + defer mockLoadConfig.UnPatch() - result := checker.Check(ctx) - assert.Nil(t, result) // Always returns nil as tasks are submitted directly + // Track normal balance call and timestamp update + normalBalanceCalled := false + originalTs := checker.autoBalanceTs + mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To( + func(ctx context.Context, + getReplicasFunc func(context.Context, int64) []int64, + constructQueueFunc func(context.Context) *balance.PriorityQueue, + getQueueFunc func() *balance.PriorityQueue, config balanceConfig, + ) (int, int) { + normalBalanceCalled = true + return 0, 1 // Generate normal balance tasks + }).Build() + defer mockProcessQueue.UnPatch() + + result := checker.Check(ctx) + + // Verify normal balance was executed + assert.True(t, normalBalanceCalled, "Normal balance should have been called") + assert.Nil(t, result, "Check should always return nil") + assert.Nil(t, checker.stoppingBalanceQueue, "Stopping balance queue should be cleared when normal balance generates tasks") + assert.True(t, checker.autoBalanceTs.After(originalTs), "autoBalanceTs should be updated when tasks are generated") + }) + + t.Run("NormalBalanceRespects IntervalThrottle", func(t *testing.T) { + checker := createTestBalanceChecker() + ctx := context.Background() + + // Set autoBalanceTs to recent time to trigger throttling + checker.autoBalanceTs = time.Now() + + // Mock IsActive to return true + mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build() + defer mockIsActive.UnPatch() + + // Mock paramtable - stopping balance disabled, auto balance enabled + mockParamGet := mockey.Mock(paramtable.Get).Return(¶mtable.ComponentParam{}).Build() + defer mockParamGet.UnPatch() + + // return false for stopping balance enabled, true for auto balance enabled + mockParams := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(mockey.Sequence(false).Times(1).Then(true)).Build() + defer mockParams.UnPatch() + + // Mock loadBalanceConfig with a large interval + config := balanceConfig{ + segmentBatchSize: 5, + channelBatchSize: 5, + autoBalanceInterval: 10 * time.Second, // Long interval + } + mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build() + defer mockLoadConfig.UnPatch() + + // Track whether processBalanceQueue was called + normalBalanceCalled := false + mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To( + func(ctx context.Context, + getReplicasFunc func(context.Context, int64) []int64, + constructQueueFunc func(context.Context) *balance.PriorityQueue, + getQueueFunc func() *balance.PriorityQueue, config balanceConfig, + ) (int, int) { + normalBalanceCalled = true + return 0, 1 + }).Build() + defer mockProcessQueue.UnPatch() + + result := checker.Check(ctx) + + // Verify normal balance was NOT executed due to interval throttle + assert.False(t, normalBalanceCalled, "Normal balance should respect autoBalanceInterval throttle") + assert.Nil(t, result) + }) + + t.Run("NormalBalanceSkippedWhenAutoBalanceDisabled", func(t *testing.T) { + checker := createTestBalanceChecker() + ctx := context.Background() + + // Set autoBalanceTs to allow normal balance + checker.autoBalanceTs = time.Time{} + + // Mock IsActive to return true + mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build() + defer mockIsActive.UnPatch() + + // Mock paramtable - stopping balance disabled, auto balance also disabled + mockParamGet := mockey.Mock(paramtable.Get).Return(¶mtable.ComponentParam{}).Build() + defer mockParamGet.UnPatch() + + // return false for both stopping balance and auto balance + mockParams := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(false).Build() + defer mockParams.UnPatch() + + // Mock loadBalanceConfig + config := balanceConfig{ + segmentBatchSize: 5, + channelBatchSize: 5, + autoBalanceInterval: 1 * time.Second, + } + mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build() + defer mockLoadConfig.UnPatch() + + // Track whether processBalanceQueue was called + normalBalanceCalled := false + mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To( + func(ctx context.Context, + getReplicasFunc func(context.Context, int64) []int64, + constructQueueFunc func(context.Context) *balance.PriorityQueue, + getQueueFunc func() *balance.PriorityQueue, config balanceConfig, + ) (int, int) { + normalBalanceCalled = true + return 0, 1 + }).Build() + defer mockProcessQueue.UnPatch() + + result := checker.Check(ctx) + + // Verify normal balance was NOT executed because auto balance is disabled + assert.False(t, normalBalanceCalled, "Normal balance should be skipped when AutoBalance is disabled") + assert.Nil(t, result) + }) } // =============================================================================