diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 4b31fd9bc4..716815f604 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -1211,13 +1211,16 @@ streaming: # The interval of balance task trigger at background, 1 min by default. # It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration triggerInterval: 1m - # The initial interval of balance task trigger backoff, 50 ms by default. + # The initial interval of balance task trigger backoff, 10 ms by default. # It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration - backoffInitialInterval: 50ms + backoffInitialInterval: 10ms backoffMultiplier: 2 # The multiplier of balance task trigger backoff, 2 by default - # The timeout of wal balancer operation, 10s by default. + # The max interval of balance task trigger backoff, 5s by default. + # It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration + backoffMaxInterval: 5s + # The timeout of wal balancer operation, 30s by default. # If the operation exceeds this timeout, it will be canceled. - operationTimeout: 10s + operationTimeout: 30s balancePolicy: name: vchannelFair # The multiplier of balance task trigger backoff, 2 by default vchannelFair: diff --git a/internal/streamingcoord/server/balancer/balancer_impl.go b/internal/streamingcoord/server/balancer/balancer_impl.go index b9a5c65edd..ee8c2b2b35 100644 --- a/internal/streamingcoord/server/balancer/balancer_impl.go +++ b/internal/streamingcoord/server/balancer/balancer_impl.go @@ -458,7 +458,7 @@ func (f *backoffConfigFetcher) BackoffConfig() typeutil.BackoffConfig { return typeutil.BackoffConfig{ InitialInterval: paramtable.Get().StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse(), Multiplier: paramtable.Get().StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat(), - MaxInterval: paramtable.Get().StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse(), + MaxInterval: paramtable.Get().StreamingCfg.WALBalancerBackoffMaxInterval.GetAsDurationByParse(), } } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 3915672c4f..7a919aa2cb 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -5634,6 +5634,7 @@ type streamingConfig struct { WALBalancerTriggerInterval ParamItem `refreshable:"true"` WALBalancerBackoffInitialInterval ParamItem `refreshable:"true"` WALBalancerBackoffMultiplier ParamItem `refreshable:"true"` + WALBalancerBackoffMaxInterval ParamItem `refreshable:"true"` WALBalancerOperationTimeout ParamItem `refreshable:"true"` // balancer Policy @@ -5687,9 +5688,9 @@ It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDura p.WALBalancerBackoffInitialInterval = ParamItem{ Key: "streaming.walBalancer.backoffInitialInterval", Version: "2.6.0", - Doc: `The initial interval of balance task trigger backoff, 50 ms by default. + Doc: `The initial interval of balance task trigger backoff, 10 ms by default. It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration`, - DefaultValue: "50ms", + DefaultValue: "10ms", Export: true, } p.WALBalancerBackoffInitialInterval.Init(base.mgr) @@ -5701,12 +5702,21 @@ It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDura Export: true, } p.WALBalancerBackoffMultiplier.Init(base.mgr) + p.WALBalancerBackoffMaxInterval = ParamItem{ + Key: "streaming.walBalancer.backoffMaxInterval", + Version: "2.6.0", + Doc: `The max interval of balance task trigger backoff, 5s by default. +It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration`, + DefaultValue: "5s", + Export: true, + } + p.WALBalancerBackoffMaxInterval.Init(base.mgr) p.WALBalancerOperationTimeout = ParamItem{ Key: "streaming.walBalancer.operationTimeout", Version: "2.6.0", - Doc: `The timeout of wal balancer operation, 10s by default. + Doc: `The timeout of wal balancer operation, 30s by default. If the operation exceeds this timeout, it will be canceled.`, - DefaultValue: "10s", + DefaultValue: "30s", Export: true, } p.WALBalancerOperationTimeout.Init(base.mgr) diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 1281172cd7..5445caa442 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -631,7 +631,8 @@ func TestComponentParam(t *testing.T) { t.Run("test streamingConfig", func(t *testing.T) { assert.Equal(t, 1*time.Minute, params.StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse()) - assert.Equal(t, 50*time.Millisecond, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse()) + assert.Equal(t, 10*time.Millisecond, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse()) + assert.Equal(t, 5*time.Second, params.StreamingCfg.WALBalancerBackoffMaxInterval.GetAsDurationByParse()) assert.Equal(t, 2.0, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat()) assert.Equal(t, "vchannelFair", params.StreamingCfg.WALBalancerPolicyName.GetValue()) assert.Equal(t, 0.4, params.StreamingCfg.WALBalancerPolicyVChannelFairPChannelWeight.GetAsFloat()) @@ -639,7 +640,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 0.01, params.StreamingCfg.WALBalancerPolicyVChannelFairAntiAffinityWeight.GetAsFloat()) assert.Equal(t, 0.01, params.StreamingCfg.WALBalancerPolicyVChannelFairRebalanceTolerance.GetAsFloat()) assert.Equal(t, 3, params.StreamingCfg.WALBalancerPolicyVChannelFairRebalanceMaxStep.GetAsInt()) - assert.Equal(t, 10*time.Second, params.StreamingCfg.WALBalancerOperationTimeout.GetAsDurationByParse()) + assert.Equal(t, 30*time.Second, params.StreamingCfg.WALBalancerOperationTimeout.GetAsDurationByParse()) assert.Equal(t, 1.0, params.StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat()) assert.Equal(t, 10*time.Second, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse()) assert.Equal(t, 30*time.Second, params.StreamingCfg.WALWriteAheadBufferKeepalive.GetAsDurationByParse())