mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: Limit the concurrency of channel tasks (#37740)
Limit the maximum concurrency of channel tasks for each DataNode to prevent excessive subscriptions from causing DataNode OOM. issue: https://github.com/milvus-io/milvus/issues/37665 Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
e9391acf80
commit
0fc0d1a888
@ -517,6 +517,7 @@ dataCoord:
|
||||
balanceInterval: 360 # The interval at which the channel manager checks DML channel balance status
|
||||
checkInterval: 1 # The interval in seconds with which the channel manager advances channel states
|
||||
notifyChannelOperationTimeout: 5 # Timeout for notifying channel operations (in seconds).
|
||||
maxConcurrentChannelTaskNumPerDN: 32 # The maximum concurrency for each DataNode executing channel tasks (watch, release).
|
||||
segment:
|
||||
maxSize: 1024 # The maximum size of a segment, unit: MB. datacoord.segment.maxSize and datacoord.segment.sealProportion together determine if a segment can be sealed.
|
||||
diskSegmentMaxSize: 2048 # Maximum size of a segment in MB for collections that have a disk index
|
||||
|
||||
@ -35,6 +35,7 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/util/lock"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
@ -454,7 +455,18 @@ func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) {
|
||||
updatedStandbys := false
|
||||
updatedStandbys = m.advanceStandbys(ctx, standbys)
|
||||
updatedToCheckes := m.advanceToChecks(ctx, toChecks)
|
||||
updatedToNotifies := m.advanceToNotifies(ctx, toNotifies)
|
||||
|
||||
var (
|
||||
updatedToNotifies bool
|
||||
maxNum = len(m.store.GetNodes()) * paramtable.Get().DataCoordCfg.MaxConcurrentChannelTaskNumPerDN.GetAsInt()
|
||||
executingNum = len(toChecks)
|
||||
toNotifyNum = maxNum - executingNum
|
||||
)
|
||||
|
||||
if toNotifyNum > 0 {
|
||||
toNotifies = lo.Slice(toNotifies, 0, toNotifyNum)
|
||||
updatedToNotifies = m.advanceToNotifies(ctx, toNotifies)
|
||||
}
|
||||
|
||||
if updatedStandbys || updatedToCheckes || updatedToNotifies {
|
||||
m.lastActiveTimestamp = time.Now()
|
||||
|
||||
@ -3193,12 +3193,13 @@ user-task-polling:
|
||||
// --- datacoord ---
|
||||
type dataCoordConfig struct {
|
||||
// --- CHANNEL ---
|
||||
WatchTimeoutInterval ParamItem `refreshable:"false"`
|
||||
LegacyVersionWithoutRPCWatch ParamItem `refreshable:"false"`
|
||||
ChannelBalanceSilentDuration ParamItem `refreshable:"true"`
|
||||
ChannelBalanceInterval ParamItem `refreshable:"true"`
|
||||
ChannelCheckInterval ParamItem `refreshable:"true"`
|
||||
ChannelOperationRPCTimeout ParamItem `refreshable:"true"`
|
||||
WatchTimeoutInterval ParamItem `refreshable:"false"`
|
||||
LegacyVersionWithoutRPCWatch ParamItem `refreshable:"false"`
|
||||
ChannelBalanceSilentDuration ParamItem `refreshable:"true"`
|
||||
ChannelBalanceInterval ParamItem `refreshable:"true"`
|
||||
ChannelCheckInterval ParamItem `refreshable:"true"`
|
||||
ChannelOperationRPCTimeout ParamItem `refreshable:"true"`
|
||||
MaxConcurrentChannelTaskNumPerDN ParamItem `refreshable:"true"`
|
||||
|
||||
// --- SEGMENTS ---
|
||||
SegmentMaxSize ParamItem `refreshable:"false"`
|
||||
@ -3368,6 +3369,15 @@ func (p *dataCoordConfig) init(base *BaseTable) {
|
||||
}
|
||||
p.ChannelOperationRPCTimeout.Init(base.mgr)
|
||||
|
||||
p.MaxConcurrentChannelTaskNumPerDN = ParamItem{
|
||||
Key: "dataCoord.channel.maxConcurrentChannelTaskNumPerDN",
|
||||
Version: "2.5",
|
||||
DefaultValue: "32",
|
||||
Doc: "The maximum concurrency for each DataNode executing channel tasks (watch, release).",
|
||||
Export: true,
|
||||
}
|
||||
p.MaxConcurrentChannelTaskNumPerDN.Init(base.mgr)
|
||||
|
||||
p.SegmentMaxSize = ParamItem{
|
||||
Key: "dataCoord.segment.maxSize",
|
||||
Version: "2.0.0",
|
||||
|
||||
@ -515,6 +515,7 @@ func TestComponentParam(t *testing.T) {
|
||||
assert.Equal(t, 4, Params.L0DeleteCompactionSlotUsage.GetAsInt())
|
||||
params.Save("datacoord.scheduler.taskSlowThreshold", "1000")
|
||||
assert.Equal(t, 1000*time.Second, Params.TaskSlowThreshold.GetAsDuration(time.Second))
|
||||
assert.Equal(t, 32, Params.MaxConcurrentChannelTaskNumPerDN.GetAsInt())
|
||||
})
|
||||
|
||||
t.Run("test dataNodeConfig", func(t *testing.T) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user