From 5eb6127fc58598a5ce4e1a9bc5dae2f3a77cf92b Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Tue, 30 Aug 2022 19:50:57 +0800 Subject: [PATCH] Set max read concurrency ratio to improve concurrent read performance (#18911) Signed-off-by: bigsheeper Signed-off-by: bigsheeper --- configs/milvus.yaml | 6 +++++- internal/util/paramtable/component_param.go | 11 ++++++++--- internal/util/paramtable/component_param_test.go | 4 ++-- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index d76ecd9999..b2510a3f2c 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -207,7 +207,11 @@ queryNode: scheduler: receiveChanSize: 10240 unsolvedQueueSize: 10240 - maxReadConcurrency: 0 # maximum concurrency of read task. if set to less or equal 0, it means no uppper limit. + # maxReadConcurrentRatio is the concurrency ratio of read tasks (search tasks and query tasks). + # Max read concurrency would be the value of `runtime.GOMAXPROCS(0) * maxReadConcurrentRatio`. + # It defaults to 2.0, which means max read concurrency would be the value of runtime.GOMAXPROCS(0) * 2. + # Max read concurrency must be greater than or equal to 1, and less than or equal to runtime.GOMAXPROCS(0) * 100. + maxReadConcurrentRatio: 2.0 # (0, 100] cpuRatio: 10.0 # ratio used to estimate read task cpu usage.
grouping: diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index 379f570e45..d56afa524f 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -13,6 +13,7 @@ package paramtable import ( "math" + "runtime" "strconv" "strings" "sync" @@ -870,9 +871,13 @@ func (p *queryNodeConfig) initCPURatio() { } func (p *queryNodeConfig) initMaxReadConcurrency() { - p.MaxReadConcurrency = p.Base.ParseInt32WithDefault("queryNode.scheduler.maxReadConcurrency", 0) - if p.MaxReadConcurrency <= 0 { - p.MaxReadConcurrency = math.MaxInt32 + readConcurrencyRatio := p.Base.ParseFloatWithDefault("queryNode.scheduler.maxReadConcurrentRatio", 2.0) + cpuNum := int32(runtime.GOMAXPROCS(0)) + p.MaxReadConcurrency = int32(float64(cpuNum) * readConcurrencyRatio) + if p.MaxReadConcurrency < 1 { + p.MaxReadConcurrency = 1 // MaxReadConcurrency must be >= 1 + } else if p.MaxReadConcurrency > cpuNum*100 { + p.MaxReadConcurrency = cpuNum * 100 // MaxReadConcurrency must be <= 100*cpuNum } } diff --git a/internal/util/paramtable/component_param_test.go b/internal/util/paramtable/component_param_test.go index 85f269f42c..213db544de 100644 --- a/internal/util/paramtable/component_param_test.go +++ b/internal/util/paramtable/component_param_test.go @@ -12,7 +12,7 @@ package paramtable import ( - "math" + "runtime" "testing" "time" @@ -245,7 +245,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, true, Params.GroupEnabled) assert.Equal(t, int32(10240), Params.MaxReceiveChanSize) assert.Equal(t, int32(10240), Params.MaxUnsolvedQueueSize) - assert.Equal(t, int32(math.MaxInt32), Params.MaxReadConcurrency) + assert.Equal(t, int32(runtime.GOMAXPROCS(0)*2), Params.MaxReadConcurrency) assert.Equal(t, int64(1000), Params.MaxGroupNQ) assert.Equal(t, 10.0, Params.TopKMergeRatio) assert.Equal(t, 10.0, Params.CPURatio)