mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
fix: Fix deactivate balance checker also stops stopping balance (#44834)
issue: #43858

Fix the issue introduced in PR #43992 where deactivating the balance checker incorrectly stops stopping balance operations.

Changes:
- Move IsActive() check after stopping balance logic
- Only skip normal balance when checker is inactive
- Allow stopping balance to proceed regardless of checker state

This ensures stopping balance can execute even when the balance checker is deactivated.

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
8bf7d6ae72
commit
38833b0e1d
@ -475,11 +475,6 @@ func (b *BalanceChecker) submitTasks(segmentTasks, channelTasks []task.Task) {
|
||||
// The method tracks execution time and logs warnings for slow operations
|
||||
// to help identify performance bottlenecks in large clusters.
|
||||
func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
|
||||
// Skip balance operations if the checker is not active
|
||||
if !b.IsActive() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Performance monitoring: track execution time
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
@ -510,6 +505,11 @@ func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
|
||||
}
|
||||
}
|
||||
|
||||
// Only skip normal balance operations if the checker is not active
|
||||
if !b.IsActive() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Phase 2: Process normal balance if no stopping balance was needed
|
||||
// This handles regular load balancing operations for cluster optimization
|
||||
if paramtable.Get().QueryCoordCfg.AutoBalance.GetAsBool() {
|
||||
|
||||
@ -543,21 +543,107 @@ func TestBalanceChecker_SubmitTasks_EmptyTasks(t *testing.T) {
|
||||
// =============================================================================
|
||||
|
||||
func TestBalanceChecker_Check_InactiveChecker(t *testing.T) {
|
||||
t.Run("StoppingBalanceRunsWhenInactive", func(t *testing.T) {
|
||||
checker := createTestBalanceChecker()
|
||||
ctx := context.Background()
|
||||
|
||||
// Mock IsActive to return false
|
||||
// Mock IsActive to return false - checker is inactive
|
||||
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(false).Build()
|
||||
defer mockIsActive.UnPatch()
|
||||
|
||||
// Mock paramtable to enable stopping balance
|
||||
mockParamGet := mockey.Mock(paramtable.Get).Return(&paramtable.ComponentParam{}).Build()
|
||||
defer mockParamGet.UnPatch()
|
||||
|
||||
// GetAsBool is mocked to return true on every call, so EnableStoppingBalance reads as enabled
|
||||
mockGetAsBool := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(true).Build()
|
||||
defer mockGetAsBool.UnPatch()
|
||||
|
||||
// Mock loadBalanceConfig
|
||||
config := balanceConfig{
|
||||
segmentBatchSize: 5,
|
||||
channelBatchSize: 5,
|
||||
}
|
||||
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
|
||||
defer mockLoadConfig.UnPatch()
|
||||
|
||||
// Track whether processBalanceQueue was called for stopping balance
|
||||
stoppingBalanceCalled := false
|
||||
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To(
|
||||
func(ctx context.Context,
|
||||
getReplicasFunc func(context.Context, int64) []int64,
|
||||
constructQueueFunc func(context.Context) *balance.PriorityQueue,
|
||||
getQueueFunc func() *balance.PriorityQueue, config balanceConfig,
|
||||
) (int, int) {
|
||||
stoppingBalanceCalled = true
|
||||
return 1, 0 // Return some tasks generated
|
||||
}).Build()
|
||||
defer mockProcessQueue.UnPatch()
|
||||
|
||||
result := checker.Check(ctx)
|
||||
|
||||
// Verify stopping balance was executed even though checker is inactive
|
||||
assert.True(t, stoppingBalanceCalled, "Stopping balance should run even when checker is inactive")
|
||||
assert.Nil(t, result)
|
||||
assert.Nil(t, checker.normalBalanceQueue, "Normal balance queue should be cleared when stopping balance generates tasks")
|
||||
})
|
||||
|
||||
t.Run("NormalBalanceSkippedWhenInactive", func(t *testing.T) {
|
||||
checker := createTestBalanceChecker()
|
||||
ctx := context.Background()
|
||||
|
||||
// Mock IsActive to return false - checker is inactive
|
||||
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(false).Build()
|
||||
defer mockIsActive.UnPatch()
|
||||
|
||||
// Mock paramtable
|
||||
mockParamGet := mockey.Mock(paramtable.Get).Return(&paramtable.ComponentParam{}).Build()
|
||||
defer mockParamGet.UnPatch()
|
||||
|
||||
// GetAsBool is mocked to return false on every call, so EnableStoppingBalance reads as disabled and we skip to the normal balance check
|
||||
mockGetAsBool := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(false).Build()
|
||||
defer mockGetAsBool.UnPatch()
|
||||
|
||||
// Mock loadBalanceConfig
|
||||
config := balanceConfig{
|
||||
segmentBatchSize: 5,
|
||||
channelBatchSize: 5,
|
||||
}
|
||||
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
|
||||
defer mockLoadConfig.UnPatch()
|
||||
|
||||
// Track whether processBalanceQueue was called
|
||||
processQueueCalled := false
|
||||
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To(
|
||||
func(ctx context.Context,
|
||||
getReplicasFunc func(context.Context, int64) []int64,
|
||||
constructQueueFunc func(context.Context) *balance.PriorityQueue,
|
||||
getQueueFunc func() *balance.PriorityQueue, config balanceConfig,
|
||||
) (int, int) {
|
||||
processQueueCalled = true
|
||||
return 0, 0
|
||||
}).Build()
|
||||
defer mockProcessQueue.UnPatch()
|
||||
|
||||
result := checker.Check(ctx)
|
||||
|
||||
// Verify normal balance was NOT executed because checker is inactive
|
||||
// processBalanceQueue should not be called at all since stopping balance is disabled
|
||||
// and IsActive check blocks normal balance
|
||||
assert.False(t, processQueueCalled, "Normal balance should not run when checker is inactive")
|
||||
assert.Nil(t, result)
|
||||
})
|
||||
}
|
||||
|
||||
func TestBalanceChecker_Check_StoppingBalanceEnabled(t *testing.T) {
|
||||
t.Run("StoppingBalanceGeneratesTasksAndClearsNormalQueue", func(t *testing.T) {
|
||||
checker := createTestBalanceChecker()
|
||||
ctx := context.Background()
|
||||
|
||||
// Pre-populate normal balance queue to verify it gets cleared
|
||||
checker.normalBalanceQueue = createMockPriorityQueue()
|
||||
checker.normalBalanceQueue.Push(newCollectionBalanceItem(1, 100, "byrowcount"))
|
||||
|
||||
// Mock IsActive to return true
|
||||
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build()
|
||||
defer mockIsActive.UnPatch()
|
||||
@ -566,7 +652,7 @@ func TestBalanceChecker_Check_StoppingBalanceEnabled(t *testing.T) {
|
||||
mockParamGet := mockey.Mock(paramtable.Get).Return(&paramtable.ComponentParam{}).Build()
|
||||
defer mockParamGet.UnPatch()
|
||||
|
||||
mockStoppingBalanceEnabled := mockey.Mock(mockey.GetMethod(&paramtable.ParamItem{}, "GetAsBool")).Return(true).Build()
|
||||
mockStoppingBalanceEnabled := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(true).Build()
|
||||
defer mockStoppingBalanceEnabled.UnPatch()
|
||||
|
||||
// Mock loadBalanceConfig
|
||||
@ -577,24 +663,93 @@ func TestBalanceChecker_Check_StoppingBalanceEnabled(t *testing.T) {
|
||||
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
|
||||
defer mockLoadConfig.UnPatch()
|
||||
|
||||
// Mock processBalanceQueue to return tasks
|
||||
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).Return(
|
||||
1, 0, // segment tasks, channel tasks
|
||||
).Build()
|
||||
// Track which balance type was called
|
||||
stoppingBalanceCalled := false
|
||||
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To(
|
||||
func(ctx context.Context,
|
||||
getReplicasFunc func(context.Context, int64) []int64,
|
||||
constructQueueFunc func(context.Context) *balance.PriorityQueue,
|
||||
getQueueFunc func() *balance.PriorityQueue, config balanceConfig,
|
||||
) (int, int) {
|
||||
// Record that processBalanceQueue was invoked; the function pointers themselves are not inspected here
|
||||
stoppingBalanceCalled = true
|
||||
return 1, 0 // Generate stopping balance tasks
|
||||
}).Build()
|
||||
defer mockProcessQueue.UnPatch()
|
||||
|
||||
result := checker.Check(ctx)
|
||||
assert.Nil(t, result) // Always returns nil as tasks are submitted directly
|
||||
assert.Nil(t, checker.normalBalanceQueue)
|
||||
|
||||
// Verify stopping balance was executed
|
||||
assert.True(t, stoppingBalanceCalled, "Stopping balance should have been called")
|
||||
assert.Nil(t, result, "Check should always return nil")
|
||||
assert.Nil(t, checker.normalBalanceQueue, "Normal balance queue should be cleared when stopping balance generates tasks")
|
||||
})
|
||||
|
||||
t.Run("StoppingBalanceNoTasksAllowsNormalBalance", func(t *testing.T) {
|
||||
checker := createTestBalanceChecker()
|
||||
ctx := context.Background()
|
||||
checker.autoBalanceTs = time.Time{} // Allow normal balance
|
||||
|
||||
// Mock IsActive to return true
|
||||
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build()
|
||||
defer mockIsActive.UnPatch()
|
||||
|
||||
// Mock paramtable - stopping balance enabled, auto balance enabled
|
||||
mockParamGet := mockey.Mock(paramtable.Get).Return(&paramtable.ComponentParam{}).Build()
|
||||
defer mockParamGet.UnPatch()
|
||||
|
||||
// Return true for both EnableStoppingBalance and AutoBalance
|
||||
mockGetAsBool := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(true).Build()
|
||||
defer mockGetAsBool.UnPatch()
|
||||
|
||||
// Mock loadBalanceConfig
|
||||
config := balanceConfig{
|
||||
segmentBatchSize: 5,
|
||||
channelBatchSize: 5,
|
||||
autoBalanceInterval: 1 * time.Second,
|
||||
}
|
||||
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
|
||||
defer mockLoadConfig.UnPatch()
|
||||
|
||||
// Track how many times processBalanceQueue is called
|
||||
callCount := 0
|
||||
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To(
|
||||
func(ctx context.Context,
|
||||
getReplicasFunc func(context.Context, int64) []int64,
|
||||
constructQueueFunc func(context.Context) *balance.PriorityQueue,
|
||||
getQueueFunc func() *balance.PriorityQueue, config balanceConfig,
|
||||
) (int, int) {
|
||||
callCount++
|
||||
if callCount == 1 {
|
||||
// First call: stopping balance generates no tasks
|
||||
return 0, 0
|
||||
}
|
||||
// Second call: normal balance generates tasks
|
||||
return 0, 1
|
||||
}).Build()
|
||||
defer mockProcessQueue.UnPatch()
|
||||
|
||||
result := checker.Check(ctx)
|
||||
|
||||
// Verify both stopping and normal balance were attempted
|
||||
assert.Equal(t, 2, callCount, "Both stopping balance and normal balance should be called")
|
||||
assert.Nil(t, result)
|
||||
assert.Nil(t, checker.stoppingBalanceQueue, "Stopping balance queue should be cleared when normal balance generates tasks")
|
||||
})
|
||||
}
|
||||
|
||||
func TestBalanceChecker_Check_NormalBalanceEnabled(t *testing.T) {
|
||||
t.Run("NormalBalanceGeneratesTasks", func(t *testing.T) {
|
||||
checker := createTestBalanceChecker()
|
||||
ctx := context.Background()
|
||||
|
||||
// Set autoBalanceTs to allow normal balance
|
||||
checker.autoBalanceTs = time.Time{}
|
||||
|
||||
// Pre-populate stopping balance queue to verify it gets cleared
|
||||
checker.stoppingBalanceQueue = createMockPriorityQueue()
|
||||
checker.stoppingBalanceQueue.Push(newCollectionBalanceItem(1, 100, "byrowcount"))
|
||||
|
||||
// Mock IsActive to return true
|
||||
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build()
|
||||
defer mockIsActive.UnPatch()
|
||||
@ -604,7 +759,7 @@ func TestBalanceChecker_Check_NormalBalanceEnabled(t *testing.T) {
|
||||
defer mockParamGet.UnPatch()
|
||||
|
||||
// return false for stopping balance enabled, true for auto balance enabled
|
||||
mockParams := mockey.Mock(mockey.GetMethod(&paramtable.ParamItem{}, "GetAsBool")).Return(mockey.Sequence(false).Times(1).Then(true)).Build()
|
||||
mockParams := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(mockey.Sequence(false).Times(1).Then(true)).Build()
|
||||
defer mockParams.UnPatch()
|
||||
|
||||
// Mock loadBalanceConfig
|
||||
@ -616,14 +771,124 @@ func TestBalanceChecker_Check_NormalBalanceEnabled(t *testing.T) {
|
||||
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
|
||||
defer mockLoadConfig.UnPatch()
|
||||
|
||||
// Mock processBalanceQueue to return tasks
|
||||
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).Return(
|
||||
0, 1, // segment tasks, channel tasks
|
||||
).Build()
|
||||
// Track normal balance call and timestamp update
|
||||
normalBalanceCalled := false
|
||||
originalTs := checker.autoBalanceTs
|
||||
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To(
|
||||
func(ctx context.Context,
|
||||
getReplicasFunc func(context.Context, int64) []int64,
|
||||
constructQueueFunc func(context.Context) *balance.PriorityQueue,
|
||||
getQueueFunc func() *balance.PriorityQueue, config balanceConfig,
|
||||
) (int, int) {
|
||||
normalBalanceCalled = true
|
||||
return 0, 1 // Generate normal balance tasks
|
||||
}).Build()
|
||||
defer mockProcessQueue.UnPatch()
|
||||
|
||||
result := checker.Check(ctx)
|
||||
assert.Nil(t, result) // Always returns nil as tasks are submitted directly
|
||||
|
||||
// Verify normal balance was executed
|
||||
assert.True(t, normalBalanceCalled, "Normal balance should have been called")
|
||||
assert.Nil(t, result, "Check should always return nil")
|
||||
assert.Nil(t, checker.stoppingBalanceQueue, "Stopping balance queue should be cleared when normal balance generates tasks")
|
||||
assert.True(t, checker.autoBalanceTs.After(originalTs), "autoBalanceTs should be updated when tasks are generated")
|
||||
})
|
||||
|
||||
t.Run("NormalBalanceRespects IntervalThrottle", func(t *testing.T) {
|
||||
checker := createTestBalanceChecker()
|
||||
ctx := context.Background()
|
||||
|
||||
// Set autoBalanceTs to recent time to trigger throttling
|
||||
checker.autoBalanceTs = time.Now()
|
||||
|
||||
// Mock IsActive to return true
|
||||
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build()
|
||||
defer mockIsActive.UnPatch()
|
||||
|
||||
// Mock paramtable - stopping balance disabled, auto balance enabled
|
||||
mockParamGet := mockey.Mock(paramtable.Get).Return(&paramtable.ComponentParam{}).Build()
|
||||
defer mockParamGet.UnPatch()
|
||||
|
||||
// return false for stopping balance enabled, true for auto balance enabled
|
||||
mockParams := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(mockey.Sequence(false).Times(1).Then(true)).Build()
|
||||
defer mockParams.UnPatch()
|
||||
|
||||
// Mock loadBalanceConfig with a large interval
|
||||
config := balanceConfig{
|
||||
segmentBatchSize: 5,
|
||||
channelBatchSize: 5,
|
||||
autoBalanceInterval: 10 * time.Second, // Long interval
|
||||
}
|
||||
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
|
||||
defer mockLoadConfig.UnPatch()
|
||||
|
||||
// Track whether processBalanceQueue was called
|
||||
normalBalanceCalled := false
|
||||
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To(
|
||||
func(ctx context.Context,
|
||||
getReplicasFunc func(context.Context, int64) []int64,
|
||||
constructQueueFunc func(context.Context) *balance.PriorityQueue,
|
||||
getQueueFunc func() *balance.PriorityQueue, config balanceConfig,
|
||||
) (int, int) {
|
||||
normalBalanceCalled = true
|
||||
return 0, 1
|
||||
}).Build()
|
||||
defer mockProcessQueue.UnPatch()
|
||||
|
||||
result := checker.Check(ctx)
|
||||
|
||||
// Verify normal balance was NOT executed due to interval throttle
|
||||
assert.False(t, normalBalanceCalled, "Normal balance should respect autoBalanceInterval throttle")
|
||||
assert.Nil(t, result)
|
||||
})
|
||||
|
||||
t.Run("NormalBalanceSkippedWhenAutoBalanceDisabled", func(t *testing.T) {
|
||||
checker := createTestBalanceChecker()
|
||||
ctx := context.Background()
|
||||
|
||||
// Set autoBalanceTs to allow normal balance
|
||||
checker.autoBalanceTs = time.Time{}
|
||||
|
||||
// Mock IsActive to return true
|
||||
mockIsActive := mockey.Mock((*checkerActivation).IsActive).Return(true).Build()
|
||||
defer mockIsActive.UnPatch()
|
||||
|
||||
// Mock paramtable - stopping balance disabled, auto balance also disabled
|
||||
mockParamGet := mockey.Mock(paramtable.Get).Return(&paramtable.ComponentParam{}).Build()
|
||||
defer mockParamGet.UnPatch()
|
||||
|
||||
// return false for both stopping balance and auto balance
|
||||
mockParams := mockey.Mock((*paramtable.ParamItem).GetAsBool).Return(false).Build()
|
||||
defer mockParams.UnPatch()
|
||||
|
||||
// Mock loadBalanceConfig
|
||||
config := balanceConfig{
|
||||
segmentBatchSize: 5,
|
||||
channelBatchSize: 5,
|
||||
autoBalanceInterval: 1 * time.Second,
|
||||
}
|
||||
mockLoadConfig := mockey.Mock((*BalanceChecker).loadBalanceConfig).Return(config).Build()
|
||||
defer mockLoadConfig.UnPatch()
|
||||
|
||||
// Track whether processBalanceQueue was called
|
||||
normalBalanceCalled := false
|
||||
mockProcessQueue := mockey.Mock((*BalanceChecker).processBalanceQueue).To(
|
||||
func(ctx context.Context,
|
||||
getReplicasFunc func(context.Context, int64) []int64,
|
||||
constructQueueFunc func(context.Context) *balance.PriorityQueue,
|
||||
getQueueFunc func() *balance.PriorityQueue, config balanceConfig,
|
||||
) (int, int) {
|
||||
normalBalanceCalled = true
|
||||
return 0, 1
|
||||
}).Build()
|
||||
defer mockProcessQueue.UnPatch()
|
||||
|
||||
result := checker.Check(ctx)
|
||||
|
||||
// Verify normal balance was NOT executed because auto balance is disabled
|
||||
assert.False(t, normalBalanceCalled, "Normal balance should be skipped when AutoBalance is disabled")
|
||||
assert.Nil(t, result)
|
||||
})
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user