From 52ac0718f059d4aa45c5908ec8507e6045b24e1f Mon Sep 17 00:00:00 2001 From: yah01 Date: Tue, 20 Feb 2024 15:58:52 +0800 Subject: [PATCH] enhance: limit the max pool size to 16 (#30371) (#30415) according to our benchmark, concurrency level 16 is enough to fully utilize the object storage network bandwidth pr: #30371 Signed-off-by: yah01 --- internal/core/src/storage/ThreadPool.h | 11 +++++++---- internal/querynodev2/segments/pool.go | 7 ++++++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/internal/core/src/storage/ThreadPool.h b/internal/core/src/storage/ThreadPool.h index dd6098ed9d..9478968ca3 100644 --- a/internal/core/src/storage/ThreadPool.h +++ b/internal/core/src/storage/ThreadPool.h @@ -41,10 +41,13 @@ class ThreadPool { max_threads_size_ = CPU_NUM * thread_core_coefficient; // only IO pool will set large limit, but the CPU helps nothing to IO operations, - // we need to limit the max thread num, each thread will download 16 MiB data, - // it should be not greater than 256 (4GiB data) to avoid OOM and send too many requests to object storage - if (max_threads_size_ > 256) { - max_threads_size_ = 256; + // we need to limit the max thread num, each thread will download 16~64 MiB data, + // according to our benchmark, 16 threads is enough to saturate the network bandwidth. + if (min_threads_size_ > 16) { + min_threads_size_ = 16; + } + if (max_threads_size_ > 16) { + max_threads_size_ = 16; } LOG_SEGCORE_INFO_ << "Init thread pool:" << name_ << " with min worker num:" << min_threads_size_ diff --git a/internal/querynodev2/segments/pool.go b/internal/querynodev2/segments/pool.go index 29c6c65e56..a55990c6a1 100644 --- a/internal/querynodev2/segments/pool.go +++ b/internal/querynodev2/segments/pool.go @@ -71,8 +71,13 @@ func initDynamicPool() { func initLoadPool() { loadOnce.Do(func() { + pt := paramtable.Get() + poolSize := hardware.GetCPUNum() * pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt() + if poolSize > 16 { + poolSize = 16 + } pool := conc.NewPool[any]( - hardware.GetCPUNum()*paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt(), + poolSize, conc.WithPreAlloc(false), conc.WithDisablePurge(false), conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal