enhance: smaller backoff configuration for wal balancer to make faster recovery (#42869)

issue: #42835

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-06-23 10:32:40 +08:00 committed by GitHub
parent b902960057
commit a081906fb4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 25 additions and 11 deletions

View File

@ -1211,13 +1211,16 @@ streaming:
# The interval of balance task trigger at background, 1 min by default.
# It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration
triggerInterval: 1m
# The initial interval of balance task trigger backoff, 50 ms by default.
# The initial interval of balance task trigger backoff, 10 ms by default.
# It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration
backoffInitialInterval: 50ms
backoffInitialInterval: 10ms
backoffMultiplier: 2 # The multiplier of balance task trigger backoff, 2 by default
# The timeout of wal balancer operation, 10s by default.
# The max interval of balance task trigger backoff, 5s by default.
# It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration
backoffMaxInterval: 5s
# The timeout of wal balancer operation, 30s by default.
# If the operation exceeds this timeout, it will be canceled.
operationTimeout: 10s
operationTimeout: 30s
balancePolicy:
name: vchannelFair # The multiplier of balance task trigger backoff, 2 by default
vchannelFair:

View File

@ -458,7 +458,7 @@ func (f *backoffConfigFetcher) BackoffConfig() typeutil.BackoffConfig {
return typeutil.BackoffConfig{
InitialInterval: paramtable.Get().StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse(),
Multiplier: paramtable.Get().StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat(),
MaxInterval: paramtable.Get().StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse(),
MaxInterval: paramtable.Get().StreamingCfg.WALBalancerBackoffMaxInterval.GetAsDurationByParse(),
}
}

View File

@ -5634,6 +5634,7 @@ type streamingConfig struct {
WALBalancerTriggerInterval ParamItem `refreshable:"true"`
WALBalancerBackoffInitialInterval ParamItem `refreshable:"true"`
WALBalancerBackoffMultiplier ParamItem `refreshable:"true"`
WALBalancerBackoffMaxInterval ParamItem `refreshable:"true"`
WALBalancerOperationTimeout ParamItem `refreshable:"true"`
// balancer Policy
@ -5687,9 +5688,9 @@ It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDura
p.WALBalancerBackoffInitialInterval = ParamItem{
Key: "streaming.walBalancer.backoffInitialInterval",
Version: "2.6.0",
Doc: `The initial interval of balance task trigger backoff, 50 ms by default.
Doc: `The initial interval of balance task trigger backoff, 10 ms by default.
It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration`,
DefaultValue: "50ms",
DefaultValue: "10ms",
Export: true,
}
p.WALBalancerBackoffInitialInterval.Init(base.mgr)
@ -5701,12 +5702,21 @@ It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDura
Export: true,
}
p.WALBalancerBackoffMultiplier.Init(base.mgr)
p.WALBalancerBackoffMaxInterval = ParamItem{
Key: "streaming.walBalancer.backoffMaxInterval",
Version: "2.6.0",
Doc: `The max interval of balance task trigger backoff, 5s by default.
It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration`,
DefaultValue: "5s",
Export: true,
}
p.WALBalancerBackoffMaxInterval.Init(base.mgr)
p.WALBalancerOperationTimeout = ParamItem{
Key: "streaming.walBalancer.operationTimeout",
Version: "2.6.0",
Doc: `The timeout of wal balancer operation, 10s by default.
Doc: `The timeout of wal balancer operation, 30s by default.
If the operation exceeds this timeout, it will be canceled.`,
DefaultValue: "10s",
DefaultValue: "30s",
Export: true,
}
p.WALBalancerOperationTimeout.Init(base.mgr)

View File

@ -631,7 +631,8 @@ func TestComponentParam(t *testing.T) {
t.Run("test streamingConfig", func(t *testing.T) {
assert.Equal(t, 1*time.Minute, params.StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse())
assert.Equal(t, 50*time.Millisecond, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse())
assert.Equal(t, 10*time.Millisecond, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse())
assert.Equal(t, 5*time.Second, params.StreamingCfg.WALBalancerBackoffMaxInterval.GetAsDurationByParse())
assert.Equal(t, 2.0, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat())
assert.Equal(t, "vchannelFair", params.StreamingCfg.WALBalancerPolicyName.GetValue())
assert.Equal(t, 0.4, params.StreamingCfg.WALBalancerPolicyVChannelFairPChannelWeight.GetAsFloat())
@ -639,7 +640,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 0.01, params.StreamingCfg.WALBalancerPolicyVChannelFairAntiAffinityWeight.GetAsFloat())
assert.Equal(t, 0.01, params.StreamingCfg.WALBalancerPolicyVChannelFairRebalanceTolerance.GetAsFloat())
assert.Equal(t, 3, params.StreamingCfg.WALBalancerPolicyVChannelFairRebalanceMaxStep.GetAsInt())
assert.Equal(t, 10*time.Second, params.StreamingCfg.WALBalancerOperationTimeout.GetAsDurationByParse())
assert.Equal(t, 30*time.Second, params.StreamingCfg.WALBalancerOperationTimeout.GetAsDurationByParse())
assert.Equal(t, 1.0, params.StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat())
assert.Equal(t, 10*time.Second, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse())
assert.Equal(t, 30*time.Second, params.StreamingCfg.WALWriteAheadBufferKeepalive.GetAsDurationByParse())