diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 3eb6a7d339..f45d359c70 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -517,6 +517,7 @@ dataCoord: balanceInterval: 360 # The interval with which the channel manager check dml channel balance status checkInterval: 1 # The interval in seconds with which the channel manager advances channel states notifyChannelOperationTimeout: 5 # Timeout notifing channel operations (in seconds). + maxConcurrentChannelTaskNumPerDN: 32 # The maximum concurrency for each DataNode executing channel tasks (watch, release). segment: maxSize: 1024 # The maximum size of a segment, unit: MB. datacoord.segment.maxSize and datacoord.segment.sealProportion together determine if a segment can be sealed. diskSegmentMaxSize: 2048 # Maximun size of a segment in MB for collection which has Disk index diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 61ca798485..4ded570162 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -454,7 +455,18 @@ func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) { updatedStandbys := false updatedStandbys = m.advanceStandbys(ctx, standbys) updatedToCheckes := m.advanceToChecks(ctx, toChecks) - updatedToNotifies := m.advanceToNotifies(ctx, toNotifies) + + var ( + updatedToNotifies bool + maxNum = len(m.store.GetNodes()) * paramtable.Get().DataCoordCfg.MaxConcurrentChannelTaskNumPerDN.GetAsInt() + executingNum = len(toChecks) + toNotifyNum = maxNum - executingNum + ) + + if toNotifyNum > 0 { + toNotifies = lo.Slice(toNotifies, 0, toNotifyNum) + updatedToNotifies = m.advanceToNotifies(ctx, toNotifies) + } if updatedStandbys || updatedToCheckes || updatedToNotifies { m.lastActiveTimestamp = time.Now() diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 9f3acdcbd8..2ddd01d3ca 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3193,12 +3193,13 @@ user-task-polling: // --- datacoord --- type dataCoordConfig struct { // --- CHANNEL --- - WatchTimeoutInterval ParamItem `refreshable:"false"` - LegacyVersionWithoutRPCWatch ParamItem `refreshable:"false"` - ChannelBalanceSilentDuration ParamItem `refreshable:"true"` - ChannelBalanceInterval ParamItem `refreshable:"true"` - ChannelCheckInterval ParamItem `refreshable:"true"` - ChannelOperationRPCTimeout ParamItem `refreshable:"true"` + WatchTimeoutInterval ParamItem `refreshable:"false"` + LegacyVersionWithoutRPCWatch ParamItem `refreshable:"false"` + ChannelBalanceSilentDuration ParamItem `refreshable:"true"` + ChannelBalanceInterval ParamItem `refreshable:"true"` + ChannelCheckInterval ParamItem `refreshable:"true"` + ChannelOperationRPCTimeout ParamItem `refreshable:"true"` + MaxConcurrentChannelTaskNumPerDN ParamItem `refreshable:"true"` // --- SEGMENTS --- SegmentMaxSize ParamItem `refreshable:"false"` @@ -3368,6 +3369,15 @@ func (p *dataCoordConfig) init(base *BaseTable) { } p.ChannelOperationRPCTimeout.Init(base.mgr) + p.MaxConcurrentChannelTaskNumPerDN = ParamItem{ + Key: "dataCoord.channel.maxConcurrentChannelTaskNumPerDN", + Version: "2.5", + DefaultValue: "32", + Doc: "The maximum concurrency for each DataNode executing channel tasks (watch, release).", + Export: true, + } + p.MaxConcurrentChannelTaskNumPerDN.Init(base.mgr) + p.SegmentMaxSize = ParamItem{ Key: "dataCoord.segment.maxSize", Version: "2.0.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 0136e48136..3670828e2e 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -515,6 +515,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 4, Params.L0DeleteCompactionSlotUsage.GetAsInt()) params.Save("datacoord.scheduler.taskSlowThreshold", "1000") assert.Equal(t, 1000*time.Second, Params.TaskSlowThreshold.GetAsDuration(time.Second)) + assert.Equal(t, 32, Params.MaxConcurrentChannelTaskNumPerDN.GetAsInt()) }) t.Run("test dataNodeConfig", func(t *testing.T) {