enhance: resize high priority wqthreadpool dynamically(#40838) (#41549)

related: #40838

Signed-off-by: MrPresent-Han <chun.han@gmail.com>
Co-authored-by: MrPresent-Han <chun.han@gmail.com>
This commit is contained in:
Chun Han 2025-04-28 00:44:39 +08:00 committed by GitHub
parent 8af350d9db
commit 69a80b9ce3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 140 additions and 75 deletions

View File

@ -20,11 +20,11 @@
namespace milvus { namespace milvus {
int64_t FILE_SLICE_SIZE = DEFAULT_INDEX_FILE_SLICE_SIZE; int64_t FILE_SLICE_SIZE = DEFAULT_INDEX_FILE_SLICE_SIZE;
int64_t HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = float HIGH_PRIORITY_THREAD_CORE_COEFFICIENT =
DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT; DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
int64_t MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = float MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT =
DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT; DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT = float LOW_PRIORITY_THREAD_CORE_COEFFICIENT =
DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT; DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
int CPU_NUM = DEFAULT_CPU_NUM; int CPU_NUM = DEFAULT_CPU_NUM;
int64_t EXEC_EVAL_EXPR_BATCH_SIZE = DEFAULT_EXEC_EVAL_EXPR_BATCH_SIZE; int64_t EXEC_EVAL_EXPR_BATCH_SIZE = DEFAULT_EXEC_EVAL_EXPR_BATCH_SIZE;
@ -40,21 +40,21 @@ SetIndexSliceSize(const int64_t size) {
} }
void void
SetHighPriorityThreadCoreCoefficient(const int64_t coefficient) { SetHighPriorityThreadCoreCoefficient(const float coefficient) {
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient; HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient;
LOG_INFO("set high priority thread pool core coefficient: {}", LOG_INFO("set high priority thread pool core coefficient: {}",
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT); HIGH_PRIORITY_THREAD_CORE_COEFFICIENT);
} }
void void
SetMiddlePriorityThreadCoreCoefficient(const int64_t coefficient) { SetMiddlePriorityThreadCoreCoefficient(const float coefficient) {
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient; MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient;
LOG_INFO("set middle priority thread pool core coefficient: {}", LOG_INFO("set middle priority thread pool core coefficient: {}",
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT); MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT);
} }
void void
SetLowPriorityThreadCoreCoefficient(const int64_t coefficient) { SetLowPriorityThreadCoreCoefficient(const float coefficient) {
LOW_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient; LOW_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient;
LOG_INFO("set low priority thread pool core coefficient: {}", LOG_INFO("set low priority thread pool core coefficient: {}",
LOW_PRIORITY_THREAD_CORE_COEFFICIENT); LOW_PRIORITY_THREAD_CORE_COEFFICIENT);

View File

@ -24,9 +24,9 @@
namespace milvus { namespace milvus {
extern int64_t FILE_SLICE_SIZE; extern int64_t FILE_SLICE_SIZE;
extern int64_t HIGH_PRIORITY_THREAD_CORE_COEFFICIENT; extern float HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
extern int64_t MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT; extern float MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
extern int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT; extern float LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
extern int CPU_NUM; extern int CPU_NUM;
extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE; extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE;
extern int64_t JSON_KEY_STATS_COMMIT_INTERVAL; extern int64_t JSON_KEY_STATS_COMMIT_INTERVAL;
@ -36,13 +36,13 @@ void
SetIndexSliceSize(const int64_t size); SetIndexSliceSize(const int64_t size);
void void
SetHighPriorityThreadCoreCoefficient(const int64_t coefficient); SetHighPriorityThreadCoreCoefficient(const float coefficient);
void void
SetMiddlePriorityThreadCoreCoefficient(const int64_t coefficient); SetMiddlePriorityThreadCoreCoefficient(const float coefficient);
void void
SetLowPriorityThreadCoreCoefficient(const int64_t coefficient); SetLowPriorityThreadCoreCoefficient(const float coefficient);
void void
SetCpuNum(const int core); SetCpuNum(const int core);

View File

@ -56,9 +56,9 @@ const char DEAFULT_QUERY_ID[] = "0";
const char DEFAULT_TASK_ID[] = "0"; const char DEFAULT_TASK_ID[] = "0";
const int64_t DEFAULT_FIELD_MAX_MEMORY_LIMIT = 128 << 20; // bytes const int64_t DEFAULT_FIELD_MAX_MEMORY_LIMIT = 128 << 20; // bytes
const int64_t DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = 10; const float DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = 10.0;
const int64_t DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = 5; const float DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = 5.0;
const int64_t DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT = 1; const float DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT = 1.0;
const int64_t DEFAULT_INDEX_FILE_SLICE_SIZE = 16 << 20; // bytes const int64_t DEFAULT_INDEX_FILE_SLICE_SIZE = 16 << 20; // bytes

View File

@ -14,16 +14,10 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include <memory>
#include <mutex> #include <mutex>
#include "common/init_c.h" #include "common/init_c.h"
#include <string>
#include "common/Slice.h"
#include "common/Common.h" #include "common/Common.h"
#include "common/Tracer.h" #include "common/Tracer.h"
#include "log/Log.h"
std::once_flag flag1, flag2, flag3, flag4, flag5, flag6, flag7, flag8, flag9, std::once_flag flag1, flag2, flag3, flag4, flag5, flag6, flag7, flag8, flag9,
flag10; flag10;
@ -36,32 +30,30 @@ InitIndexSliceSize(const int64_t size) {
} }
void void
InitHighPriorityThreadCoreCoefficient(const int64_t value) { InitHighPriorityThreadCoreCoefficient(const float value) {
std::call_once( std::call_once(
flag2, flag2,
[](int64_t value) { [](float value) {
milvus::SetHighPriorityThreadCoreCoefficient(value); milvus::SetHighPriorityThreadCoreCoefficient(value);
}, },
value); value);
} }
void void
InitMiddlePriorityThreadCoreCoefficient(const int64_t value) { InitMiddlePriorityThreadCoreCoefficient(const float value) {
std::call_once( std::call_once(
flag4, flag4,
[](int64_t value) { [](float value) {
milvus::SetMiddlePriorityThreadCoreCoefficient(value); milvus::SetMiddlePriorityThreadCoreCoefficient(value);
}, },
value); value);
} }
void void
InitLowPriorityThreadCoreCoefficient(const int64_t value) { InitLowPriorityThreadCoreCoefficient(const float value) {
std::call_once( std::call_once(
flag5, flag5,
[](int64_t value) { [](float value) { milvus::SetLowPriorityThreadCoreCoefficient(value); },
milvus::SetLowPriorityThreadCoreCoefficient(value);
},
value); value);
} }

View File

@ -28,13 +28,13 @@ void
InitIndexSliceSize(const int64_t); InitIndexSliceSize(const int64_t);
void void
InitHighPriorityThreadCoreCoefficient(const int64_t); InitHighPriorityThreadCoreCoefficient(const float);
void void
InitMiddlePriorityThreadCoreCoefficient(const int64_t); InitMiddlePriorityThreadCoreCoefficient(const float);
void void
InitLowPriorityThreadCoreCoefficient(const int64_t); InitLowPriorityThreadCoreCoefficient(const float);
void void
InitDefaultExprEvalBatchSize(int64_t val); InitDefaultExprEvalBatchSize(int64_t val);

View File

@ -33,25 +33,24 @@ namespace milvus {
class ThreadPool { class ThreadPool {
public: public:
explicit ThreadPool(const int thread_core_coefficient, std::string name) explicit ThreadPool(const float thread_core_coefficient, std::string name)
: shutdown_(false), name_(std::move(name)) { : shutdown_(false), name_(std::move(name)) {
idle_threads_size_ = 0; idle_threads_size_ = 0;
current_threads_size_ = 0; current_threads_size_ = 0;
min_threads_size_ = CPU_NUM; min_threads_size_ = 1;
max_threads_size_ = CPU_NUM * thread_core_coefficient; max_threads_size_.store(std::max(
1,
static_cast<int>(std::round(CPU_NUM * thread_core_coefficient))));
// only IO pool will set large limit, but the CPU helps nothing to IO operations, // only IO pool will set large limit, but the CPU helps nothing to IO operations,
// we need to limit the max thread num, each thread will download 16~64 MiB data, // we need to limit the max thread num, each thread will download 16~64 MiB data,
// according to our benchmark, 16 threads is enough to saturate the network bandwidth. // according to our benchmark, 16 threads is enough to saturate the network bandwidth.
if (min_threads_size_ > 16) { if (max_threads_size_.load() > 16) {
min_threads_size_ = 16; max_threads_size_.store(16);
}
if (max_threads_size_ > 16) {
max_threads_size_ = 16;
} }
LOG_INFO("Init thread pool:{}", name_) LOG_INFO("Init thread pool:{}", name_)
<< " with min worker num:" << min_threads_size_ << " with min worker num:" << min_threads_size_
<< " and max worker num:" << max_threads_size_; << " and max worker num:" << max_threads_size_.load();
Init(); Init();
} }
@ -80,7 +79,7 @@ class ThreadPool {
size_t size_t
GetMaxThreadNum() { GetMaxThreadNum() {
return max_threads_size_; return max_threads_size_.load();
} }
template <typename F, typename... Args> template <typename F, typename... Args>
@ -100,7 +99,7 @@ class ThreadPool {
if (idle_threads_size_ > 0) { if (idle_threads_size_ > 0) {
condition_lock_.notify_one(); condition_lock_.notify_one();
} else if (current_threads_size_ < max_threads_size_) { } else if (current_threads_size_ < max_threads_size_.load()) {
// Dynamic increase thread number // Dynamic increase thread number
std::thread t(&ThreadPool::Worker, this); std::thread t(&ThreadPool::Worker, this);
assert(threads_.find(t.get_id()) == threads_.end()); assert(threads_.find(t.get_id()) == threads_.end());
@ -117,11 +116,18 @@ class ThreadPool {
void void
FinishThreads(); FinishThreads();
void
Resize(int new_size) {
//no need to hold mutex here as we don't require
//max_threads_size to take effect instantly, just guaranteed atomic
max_threads_size_.store(new_size);
}
public: public:
int min_threads_size_; int min_threads_size_;
int idle_threads_size_; int idle_threads_size_;
int current_threads_size_; int current_threads_size_;
int max_threads_size_; std::atomic<int> max_threads_size_;
bool shutdown_; bool shutdown_;
static constexpr size_t WAIT_SECONDS = 2; static constexpr size_t WAIT_SECONDS = 2;
SafeQueue<std::function<void()>> work_queue_; SafeQueue<std::function<void()>> work_queue_;

View File

@ -19,11 +19,8 @@ namespace milvus {
std::map<ThreadPoolPriority, std::unique_ptr<ThreadPool>> std::map<ThreadPoolPriority, std::unique_ptr<ThreadPool>>
ThreadPools::thread_pool_map; ThreadPools::thread_pool_map;
std::map<ThreadPoolPriority, int64_t> ThreadPools::coefficient_map;
std::map<ThreadPoolPriority, std::string> ThreadPools::name_map; std::map<ThreadPoolPriority, std::string> ThreadPools::name_map;
std::shared_mutex ThreadPools::mutex_; std::shared_mutex ThreadPools::mutex_;
ThreadPools ThreadPools::threadPools;
bool ThreadPools::has_setup_coefficients = false;
void void
ThreadPools::ShutDown() { ThreadPools::ShutDown() {
@ -38,14 +35,21 @@ ThreadPool&
ThreadPools::GetThreadPool(milvus::ThreadPoolPriority priority) { ThreadPools::GetThreadPool(milvus::ThreadPoolPriority priority) {
std::unique_lock<std::shared_mutex> lock(mutex_); std::unique_lock<std::shared_mutex> lock(mutex_);
auto iter = thread_pool_map.find(priority); auto iter = thread_pool_map.find(priority);
if (!ThreadPools::has_setup_coefficients) {
ThreadPools::SetUpCoefficients();
ThreadPools::has_setup_coefficients = true;
}
if (iter != thread_pool_map.end()) { if (iter != thread_pool_map.end()) {
return *(iter->second); return *(iter->second);
} else { } else {
int64_t coefficient = coefficient_map[priority]; float coefficient = 1.0;
switch (priority) {
case milvus::ThreadPoolPriority::HIGH:
coefficient = HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
break;
case milvus::ThreadPoolPriority::MIDDLE:
coefficient = MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
break;
default:
coefficient = LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
break;
}
std::string name = name_map[priority]; std::string name = name_map[priority];
auto result = thread_pool_map.emplace( auto result = thread_pool_map.emplace(
priority, std::make_unique<ThreadPool>(coefficient, name)); priority, std::make_unique<ThreadPool>(coefficient, name));
@ -53,4 +57,22 @@ ThreadPools::GetThreadPool(milvus::ThreadPoolPriority priority) {
} }
} }
void
ThreadPools::ResizeThreadPool(milvus::ThreadPoolPriority priority,
float ratio) {
int size = static_cast<int>(std::round(milvus::CPU_NUM * ratio));
if (size < 1) {
LOG_ERROR("Failed to resize threadPool, size:{}", size);
return;
}
std::unique_lock<std::shared_mutex> lock(mutex_);
auto iter = thread_pool_map.find(priority);
if (iter == thread_pool_map.end()) {
LOG_ERROR("Failed to find threadPool, priority:{}", priority);
return;
}
iter->second->Resize(size);
LOG_INFO("Resized threadPool priority:{}, size:{}", priority, size);
}
} // namespace milvus } // namespace milvus

View File

@ -33,6 +33,9 @@ class ThreadPools {
static ThreadPool& static ThreadPool&
GetThreadPool(ThreadPoolPriority priority); GetThreadPool(ThreadPoolPriority priority);
static void
ResizeThreadPool(ThreadPoolPriority priority, float ratio);
~ThreadPools() { ~ThreadPools() {
ShutDown(); ShutDown();
} }
@ -43,25 +46,12 @@ class ThreadPools {
name_map[MIDDLE] = "middle_priority_thread_pool"; name_map[MIDDLE] = "middle_priority_thread_pool";
name_map[LOW] = "low_priority_thread_pool"; name_map[LOW] = "low_priority_thread_pool";
} }
static void
SetUpCoefficients() {
coefficient_map[HIGH] = HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
coefficient_map[MIDDLE] = MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
coefficient_map[LOW] = LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
LOG_INFO("Init ThreadPools, high_priority_co={}, middle={}, low={}",
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT,
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT,
LOW_PRIORITY_THREAD_CORE_COEFFICIENT);
}
void void
ShutDown(); ShutDown();
static std::map<ThreadPoolPriority, std::unique_ptr<ThreadPool>> static std::map<ThreadPoolPriority, std::unique_ptr<ThreadPool>>
thread_pool_map; thread_pool_map;
static std::map<ThreadPoolPriority, int64_t> coefficient_map;
static std::map<ThreadPoolPriority, std::string> name_map; static std::map<ThreadPoolPriority, std::string> name_map;
static std::shared_mutex mutex_; static std::shared_mutex mutex_;
static ThreadPools threadPools;
static bool has_setup_coefficients;
}; };
} // namespace milvus } // namespace milvus

View File

@ -19,6 +19,7 @@
#include "storage/RemoteChunkManagerSingleton.h" #include "storage/RemoteChunkManagerSingleton.h"
#include "storage/LocalChunkManagerSingleton.h" #include "storage/LocalChunkManagerSingleton.h"
#include "storage/MmapManager.h" #include "storage/MmapManager.h"
#include "storage/ThreadPools.h"
CStatus CStatus
GetLocalUsedSize(const char* c_dir, int64_t* size) { GetLocalUsedSize(const char* c_dir, int64_t* size) {
@ -116,3 +117,9 @@ void
CleanRemoteChunkManagerSingleton() { CleanRemoteChunkManagerSingleton() {
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Release(); milvus::storage::RemoteChunkManagerSingleton::GetInstance().Release();
} }
void
ResizeTheadPool(int64_t priority, float ratio) {
milvus::ThreadPools::ResizeThreadPool(
static_cast<milvus::ThreadPoolPriority>(priority), ratio);
}

View File

@ -36,6 +36,9 @@ InitMmapManager(CMmapConfig c_mmap_config);
void void
CleanRemoteChunkManagerSingleton(); CleanRemoteChunkManagerSingleton();
void
ResizeTheadPool(int64_t priority, float ratio);
#ifdef __cplusplus #ifdef __cplusplus
}; };
#endif #endif

View File

@ -93,6 +93,7 @@ set(MILVUS_TEST_FILES
test_build_inverted_index_with_single_segment.cpp test_build_inverted_index_with_single_segment.cpp
test_json_index.cpp test_json_index.cpp
test_json_key_stats_index.cpp test_json_key_stats_index.cpp
test_thread_pool.cpp
) )
if ( INDEX_ENGINE STREQUAL "cardinal" ) if ( INDEX_ENGINE STREQUAL "cardinal" )

View File

@ -0,0 +1,29 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include "storage/ThreadPools.h"
TEST(ThreadPool, ThreadNum) {
auto& threadPool =
milvus::ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
ASSERT_EQ(threadPool.GetThreadNum(), 1);
ASSERT_EQ(threadPool.GetMaxThreadNum(), 10);
milvus::ThreadPools::ResizeThreadPool(milvus::ThreadPoolPriority::HIGH,
0.0);
ASSERT_EQ(threadPool.GetMaxThreadNum(), 10);
milvus::ThreadPools::ResizeThreadPool(
static_cast<milvus::ThreadPoolPriority>(6), 3.0);
ASSERT_EQ(threadPool.GetMaxThreadNum(), 10);
milvus::ThreadPools::ResizeThreadPool(milvus::ThreadPoolPriority::HIGH,
2.0);
ASSERT_EQ(threadPool.GetMaxThreadNum(), 2);
}

View File

@ -176,11 +176,11 @@ func (i *IndexNode) initSegcore() {
C.InitIndexSliceSize(cIndexSliceSize) C.InitIndexSliceSize(cIndexSliceSize)
// set up thread pool for different priorities // set up thread pool for different priorities
cHighPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt64()) cHighPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat())
C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient) C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient)
cMiddlePriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt64()) cMiddlePriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat())
C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient) C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient)
cLowPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt64()) cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat())
C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient) C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient)
cCPUNum := C.int(hardware.GetCPUNum()) cCPUNum := C.int(hardware.GetCPUNum())

View File

@ -24,7 +24,7 @@ package querynodev2
#include "segcore/segcore_init_c.h" #include "segcore/segcore_init_c.h"
#include "common/init_c.h" #include "common/init_c.h"
#include "exec/expression/function/init_c.h" #include "exec/expression/function/init_c.h"
#include "storage/storage_c.h"
*/ */
import "C" import "C"
@ -186,6 +186,20 @@ func (node *QueryNode) Register() error {
return nil return nil
} }
func ResizeHighPriorityPool(evt *config.Event) {
if evt.HasUpdated {
pt := paramtable.Get()
newRatio := pt.CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat()
C.ResizeTheadPool(C.int64_t(0), C.float(newRatio))
}
}
func (node *QueryNode) RegisterSegcoreConfigWatcher() {
pt := paramtable.Get()
pt.Watch(pt.CommonCfg.HighPriorityThreadCoreCoefficient.Key,
config.NewHandler("common.threadCoreCoefficient.highPriority", ResizeHighPriorityPool))
}
// InitSegcore set init params of segCore, such as chunckRows, SIMD type... // InitSegcore set init params of segCore, such as chunckRows, SIMD type...
func (node *QueryNode) InitSegcore() error { func (node *QueryNode) InitSegcore() error {
cGlogConf := C.CString(path.Join(paramtable.GetBaseTable().GetConfigDir(), paramtable.DefaultGlogConf)) cGlogConf := C.CString(path.Join(paramtable.GetBaseTable().GetConfigDir(), paramtable.DefaultGlogConf))
@ -214,12 +228,13 @@ func (node *QueryNode) InitSegcore() error {
C.InitIndexSliceSize(cIndexSliceSize) C.InitIndexSliceSize(cIndexSliceSize)
// set up thread pool for different priorities // set up thread pool for different priorities
cHighPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsInt64()) cHighPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat())
C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient) C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient)
cMiddlePriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt64()) cMiddlePriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat())
C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient) C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient)
cLowPriorityThreadCoreCoefficient := C.int64_t(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt64()) cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat())
C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient) C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient)
node.RegisterSegcoreConfigWatcher()
cCPUNum := C.int(hardware.GetCPUNum()) cCPUNum := C.int(hardware.GetCPUNum())
C.InitCpuNum(cCPUNum) C.InitCpuNum(cCPUNum)

View File

@ -222,9 +222,9 @@ type commonConfig struct {
EntityExpirationTTL ParamItem `refreshable:"true"` EntityExpirationTTL ParamItem `refreshable:"true"`
IndexSliceSize ParamItem `refreshable:"false"` IndexSliceSize ParamItem `refreshable:"false"`
HighPriorityThreadCoreCoefficient ParamItem `refreshable:"false"` HighPriorityThreadCoreCoefficient ParamItem `refreshable:"true"`
MiddlePriorityThreadCoreCoefficient ParamItem `refreshable:"false"` MiddlePriorityThreadCoreCoefficient ParamItem `refreshable:"true"`
LowPriorityThreadCoreCoefficient ParamItem `refreshable:"false"` LowPriorityThreadCoreCoefficient ParamItem `refreshable:"true"`
EnableMaterializedView ParamItem `refreshable:"false"` EnableMaterializedView ParamItem `refreshable:"false"`
BuildIndexThreadPoolRatio ParamItem `refreshable:"false"` BuildIndexThreadPoolRatio ParamItem `refreshable:"false"`
MaxDegree ParamItem `refreshable:"true"` MaxDegree ParamItem `refreshable:"true"`