fix: [2.6] Fix deactivate balance checker also stops stopping balance (#44836)

issue: #43858
pr: #44834
Fix the issue introduced in PR #43992 where deactivating the balance
checker incorrectly stops stopping balance operations.

Changes:
- Move IsActive() check after stopping balance logic
- Only skip normal balance when checker is inactive
- Allow stopping balance to proceed regardless of checker state

This ensures stopping balance can execute even when the balance checker
is deactivated.

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2025-10-20 18:48:03 +08:00 committed by GitHub
parent 26c3983d93
commit 613b6a716e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 330 additions and 65 deletions

View File

@ -475,11 +475,6 @@ func (b *BalanceChecker) submitTasks(segmentTasks, channelTasks []task.Task) {
// The method tracks execution time and logs warnings for slow operations
// to help identify performance bottlenecks in large clusters.
func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
// Skip balance operations if the checker is not active
if !b.IsActive() {
return nil
}
// Performance monitoring: track execution time
start := time.Now()
defer func() {
@ -510,6 +505,11 @@ func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
}
}
// Only Skip normal balance operations if the checker is not active
if !b.IsActive() {
return nil
}
// Phase 2: Process normal balance if no stopping balance was needed
// This handles regular load balancing operations for cluster optimization
if paramtable.Get().QueryCoordCfg.AutoBalance.GetAsBool() {

View File

@ -543,87 +543,352 @@ func TestBalanceChecker_SubmitTasks_EmptyTasks(t *testing.T) {
// =============================================================================
func TestBalanceChecker_Check_InactiveChecker(t *testing.T) {
checker := createTestBalanceChecker()
ctx := context.Background()
t.Run("StoppingBalanceRunsWhenInactive", func(t *testing.T) {
checker := createTestBalanceChecker()
ctx := context.Background()
// Mock IsActive to return false
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(false).Build()
defer mockIsActive.UnPatch()
// Mock IsActive to return false - checker is inactive
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(false).Build()
defer mockIsActive.UnPatch()
result := checker.Check(ctx)
assert.Nil(t, result)
// Mock paramtable to enable stopping balance
mockParamGet := mockey.Mock(paramtable.Get).Return(&paramtable.ComponentParam{}).Build()
defer mockParamGet.UnPatch()
// First call returns true for EnableStoppingBalance
mockGetAsBool := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(true).Build()
defer mockGetAsBool.UnPatch()
// Mock loadBalanceConfig
config := balanceConfig{
segmentBatchSize: 5,
channelBatchSize: 5,
}
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
defer mockLoadConfig.UnPatch()
// Track whether processBalanceQueue was called for stopping balance
stoppingBalanceCalled := false
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To(
func(ctx context.Context,
getReplicasFunc func(context.Context, int64) []int64,
constructQueueFunc func(context.Context) *balance.PriorityQueue,
getQueueFunc func() *balance.PriorityQueue, config balanceConfig,
) (int, int) {
stoppingBalanceCalled = true
return 1, 0 // Return some tasks generated
}).Build()
defer mockProcessQueue.UnPatch()
result := checker.Check(ctx)
// Verify stopping balance was executed even though checker is inactive
assert.True(t, stoppingBalanceCalled, "Stopping balance should run even when checker is inactive")
assert.Nil(t, result)
assert.Nil(t, checker.normalBalanceQueue, "Normal balance queue should be cleared when stopping balance generates tasks")
})
t.Run("NormalBalanceSkippedWhenInactive", func(t *testing.T) {
checker := createTestBalanceChecker()
ctx := context.Background()
// Mock IsActive to return false - checker is inactive
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(false).Build()
defer mockIsActive.UnPatch()
// Mock paramtable
mockParamGet := mockey.Mock(paramtable.Get).Return(&paramtable.ComponentParam{}).Build()
defer mockParamGet.UnPatch()
// First call returns false for EnableStoppingBalance, so we skip to normal balance check
mockGetAsBool := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(false).Build()
defer mockGetAsBool.UnPatch()
// Mock loadBalanceConfig
config := balanceConfig{
segmentBatchSize: 5,
channelBatchSize: 5,
}
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
defer mockLoadConfig.UnPatch()
// Track whether processBalanceQueue was called
processQueueCalled := false
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To(
func(ctx context.Context,
getReplicasFunc func(context.Context, int64) []int64,
constructQueueFunc func(context.Context) *balance.PriorityQueue,
getQueueFunc func() *balance.PriorityQueue, config balanceConfig,
) (int, int) {
processQueueCalled = true
return 0, 0
}).Build()
defer mockProcessQueue.UnPatch()
result := checker.Check(ctx)
// Verify normal balance was NOT executed because checker is inactive
// processBalanceQueue should not be called at all since stopping balance is disabled
// and IsActive check blocks normal balance
assert.False(t, processQueueCalled, "Normal balance should not run when checker is inactive")
assert.Nil(t, result)
})
}
func TestBalanceChecker_Check_StoppingBalanceEnabled(t *testing.T) {
checker := createTestBalanceChecker()
ctx := context.Background()
t.Run("StoppingBalanceGeneratesTasksAndClearsNormalQueue", func(t *testing.T) {
checker := createTestBalanceChecker()
ctx := context.Background()
// Mock IsActive to return true
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build()
defer mockIsActive.UnPatch()
// Pre-populate normal balance queue to verify it gets cleared
checker.normalBalanceQueue = createMockPriorityQueue()
checker.normalBalanceQueue.Push(newCollectionBalanceItem(1, 100, "byrowcount"))
// Mock paramtable for enabling stopping balance
mockParamGet := mockey.Mock(paramtable.Get).Return(&paramtable.ComponentParam{}).Build()
defer mockParamGet.UnPatch()
// Mock IsActive to return true
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build()
defer mockIsActive.UnPatch()
mockStoppingBalanceEnabled := mockey.Mock(mockey.GetMethod(&paramtable.ParamItem{}, "GetAsBool")).Return(true).Build()
defer mockStoppingBalanceEnabled.UnPatch()
// Mock paramtable for enabling stopping balance
mockParamGet := mockey.Mock(paramtable.Get).Return(&paramtable.ComponentParam{}).Build()
defer mockParamGet.UnPatch()
// Mock loadBalanceConfig
config := balanceConfig{
segmentBatchSize: 5,
channelBatchSize: 5,
}
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
defer mockLoadConfig.UnPatch()
mockStoppingBalanceEnabled := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(true).Build()
defer mockStoppingBalanceEnabled.UnPatch()
// Mock processBalanceQueue to return tasks
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).Return(
1, 0, // segment tasks, channel tasks
).Build()
defer mockProcessQueue.UnPatch()
// Mock loadBalanceConfig
config := balanceConfig{
segmentBatchSize: 5,
channelBatchSize: 5,
}
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
defer mockLoadConfig.UnPatch()
result := checker.Check(ctx)
assert.Nil(t, result) // Always returns nil as tasks are submitted directly
assert.Nil(t, checker.normalBalanceQueue)
// Track which balance type was called
stoppingBalanceCalled := false
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To(
func(ctx context.Context,
getReplicasFunc func(context.Context, int64) []int64,
constructQueueFunc func(context.Context) *balance.PriorityQueue,
getQueueFunc func() *balance.PriorityQueue, config balanceConfig,
) (int, int) {
// Verify this is stopping balance by checking the function pointers
stoppingBalanceCalled = true
return 1, 0 // Generate stopping balance tasks
}).Build()
defer mockProcessQueue.UnPatch()
result := checker.Check(ctx)
// Verify stopping balance was executed
assert.True(t, stoppingBalanceCalled, "Stopping balance should have been called")
assert.Nil(t, result, "Check should always return nil")
assert.Nil(t, checker.normalBalanceQueue, "Normal balance queue should be cleared when stopping balance generates tasks")
})
t.Run("StoppingBalanceNoTasksAllowsNormalBalance", func(t *testing.T) {
checker := createTestBalanceChecker()
ctx := context.Background()
checker.autoBalanceTs = time.Time{} // Allow normal balance
// Mock IsActive to return true
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build()
defer mockIsActive.UnPatch()
// Mock paramtable - stopping balance enabled, auto balance enabled
mockParamGet := mockey.Mock(paramtable.Get).Return(&paramtable.ComponentParam{}).Build()
defer mockParamGet.UnPatch()
// Return true for both EnableStoppingBalance and AutoBalance
mockGetAsBool := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(true).Build()
defer mockGetAsBool.UnPatch()
// Mock loadBalanceConfig
config := balanceConfig{
segmentBatchSize: 5,
channelBatchSize: 5,
autoBalanceInterval: 1 * time.Second,
}
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
defer mockLoadConfig.UnPatch()
// Track how many times processBalanceQueue is called
callCount := 0
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To(
func(ctx context.Context,
getReplicasFunc func(context.Context, int64) []int64,
constructQueueFunc func(context.Context) *balance.PriorityQueue,
getQueueFunc func() *balance.PriorityQueue, config balanceConfig,
) (int, int) {
callCount++
if callCount == 1 {
// First call: stopping balance generates no tasks
return 0, 0
}
// Second call: normal balance generates tasks
return 0, 1
}).Build()
defer mockProcessQueue.UnPatch()
result := checker.Check(ctx)
// Verify both stopping and normal balance were attempted
assert.Equal(t, 2, callCount, "Both stopping balance and normal balance should be called")
assert.Nil(t, result)
assert.Nil(t, checker.stoppingBalanceQueue, "Stopping balance queue should be cleared when normal balance generates tasks")
})
}
func TestBalanceChecker_Check_NormalBalanceEnabled(t *testing.T) {
checker := createTestBalanceChecker()
ctx := context.Background()
t.Run("NormalBalanceGeneratesTasks", func(t *testing.T) {
checker := createTestBalanceChecker()
ctx := context.Background()
// Set autoBalanceTs to allow normal balance
checker.autoBalanceTs = time.Time{}
// Set autoBalanceTs to allow normal balance
checker.autoBalanceTs = time.Time{}
// Mock IsActive to return true
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build()
defer mockIsActive.UnPatch()
// Pre-populate stopping balance queue to verify it gets cleared
checker.stoppingBalanceQueue = createMockPriorityQueue()
checker.stoppingBalanceQueue.Push(newCollectionBalanceItem(1, 100, "byrowcount"))
// Mock paramtable - stopping balance disabled, auto balance enabled
mockParamGet := mockey.Mock(paramtable.Get).Return(&paramtable.ComponentParam{}).Build()
defer mockParamGet.UnPatch()
// Mock IsActive to return true
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build()
defer mockIsActive.UnPatch()
// return false for stopping balance enabled, true for auto balance enabled
mockParams := mockey.Mock(mockey.GetMethod(&paramtable.ParamItem{}, "GetAsBool")).Return(mockey.Sequence(false).Times(1).Then(true)).Build()
defer mockParams.UnPatch()
// Mock paramtable - stopping balance disabled, auto balance enabled
mockParamGet := mockey.Mock(paramtable.Get).Return(&paramtable.ComponentParam{}).Build()
defer mockParamGet.UnPatch()
// Mock loadBalanceConfig
config := balanceConfig{
segmentBatchSize: 5,
channelBatchSize: 5,
autoBalanceInterval: 1 * time.Second,
}
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
defer mockLoadConfig.UnPatch()
// return false for stopping balance enabled, true for auto balance enabled
mockParams := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(mockey.Sequence(false).Times(1).Then(true)).Build()
defer mockParams.UnPatch()
// Mock processBalanceQueue to return tasks
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).Return(
0, 1, // segment tasks, channel tasks
).Build()
defer mockProcessQueue.UnPatch()
// Mock loadBalanceConfig
config := balanceConfig{
segmentBatchSize: 5,
channelBatchSize: 5,
autoBalanceInterval: 1 * time.Second,
}
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
defer mockLoadConfig.UnPatch()
result := checker.Check(ctx)
assert.Nil(t, result) // Always returns nil as tasks are submitted directly
// Track normal balance call and timestamp update
normalBalanceCalled := false
originalTs := checker.autoBalanceTs
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To(
func(ctx context.Context,
getReplicasFunc func(context.Context, int64) []int64,
constructQueueFunc func(context.Context) *balance.PriorityQueue,
getQueueFunc func() *balance.PriorityQueue, config balanceConfig,
) (int, int) {
normalBalanceCalled = true
return 0, 1 // Generate normal balance tasks
}).Build()
defer mockProcessQueue.UnPatch()
result := checker.Check(ctx)
// Verify normal balance was executed
assert.True(t, normalBalanceCalled, "Normal balance should have been called")
assert.Nil(t, result, "Check should always return nil")
assert.Nil(t, checker.stoppingBalanceQueue, "Stopping balance queue should be cleared when normal balance generates tasks")
assert.True(t, checker.autoBalanceTs.After(originalTs), "autoBalanceTs should be updated when tasks are generated")
})
t.Run("NormalBalanceRespects IntervalThrottle", func(t *testing.T) {
checker := createTestBalanceChecker()
ctx := context.Background()
// Set autoBalanceTs to recent time to trigger throttling
checker.autoBalanceTs = time.Now()
// Mock IsActive to return true
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build()
defer mockIsActive.UnPatch()
// Mock paramtable - stopping balance disabled, auto balance enabled
mockParamGet := mockey.Mock(paramtable.Get).Return(&paramtable.ComponentParam{}).Build()
defer mockParamGet.UnPatch()
// return false for stopping balance enabled, true for auto balance enabled
mockParams := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(mockey.Sequence(false).Times(1).Then(true)).Build()
defer mockParams.UnPatch()
// Mock loadBalanceConfig with a large interval
config := balanceConfig{
segmentBatchSize: 5,
channelBatchSize: 5,
autoBalanceInterval: 10 * time.Second, // Long interval
}
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
defer mockLoadConfig.UnPatch()
// Track whether processBalanceQueue was called
normalBalanceCalled := false
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To(
func(ctx context.Context,
getReplicasFunc func(context.Context, int64) []int64,
constructQueueFunc func(context.Context) *balance.PriorityQueue,
getQueueFunc func() *balance.PriorityQueue, config balanceConfig,
) (int, int) {
normalBalanceCalled = true
return 0, 1
}).Build()
defer mockProcessQueue.UnPatch()
result := checker.Check(ctx)
// Verify normal balance was NOT executed due to interval throttle
assert.False(t, normalBalanceCalled, "Normal balance should respect autoBalanceInterval throttle")
assert.Nil(t, result)
})
t.Run("NormalBalanceSkippedWhenAutoBalanceDisabled", func(t *testing.T) {
checker := createTestBalanceChecker()
ctx := context.Background()
// Set autoBalanceTs to allow normal balance
checker.autoBalanceTs = time.Time{}
// Mock IsActive to return true
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build()
defer mockIsActive.UnPatch()
// Mock paramtable - stopping balance disabled, auto balance also disabled
mockParamGet := mockey.Mock(paramtable.Get).Return(&paramtable.ComponentParam{}).Build()
defer mockParamGet.UnPatch()
// return false for both stopping balance and auto balance
mockParams := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(false).Build()
defer mockParams.UnPatch()
// Mock loadBalanceConfig
config := balanceConfig{
segmentBatchSize: 5,
channelBatchSize: 5,
autoBalanceInterval: 1 * time.Second,
}
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
defer mockLoadConfig.UnPatch()
// Track whether processBalanceQueue was called
normalBalanceCalled := false
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To(
func(ctx context.Context,
getReplicasFunc func(context.Context, int64) []int64,
constructQueueFunc func(context.Context) *balance.PriorityQueue,
getQueueFunc func() *balance.PriorityQueue, config balanceConfig,
) (int, int) {
normalBalanceCalled = true
return 0, 1
}).Build()
defer mockProcessQueue.UnPatch()
result := checker.Check(ctx)
// Verify normal balance was NOT executed because auto balance is disabled
assert.False(t, normalBalanceCalled, "Normal balance should be skipped when AutoBalance is disabled")
assert.Nil(t, result)
})
}
// =============================================================================