enhance: resize high priority wqthreadpool dynamically(#40838) (#41549) (#41929)

related: #40838
pr: https://github.com/milvus-io/milvus/pull/41549

Signed-off-by: MrPresent-Han <chun.han@gmail.com>
This commit is contained in:
Chun Han 2025-05-30 10:18:36 +08:00 committed by GitHub
parent 66cc194ab2
commit ed0df38605
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 140 additions and 75 deletions

View File

@ -20,11 +20,11 @@
namespace milvus {
int64_t FILE_SLICE_SIZE = DEFAULT_INDEX_FILE_SLICE_SIZE;
int64_t HIGH_PRIORITY_THREAD_CORE_COEFFICIENT =
float HIGH_PRIORITY_THREAD_CORE_COEFFICIENT =
DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
int64_t MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT =
float MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT =
DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT =
float LOW_PRIORITY_THREAD_CORE_COEFFICIENT =
DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
int CPU_NUM = DEFAULT_CPU_NUM;
int64_t EXEC_EVAL_EXPR_BATCH_SIZE = DEFAULT_EXEC_EVAL_EXPR_BATCH_SIZE;
@ -41,21 +41,21 @@ SetIndexSliceSize(const int64_t size) {
}
void
SetHighPriorityThreadCoreCoefficient(const int64_t coefficient) {
SetHighPriorityThreadCoreCoefficient(const float coefficient) {
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient;
LOG_INFO("set high priority thread pool core coefficient: {}",
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT);
}
void
SetMiddlePriorityThreadCoreCoefficient(const int64_t coefficient) {
SetMiddlePriorityThreadCoreCoefficient(const float coefficient) {
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient;
LOG_INFO("set middle priority thread pool core coefficient: {}",
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT);
}
void
SetLowPriorityThreadCoreCoefficient(const int64_t coefficient) {
SetLowPriorityThreadCoreCoefficient(const float coefficient) {
LOW_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient;
LOG_INFO("set low priority thread pool core coefficient: {}",
LOW_PRIORITY_THREAD_CORE_COEFFICIENT);

View File

@ -24,9 +24,9 @@
namespace milvus {
extern int64_t FILE_SLICE_SIZE;
extern int64_t HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
extern int64_t MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
extern int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
extern float HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
extern float MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
extern float LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
extern int CPU_NUM;
extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE;
extern int64_t JSON_KEY_STATS_COMMIT_INTERVAL;
@ -38,13 +38,13 @@ void
SetIndexSliceSize(const int64_t size);
void
SetHighPriorityThreadCoreCoefficient(const int64_t coefficient);
SetHighPriorityThreadCoreCoefficient(const float coefficient);
void
SetMiddlePriorityThreadCoreCoefficient(const int64_t coefficient);
SetMiddlePriorityThreadCoreCoefficient(const float coefficient);
void
SetLowPriorityThreadCoreCoefficient(const int64_t coefficient);
SetLowPriorityThreadCoreCoefficient(const float coefficient);
void
SetCpuNum(const int core);

View File

@ -56,9 +56,9 @@ const char DEAFULT_QUERY_ID[] = "0";
const char DEFAULT_TASK_ID[] = "0";
const int64_t DEFAULT_FIELD_MAX_MEMORY_LIMIT = 128 << 20; // bytes
const int64_t DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = 10;
const int64_t DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = 5;
const int64_t DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT = 1;
const float DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = 10.0;
const float DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = 5.0;
const float DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT = 1.0;
const int64_t DEFAULT_INDEX_FILE_SLICE_SIZE = 16 << 20; // bytes

View File

@ -14,16 +14,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <memory>
#include <mutex>
#include "common/init_c.h"
#include <string>
#include "common/Slice.h"
#include "common/Common.h"
#include "common/Tracer.h"
#include "log/Log.h"
std::once_flag flag1, flag2, flag3, flag4, flag5, flag6, flag7, flag8, flag9,
flag10;
@ -36,32 +30,30 @@ InitIndexSliceSize(const int64_t size) {
}
void
InitHighPriorityThreadCoreCoefficient(const int64_t value) {
InitHighPriorityThreadCoreCoefficient(const float value) {
std::call_once(
flag2,
[](int64_t value) {
[](float value) {
milvus::SetHighPriorityThreadCoreCoefficient(value);
},
value);
}
void
InitMiddlePriorityThreadCoreCoefficient(const int64_t value) {
InitMiddlePriorityThreadCoreCoefficient(const float value) {
std::call_once(
flag4,
[](int64_t value) {
[](float value) {
milvus::SetMiddlePriorityThreadCoreCoefficient(value);
},
value);
}
void
InitLowPriorityThreadCoreCoefficient(const int64_t value) {
InitLowPriorityThreadCoreCoefficient(const float value) {
std::call_once(
flag5,
[](int64_t value) {
milvus::SetLowPriorityThreadCoreCoefficient(value);
},
[](float value) { milvus::SetLowPriorityThreadCoreCoefficient(value); },
value);
}

View File

@ -28,13 +28,13 @@ void
InitIndexSliceSize(const int64_t);
void
InitHighPriorityThreadCoreCoefficient(const int64_t);
InitHighPriorityThreadCoreCoefficient(const float);
void
InitMiddlePriorityThreadCoreCoefficient(const int64_t);
InitMiddlePriorityThreadCoreCoefficient(const float);
void
InitLowPriorityThreadCoreCoefficient(const int64_t);
InitLowPriorityThreadCoreCoefficient(const float);
void
InitDefaultExprEvalBatchSize(int64_t val);

View File

@ -33,25 +33,24 @@ namespace milvus {
class ThreadPool {
public:
explicit ThreadPool(const int thread_core_coefficient, std::string name)
explicit ThreadPool(const float thread_core_coefficient, std::string name)
: shutdown_(false), name_(std::move(name)) {
idle_threads_size_ = 0;
current_threads_size_ = 0;
min_threads_size_ = CPU_NUM;
max_threads_size_ = CPU_NUM * thread_core_coefficient;
min_threads_size_ = 1;
max_threads_size_.store(std::max(
1,
static_cast<int>(std::round(CPU_NUM * thread_core_coefficient))));
// only IO pool will set large limit, but the CPU helps nothing to IO operations,
// we need to limit the max thread num, each thread will download 16~64 MiB data,
// according to our benchmark, 16 threads is enough to saturate the network bandwidth.
if (min_threads_size_ > 16) {
min_threads_size_ = 16;
}
if (max_threads_size_ > 16) {
max_threads_size_ = 16;
if (max_threads_size_.load() > 16) {
max_threads_size_.store(16);
}
LOG_INFO("Init thread pool:{}", name_)
<< " with min worker num:" << min_threads_size_
<< " and max worker num:" << max_threads_size_;
<< " and max worker num:" << max_threads_size_.load();
Init();
}
@ -80,7 +79,7 @@ class ThreadPool {
size_t
GetMaxThreadNum() {
return max_threads_size_;
return max_threads_size_.load();
}
template <typename F, typename... Args>
@ -99,7 +98,7 @@ class ThreadPool {
if (idle_threads_size_ > 0) {
condition_lock_.notify_one();
} else if (current_threads_size_ < max_threads_size_) {
} else if (current_threads_size_ < max_threads_size_.load()) {
// Dynamic increase thread number
std::thread t(&ThreadPool::Worker, this);
assert(threads_.find(t.get_id()) == threads_.end());
@ -116,11 +115,18 @@ class ThreadPool {
void
FinishThreads();
void
Resize(int new_size) {
//no need to hold mutex here as we don't require
//max_threads_size to take effect instantly, just guaranteed atomic
max_threads_size_.store(new_size);
}
public:
int min_threads_size_;
int idle_threads_size_;
int current_threads_size_;
int max_threads_size_;
std::atomic<int> max_threads_size_;
bool shutdown_;
static constexpr size_t WAIT_SECONDS = 2;
SafeQueue<std::function<void()>> work_queue_;

View File

@ -19,11 +19,8 @@ namespace milvus {
std::map<ThreadPoolPriority, std::unique_ptr<ThreadPool>>
ThreadPools::thread_pool_map;
std::map<ThreadPoolPriority, int64_t> ThreadPools::coefficient_map;
std::map<ThreadPoolPriority, std::string> ThreadPools::name_map;
std::shared_mutex ThreadPools::mutex_;
ThreadPools ThreadPools::threadPools;
bool ThreadPools::has_setup_coefficients = false;
void
ThreadPools::ShutDown() {
@ -38,14 +35,21 @@ ThreadPool&
ThreadPools::GetThreadPool(milvus::ThreadPoolPriority priority) {
std::unique_lock<std::shared_mutex> lock(mutex_);
auto iter = thread_pool_map.find(priority);
if (!ThreadPools::has_setup_coefficients) {
ThreadPools::SetUpCoefficients();
ThreadPools::has_setup_coefficients = true;
}
if (iter != thread_pool_map.end()) {
return *(iter->second);
} else {
int64_t coefficient = coefficient_map[priority];
float coefficient = 1.0;
switch (priority) {
case milvus::ThreadPoolPriority::HIGH:
coefficient = HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
break;
case milvus::ThreadPoolPriority::MIDDLE:
coefficient = MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
break;
default:
coefficient = LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
break;
}
std::string name = name_map[priority];
auto result = thread_pool_map.emplace(
priority, std::make_unique<ThreadPool>(coefficient, name));
@ -53,4 +57,22 @@ ThreadPools::GetThreadPool(milvus::ThreadPoolPriority priority) {
}
}
void
ThreadPools::ResizeThreadPool(milvus::ThreadPoolPriority priority,
float ratio) {
int size = static_cast<int>(std::round(milvus::CPU_NUM * ratio));
if (size < 1) {
LOG_ERROR("Failed to resize threadPool, size:{}", size);
return;
}
std::unique_lock<std::shared_mutex> lock(mutex_);
auto iter = thread_pool_map.find(priority);
if (iter == thread_pool_map.end()) {
LOG_ERROR("Failed to find threadPool, priority:{}", priority);
return;
}
iter->second->Resize(size);
LOG_INFO("Resized threadPool priority:{}, size:{}", priority, size);
}
} // namespace milvus

View File

@ -33,6 +33,9 @@ class ThreadPools {
static ThreadPool&
GetThreadPool(ThreadPoolPriority priority);
static void
ResizeThreadPool(ThreadPoolPriority priority, float ratio);
~ThreadPools() {
ShutDown();
}
@ -43,25 +46,12 @@ class ThreadPools {
name_map[MIDDLE] = "middle_priority_thread_pool";
name_map[LOW] = "low_priority_thread_pool";
}
static void
SetUpCoefficients() {
coefficient_map[HIGH] = HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
coefficient_map[MIDDLE] = MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
coefficient_map[LOW] = LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
LOG_INFO("Init ThreadPools, high_priority_co={}, middle={}, low={}",
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT,
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT,
LOW_PRIORITY_THREAD_CORE_COEFFICIENT);
}
void
ShutDown();
static std::map<ThreadPoolPriority, std::unique_ptr<ThreadPool>>
thread_pool_map;
static std::map<ThreadPoolPriority, int64_t> coefficient_map;
static std::map<ThreadPoolPriority, std::string> name_map;
static std::shared_mutex mutex_;
static ThreadPools threadPools;
static bool has_setup_coefficients;
};
} // namespace milvus

View File

@ -19,6 +19,7 @@
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/LocalChunkManagerSingleton.h"
#include "storage/MmapManager.h"
#include "storage/ThreadPools.h"
CStatus
GetLocalUsedSize(const char* c_dir, int64_t* size) {
@ -116,3 +117,9 @@ void
CleanRemoteChunkManagerSingleton() {
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Release();
}
void
ResizeTheadPool(int64_t priority, float ratio) {
milvus::ThreadPools::ResizeThreadPool(
static_cast<milvus::ThreadPoolPriority>(priority), ratio);
}

View File

@ -36,6 +36,9 @@ InitMmapManager(CMmapConfig c_mmap_config);
void
CleanRemoteChunkManagerSingleton();
void
ResizeTheadPool(int64_t priority, float ratio);
#ifdef __cplusplus
};
#endif

View File

@ -103,6 +103,7 @@ set(MILVUS_TEST_FILES
test_chunked_column_group.cpp
test_group_chunk_translator.cpp
test_chunked_segment_storage_v2.cpp
test_thread_pool.cpp
)
if ( INDEX_ENGINE STREQUAL "cardinal" )

View File

@ -0,0 +1,29 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include "storage/ThreadPools.h"
#include "common/Common.h"
TEST(ThreadPool, ThreadNum) {
auto& threadPool =
milvus::ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
auto max_thread_num = threadPool.GetMaxThreadNum();
milvus::ThreadPools::ResizeThreadPool(milvus::ThreadPoolPriority::HIGH,
0.0);
ASSERT_EQ(threadPool.GetMaxThreadNum(), max_thread_num);
milvus::ThreadPools::ResizeThreadPool(
static_cast<milvus::ThreadPoolPriority>(6), 3.0);
ASSERT_EQ(threadPool.GetMaxThreadNum(), max_thread_num);
milvus::ThreadPools::ResizeThreadPool(milvus::ThreadPoolPriority::HIGH,
2.0);
ASSERT_EQ(threadPool.GetMaxThreadNum(), 2.0 * milvus::CPU_NUM);
}

View File

@ -53,11 +53,11 @@ func InitSegcore() {
C.InitIndexSliceSize(cIndexSliceSize)
// set up thread pool for different priorities
cHighPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt64())
cHighPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat())
C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient)
cMiddlePriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt64())
cMiddlePriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat())
C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient)
cLowPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt64())
cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat())
C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient)
cCPUNum := C.int(hardware.GetCPUNum())

View File

@ -24,7 +24,7 @@ package querynodev2
#include "segcore/segcore_init_c.h"
#include "common/init_c.h"
#include "exec/expression/function/init_c.h"
#include "storage/storage_c.h"
*/
import "C"
@ -189,6 +189,20 @@ func (node *QueryNode) Register() error {
return nil
}
func ResizeHighPriorityPool(evt *config.Event) {
if evt.HasUpdated {
pt := paramtable.Get()
newRatio := pt.CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat()
C.ResizeTheadPool(C.int64_t(0), C.float(newRatio))
}
}
func (node *QueryNode) RegisterSegcoreConfigWatcher() {
pt := paramtable.Get()
pt.Watch(pt.CommonCfg.HighPriorityThreadCoreCoefficient.Key,
config.NewHandler("common.threadCoreCoefficient.highPriority", ResizeHighPriorityPool))
}
// InitSegcore set init params of segCore, such as chunckRows, SIMD type...
func (node *QueryNode) InitSegcore() error {
cGlogConf := C.CString(path.Join(paramtable.GetBaseTable().GetConfigDir(), paramtable.DefaultGlogConf))
@ -217,12 +231,13 @@ func (node *QueryNode) InitSegcore() error {
C.InitIndexSliceSize(cIndexSliceSize)
// set up thread pool for different priorities
cHighPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt64())
cHighPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat())
C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient)
cMiddlePriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt64())
cMiddlePriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat())
C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient)
cLowPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt64())
cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat())
C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient)
node.RegisterSegcoreConfigWatcher()
cCPUNum := C.int(hardware.GetCPUNum())
C.InitCpuNum(cCPUNum)

View File

@ -224,9 +224,9 @@ type commonConfig struct {
EntityExpirationTTL ParamItem `refreshable:"true"`
IndexSliceSize ParamItem `refreshable:"false"`
HighPriorityThreadCoreCoefficient ParamItem `refreshable:"false"`
MiddlePriorityThreadCoreCoefficient ParamItem `refreshable:"false"`
LowPriorityThreadCoreCoefficient ParamItem `refreshable:"false"`
HighPriorityThreadCoreCoefficient ParamItem `refreshable:"true"`
MiddlePriorityThreadCoreCoefficient ParamItem `refreshable:"true"`
LowPriorityThreadCoreCoefficient ParamItem `refreshable:"true"`
EnableMaterializedView ParamItem `refreshable:"false"`
BuildIndexThreadPoolRatio ParamItem `refreshable:"false"`
MaxDegree ParamItem `refreshable:"true"`