diff --git a/internal/core/src/common/Common.cpp b/internal/core/src/common/Common.cpp index 83604d5fc8..74169d97d5 100644 --- a/internal/core/src/common/Common.cpp +++ b/internal/core/src/common/Common.cpp @@ -20,11 +20,11 @@ namespace milvus { int64_t FILE_SLICE_SIZE = DEFAULT_INDEX_FILE_SLICE_SIZE; -int64_t HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = +float HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT; -int64_t MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = +float MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT; -int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT = +float LOW_PRIORITY_THREAD_CORE_COEFFICIENT = DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT; int CPU_NUM = DEFAULT_CPU_NUM; int64_t EXEC_EVAL_EXPR_BATCH_SIZE = DEFAULT_EXEC_EVAL_EXPR_BATCH_SIZE; @@ -41,21 +41,21 @@ SetIndexSliceSize(const int64_t size) { } void -SetHighPriorityThreadCoreCoefficient(const int64_t coefficient) { +SetHighPriorityThreadCoreCoefficient(const float coefficient) { HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient; LOG_INFO("set high priority thread pool core coefficient: {}", HIGH_PRIORITY_THREAD_CORE_COEFFICIENT); } void -SetMiddlePriorityThreadCoreCoefficient(const int64_t coefficient) { +SetMiddlePriorityThreadCoreCoefficient(const float coefficient) { MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient; LOG_INFO("set middle priority thread pool core coefficient: {}", MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT); } void -SetLowPriorityThreadCoreCoefficient(const int64_t coefficient) { +SetLowPriorityThreadCoreCoefficient(const float coefficient) { LOW_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient; LOG_INFO("set low priority thread pool core coefficient: {}", LOW_PRIORITY_THREAD_CORE_COEFFICIENT); diff --git a/internal/core/src/common/Common.h b/internal/core/src/common/Common.h index 978d93ff5e..b3501f06f4 100644 --- a/internal/core/src/common/Common.h +++ b/internal/core/src/common/Common.h @@ -24,9 +24,9 @@ namespace milvus { extern int64_t FILE_SLICE_SIZE; -extern int64_t HIGH_PRIORITY_THREAD_CORE_COEFFICIENT; -extern int64_t MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT; -extern int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT; +extern float HIGH_PRIORITY_THREAD_CORE_COEFFICIENT; +extern float MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT; +extern float LOW_PRIORITY_THREAD_CORE_COEFFICIENT; extern int CPU_NUM; extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE; extern int64_t JSON_KEY_STATS_COMMIT_INTERVAL; @@ -38,13 +38,13 @@ void SetIndexSliceSize(const int64_t size); void -SetHighPriorityThreadCoreCoefficient(const int64_t coefficient); +SetHighPriorityThreadCoreCoefficient(const float coefficient); void -SetMiddlePriorityThreadCoreCoefficient(const int64_t coefficient); +SetMiddlePriorityThreadCoreCoefficient(const float coefficient); void -SetLowPriorityThreadCoreCoefficient(const int64_t coefficient); +SetLowPriorityThreadCoreCoefficient(const float coefficient); void SetCpuNum(const int core); diff --git a/internal/core/src/common/Consts.h b/internal/core/src/common/Consts.h index 74d23accc5..1517f4671e 100644 --- a/internal/core/src/common/Consts.h +++ b/internal/core/src/common/Consts.h @@ -56,9 +56,9 @@ const char DEAFULT_QUERY_ID[] = "0"; const char DEFAULT_TASK_ID[] = "0"; const int64_t DEFAULT_FIELD_MAX_MEMORY_LIMIT = 128 << 20; // bytes -const int64_t DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = 10; -const int64_t DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = 5; -const int64_t DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT = 1; +const float DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = 10.0; +const float DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = 5.0; +const float DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT = 1.0; const int64_t DEFAULT_INDEX_FILE_SLICE_SIZE = 16 << 20; // bytes diff --git a/internal/core/src/common/init_c.cpp b/internal/core/src/common/init_c.cpp index 7d22071c2b..230b55c23f 100644 --- a/internal/core/src/common/init_c.cpp +++ b/internal/core/src/common/init_c.cpp @@ -14,16 +14,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include - #include "common/init_c.h" - -#include -#include "common/Slice.h" #include "common/Common.h" #include "common/Tracer.h" -#include "log/Log.h" std::once_flag flag1, flag2, flag3, flag4, flag5, flag6, flag7, flag8, flag9, flag10; @@ -36,32 +30,30 @@ InitIndexSliceSize(const int64_t size) { } void -InitHighPriorityThreadCoreCoefficient(const int64_t value) { +InitHighPriorityThreadCoreCoefficient(const float value) { std::call_once( flag2, - [](int64_t value) { + [](float value) { milvus::SetHighPriorityThreadCoreCoefficient(value); }, value); } void -InitMiddlePriorityThreadCoreCoefficient(const int64_t value) { +InitMiddlePriorityThreadCoreCoefficient(const float value) { std::call_once( flag4, - [](int64_t value) { + [](float value) { milvus::SetMiddlePriorityThreadCoreCoefficient(value); }, value); } void -InitLowPriorityThreadCoreCoefficient(const int64_t value) { +InitLowPriorityThreadCoreCoefficient(const float value) { std::call_once( flag5, - [](int64_t value) { - milvus::SetLowPriorityThreadCoreCoefficient(value); - }, + [](float value) { milvus::SetLowPriorityThreadCoreCoefficient(value); }, value); } diff --git a/internal/core/src/common/init_c.h b/internal/core/src/common/init_c.h index 78864a9a2a..c4223d34d7 100644 --- a/internal/core/src/common/init_c.h +++ b/internal/core/src/common/init_c.h @@ -28,13 +28,13 @@ void InitIndexSliceSize(const int64_t); void -InitHighPriorityThreadCoreCoefficient(const int64_t); +InitHighPriorityThreadCoreCoefficient(const float); void -InitMiddlePriorityThreadCoreCoefficient(const int64_t); +InitMiddlePriorityThreadCoreCoefficient(const float); void -InitLowPriorityThreadCoreCoefficient(const int64_t); +InitLowPriorityThreadCoreCoefficient(const float); void InitDefaultExprEvalBatchSize(int64_t val); diff --git a/internal/core/src/storage/ThreadPool.h b/internal/core/src/storage/ThreadPool.h index ffebf78b64..db8f2bf9eb 100644 --- a/internal/core/src/storage/ThreadPool.h +++ b/internal/core/src/storage/ThreadPool.h @@ -33,25 +33,24 @@ namespace milvus { class ThreadPool { public: - explicit ThreadPool(const int thread_core_coefficient, std::string name) + explicit ThreadPool(const float thread_core_coefficient, std::string name) : shutdown_(false), name_(std::move(name)) { idle_threads_size_ = 0; current_threads_size_ = 0; - min_threads_size_ = CPU_NUM; - max_threads_size_ = CPU_NUM * thread_core_coefficient; + min_threads_size_ = 1; + max_threads_size_.store(std::max( + 1, + static_cast(std::round(CPU_NUM * thread_core_coefficient)))); // only IO pool will set large limit, but the CPU helps nothing to IO operations, // we need to limit the max thread num, each thread will download 16~64 MiB data, // according to our benchmark, 16 threads is enough to saturate the network bandwidth. - if (min_threads_size_ > 16) { - min_threads_size_ = 16; - } - if (max_threads_size_ > 16) { - max_threads_size_ = 16; + if (max_threads_size_.load() > 16) { + max_threads_size_.store(16); } LOG_INFO("Init thread pool:{}", name_) << " with min worker num:" << min_threads_size_ - << " and max worker num:" << max_threads_size_; + << " and max worker num:" << max_threads_size_.load(); Init(); } @@ -80,7 +79,7 @@ class ThreadPool { size_t GetMaxThreadNum() { - return max_threads_size_; + return max_threads_size_.load(); } template @@ -99,7 +98,7 @@ class ThreadPool { if (idle_threads_size_ > 0) { condition_lock_.notify_one(); - } else if (current_threads_size_ < max_threads_size_) { + } else if (current_threads_size_ < max_threads_size_.load()) { // Dynamic increase thread number std::thread t(&ThreadPool::Worker, this); assert(threads_.find(t.get_id()) == threads_.end()); @@ -116,11 +115,18 @@ class ThreadPool { void FinishThreads(); + void + Resize(int new_size) { + //no need to hold mutex here as we don't require + //max_threads_size to take effect instantly, just guaranteed atomic + max_threads_size_.store(new_size); + } + public: int min_threads_size_; int idle_threads_size_; int current_threads_size_; - int max_threads_size_; + std::atomic max_threads_size_; bool shutdown_; static constexpr size_t WAIT_SECONDS = 2; SafeQueue> work_queue_; diff --git a/internal/core/src/storage/ThreadPools.cpp b/internal/core/src/storage/ThreadPools.cpp index 89ee80b430..e6ecff3ea2 100644 --- a/internal/core/src/storage/ThreadPools.cpp +++ b/internal/core/src/storage/ThreadPools.cpp @@ -19,11 +19,8 @@ namespace milvus { std::map> ThreadPools::thread_pool_map; -std::map ThreadPools::coefficient_map; std::map ThreadPools::name_map; std::shared_mutex ThreadPools::mutex_; -ThreadPools ThreadPools::threadPools; -bool ThreadPools::has_setup_coefficients = false; void ThreadPools::ShutDown() { @@ -38,14 +35,21 @@ ThreadPool& ThreadPools::GetThreadPool(milvus::ThreadPoolPriority priority) { std::unique_lock lock(mutex_); auto iter = thread_pool_map.find(priority); - if (!ThreadPools::has_setup_coefficients) { - ThreadPools::SetUpCoefficients(); - ThreadPools::has_setup_coefficients = true; - } if (iter != thread_pool_map.end()) { return *(iter->second); } else { - int64_t coefficient = coefficient_map[priority]; + float coefficient = 1.0; + switch (priority) { + case milvus::ThreadPoolPriority::HIGH: + coefficient = HIGH_PRIORITY_THREAD_CORE_COEFFICIENT; + break; + case milvus::ThreadPoolPriority::MIDDLE: + coefficient = MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT; + break; + default: + coefficient = LOW_PRIORITY_THREAD_CORE_COEFFICIENT; + break; + } std::string name = name_map[priority]; auto result = thread_pool_map.emplace( priority, std::make_unique(coefficient, name)); @@ -53,4 +57,22 @@ ThreadPools::GetThreadPool(milvus::ThreadPoolPriority priority) { } } +void +ThreadPools::ResizeThreadPool(milvus::ThreadPoolPriority priority, + float ratio) { + int size = static_cast(std::round(milvus::CPU_NUM * ratio)); + if (size < 1) { + LOG_ERROR("Failed to resize threadPool, size:{}", size); + return; + } + std::unique_lock lock(mutex_); + auto iter = thread_pool_map.find(priority); + if (iter == thread_pool_map.end()) { + LOG_ERROR("Failed to find threadPool, priority:{}", priority); + return; + } + iter->second->Resize(size); + LOG_INFO("Resized threadPool priority:{}, size:{}", priority, size); +} + } // namespace milvus diff --git a/internal/core/src/storage/ThreadPools.h b/internal/core/src/storage/ThreadPools.h index 0701f91dd2..4400a02097 100644 --- a/internal/core/src/storage/ThreadPools.h +++ b/internal/core/src/storage/ThreadPools.h @@ -33,6 +33,9 @@ class ThreadPools { static ThreadPool& GetThreadPool(ThreadPoolPriority priority); + static void + ResizeThreadPool(ThreadPoolPriority priority, float ratio); + ~ThreadPools() { ShutDown(); } @@ -43,25 +46,12 @@ class ThreadPools { name_map[MIDDLE] = "middle_priority_thread_pool"; name_map[LOW] = "low_priority_thread_pool"; } - static void - SetUpCoefficients() { - coefficient_map[HIGH] = HIGH_PRIORITY_THREAD_CORE_COEFFICIENT; - coefficient_map[MIDDLE] = MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT; - coefficient_map[LOW] = LOW_PRIORITY_THREAD_CORE_COEFFICIENT; - LOG_INFO("Init ThreadPools, high_priority_co={}, middle={}, low={}", - HIGH_PRIORITY_THREAD_CORE_COEFFICIENT, - MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT, - LOW_PRIORITY_THREAD_CORE_COEFFICIENT); - } void ShutDown(); static std::map> thread_pool_map; - static std::map coefficient_map; static std::map name_map; static std::shared_mutex mutex_; - static ThreadPools threadPools; - static bool has_setup_coefficients; }; } // namespace milvus diff --git a/internal/core/src/storage/storage_c.cpp b/internal/core/src/storage/storage_c.cpp index 51a6e064e6..28f7aa3ad7 100644 --- a/internal/core/src/storage/storage_c.cpp +++ b/internal/core/src/storage/storage_c.cpp @@ -19,6 +19,7 @@ #include "storage/RemoteChunkManagerSingleton.h" #include "storage/LocalChunkManagerSingleton.h" #include "storage/MmapManager.h" +#include "storage/ThreadPools.h" CStatus GetLocalUsedSize(const char* c_dir, int64_t* size) { @@ -116,3 +117,9 @@ void CleanRemoteChunkManagerSingleton() { milvus::storage::RemoteChunkManagerSingleton::GetInstance().Release(); } + +void +ResizeTheadPool(int64_t priority, float ratio) { + milvus::ThreadPools::ResizeThreadPool( + static_cast(priority), ratio); +} diff --git a/internal/core/src/storage/storage_c.h b/internal/core/src/storage/storage_c.h index d6cac3b46f..15ab58c6cd 100644 --- a/internal/core/src/storage/storage_c.h +++ b/internal/core/src/storage/storage_c.h @@ -36,6 +36,9 @@ InitMmapManager(CMmapConfig c_mmap_config); void CleanRemoteChunkManagerSingleton(); +void +ResizeTheadPool(int64_t priority, float ratio); + #ifdef __cplusplus }; #endif diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index df25a73222..5768856468 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -103,6 +103,7 @@ set(MILVUS_TEST_FILES test_chunked_column_group.cpp test_group_chunk_translator.cpp test_chunked_segment_storage_v2.cpp + test_thread_pool.cpp ) if ( INDEX_ENGINE STREQUAL "cardinal" ) diff --git a/internal/core/unittest/test_thread_pool.cpp b/internal/core/unittest/test_thread_pool.cpp new file mode 100644 index 0000000000..c6ff28a941 --- /dev/null +++ b/internal/core/unittest/test_thread_pool.cpp @@ -0,0 +1,29 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include "storage/ThreadPools.h" +#include "common/Common.h" + +TEST(ThreadPool, ThreadNum) { + auto& threadPool = + milvus::ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH); + auto max_thread_num = threadPool.GetMaxThreadNum(); + milvus::ThreadPools::ResizeThreadPool(milvus::ThreadPoolPriority::HIGH, + 0.0); + ASSERT_EQ(threadPool.GetMaxThreadNum(), max_thread_num); + milvus::ThreadPools::ResizeThreadPool( + static_cast(6), 3.0); + ASSERT_EQ(threadPool.GetMaxThreadNum(), max_thread_num); + milvus::ThreadPools::ResizeThreadPool(milvus::ThreadPoolPriority::HIGH, + 2.0); + ASSERT_EQ(threadPool.GetMaxThreadNum(), 2.0 * milvus::CPU_NUM); +} diff --git a/internal/datanode/index/init_segcore.go b/internal/datanode/index/init_segcore.go index fc92d6a04d..fa263507b6 100644 --- a/internal/datanode/index/init_segcore.go +++ b/internal/datanode/index/init_segcore.go @@ -53,11 +53,11 @@ func InitSegcore() { C.InitIndexSliceSize(cIndexSliceSize) // set up thread pool for different priorities - cHighPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt64()) + cHighPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat()) C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient) - cMiddlePriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt64()) + cMiddlePriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat()) C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient) - cLowPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt64()) + cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat()) C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient) cCPUNum := C.int(hardware.GetCPUNum()) diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 2f3ce16f32..5a10126643 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -24,7 +24,7 @@ package querynodev2 #include "segcore/segcore_init_c.h" #include "common/init_c.h" #include "exec/expression/function/init_c.h" - +#include "storage/storage_c.h" */ import "C" @@ -189,6 +189,20 @@ func (node *QueryNode) Register() error { return nil } +func ResizeHighPriorityPool(evt *config.Event) { + if evt.HasUpdated { + pt := paramtable.Get() + newRatio := pt.CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat() + C.ResizeTheadPool(C.int64_t(0), C.float(newRatio)) + } +} + +func (node *QueryNode) RegisterSegcoreConfigWatcher() { + pt := paramtable.Get() + pt.Watch(pt.CommonCfg.HighPriorityThreadCoreCoefficient.Key, + config.NewHandler("common.threadCoreCoefficient.highPriority", ResizeHighPriorityPool)) +} + // InitSegcore set init params of segCore, such as chunckRows, SIMD type... func (node *QueryNode) InitSegcore() error { cGlogConf := C.CString(path.Join(paramtable.GetBaseTable().GetConfigDir(), paramtable.DefaultGlogConf)) @@ -217,12 +231,13 @@ func (node *QueryNode) InitSegcore() error { C.InitIndexSliceSize(cIndexSliceSize) // set up thread pool for different priorities - cHighPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt64()) + cHighPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat()) C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient) - cMiddlePriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt64()) + cMiddlePriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat()) C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient) - cLowPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt64()) + cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat()) C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient) + node.RegisterSegcoreConfigWatcher() cCPUNum := C.int(hardware.GetCPUNum()) C.InitCpuNum(cCPUNum) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index c6ad3674b5..250d47155c 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -224,9 +224,9 @@ type commonConfig struct { EntityExpirationTTL ParamItem `refreshable:"true"` IndexSliceSize ParamItem `refreshable:"false"` - HighPriorityThreadCoreCoefficient ParamItem `refreshable:"false"` - MiddlePriorityThreadCoreCoefficient ParamItem `refreshable:"false"` - LowPriorityThreadCoreCoefficient ParamItem `refreshable:"false"` + HighPriorityThreadCoreCoefficient ParamItem `refreshable:"true"` + MiddlePriorityThreadCoreCoefficient ParamItem `refreshable:"true"` + LowPriorityThreadCoreCoefficient ParamItem `refreshable:"true"` EnableMaterializedView ParamItem `refreshable:"false"` BuildIndexThreadPoolRatio ParamItem `refreshable:"false"` MaxDegree ParamItem `refreshable:"true"`