enhance: supporting separate chunk cache pool(#42803) (#42901)

related: #42803

1. add a new thread pool using folly::CPUThreadPoolExecutor, named
FThreadPools
2. reading vectors from the chunk cache will use the separate
CHUNKCACHE_POOL to avoid being influenced by load-collection work
3. Note: for safety on the cloud side on 2.5.x, only read-chunk-cache
operations use this newly created thread pool; other thread-pool call
sites will be migrated in the near future
4. the master branch doesn't need this PR, as its caching layer unifies
the chunk cache behaviour

Signed-off-by: MrPresent-Han <chun.han@gmail.com>
Co-authored-by: MrPresent-Han <chun.han@gmail.com>
This commit is contained in:
Chun Han 2025-06-26 15:52:43 +08:00 committed by GitHub
parent 5aaaef3f7e
commit bfa9688da3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 80 additions and 12 deletions

View File

@ -839,6 +839,7 @@ common:
highPriority: 10 # This parameter specify how many times the number of threads is the number of cores in high priority pool
middlePriority: 5 # This parameter specify how many times the number of threads is the number of cores in middle priority pool
lowPriority: 1 # This parameter specify how many times the number of threads is the number of cores in low priority pool
chunkCache: 10 # This parameter specify how many times the number of threads is the number of cores in chunk cache pool
buildIndexThreadPoolRatio: 0.75
DiskIndex:
MaxDegree: 56

View File

@ -26,6 +26,9 @@ float MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT =
DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
float LOW_PRIORITY_THREAD_CORE_COEFFICIENT =
DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
float CHUNKCACHE_PRIORITY_THREAD_CORE_COEFFICIENT =
DEFAULT_CHUNKCACHE_PRIORITY_THREAD_CORE_COEFFICIENT;
int CPU_NUM = DEFAULT_CPU_NUM;
int64_t EXEC_EVAL_EXPR_BATCH_SIZE = DEFAULT_EXEC_EVAL_EXPR_BATCH_SIZE;
@ -61,6 +64,13 @@ SetLowPriorityThreadCoreCoefficient(const float coefficient) {
LOW_PRIORITY_THREAD_CORE_COEFFICIENT);
}
// Sets the sizing coefficient for the chunk-cache thread pool: the pool's
// thread count is (CPU cores * coefficient), per the config doc for
// common.threadCoreCoefficient.chunkCache. Logs the applied value.
void
SetChunkCacheThreadCoreCoefficient(const float coefficient) {
    CHUNKCACHE_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient;
    LOG_INFO("set chunk cache thread pool core coefficient: {}",
             CHUNKCACHE_PRIORITY_THREAD_CORE_COEFFICIENT);
}
void
SetDefaultExecEvalExprBatchSize(int64_t val) {
EXEC_EVAL_EXPR_BATCH_SIZE = val;

View File

@ -27,6 +27,7 @@ extern int64_t FILE_SLICE_SIZE;
extern float HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
extern float MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
extern float LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
extern float CHUNKCACHE_PRIORITY_THREAD_CORE_COEFFICIENT;
extern int CPU_NUM;
extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE;
extern int64_t JSON_KEY_STATS_COMMIT_INTERVAL;
@ -46,6 +47,9 @@ SetMiddlePriorityThreadCoreCoefficient(const float coefficient);
void
SetLowPriorityThreadCoreCoefficient(const float coefficient);
void
SetChunkCacheThreadCoreCoefficient(const float coefficient);
void
SetCpuNum(const int core);

View File

@ -59,6 +59,7 @@ const int64_t DEFAULT_FIELD_MAX_MEMORY_LIMIT = 128 << 20; // bytes
const float DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = 10.0;
const float DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = 5.0;
const float DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT = 1.0;
const float DEFAULT_CHUNKCACHE_PRIORITY_THREAD_CORE_COEFFICIENT = 10.0;
const int64_t DEFAULT_INDEX_FILE_SLICE_SIZE = 16 << 20; // bytes

View File

@ -57,6 +57,14 @@ InitLowPriorityThreadCoreCoefficient(const float value) {
value);
}
// One-time initializer for the chunk-cache thread-pool coefficient.
// std::call_once guarantees the setter runs at most once per process even
// if this C-exported init is invoked repeatedly.
// NOTE(review): flag7 is declared outside this hunk — confirm it is not
// shared with another Init* function's call_once.
void
InitChunkCacheThreadCoreCoefficient(const float value) {
    std::call_once(
        flag7,
        [](float value) { milvus::SetChunkCacheThreadCoreCoefficient(value); },
        value);
}
void
InitCpuNum(const int value) {
std::call_once(

View File

@ -36,6 +36,9 @@ InitMiddlePriorityThreadCoreCoefficient(const float);
void
InitLowPriorityThreadCoreCoefficient(const float);
void
InitChunkCacheThreadCoreCoefficient(const float);
void
InitDefaultExprEvalBatchSize(int64_t val);

View File

@ -54,6 +54,7 @@
#include "storage/Util.h"
#include "storage/ThreadPools.h"
#include "storage/MmapManager.h"
#include <future>
namespace milvus::segcore {
@ -997,7 +998,8 @@ ChunkedSegmentSealedImpl::get_vector(FieldId field_id,
std::vector<std::future<
std::tuple<std::string, std::shared_ptr<ChunkedColumnBase>>>>
futures;
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
auto& pool =
ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::CHUNKCACHE);
for (const auto& iter : path_to_column) {
const auto& data_path = iter.first;
auto column = std::dynamic_pointer_cast<ChunkedColumnBase>(
@ -1763,7 +1765,6 @@ ChunkedSegmentSealedImpl::bulk_subscript(FieldId field_id,
.count();
monitor::internal_core_get_vector_latency.Observe(get_vector_cost /
1000);
return vector;
}
}

View File

@ -25,9 +25,11 @@ std::shared_mutex ThreadPools::mutex_;
// Shuts down every pool registered in thread_pool_map, logging before and
// after each pool's ShutDown() call.
// NOTE(review): the duplicated LOG_INFO pairs below appear to be the
// removed/added halves of a diff hunk whose +/- markers were stripped
// (the "{}"-placeholder forms with static_cast<int> are the intended,
// post-change lines) — reconcile against the upstream commit before
// treating this span as compilable source.
void
ThreadPools::ShutDown() {
    for (auto& itr : thread_pool_map) {
        LOG_INFO("Start shutting down threadPool with priority:", itr.first);
        LOG_INFO("Start shutting down threadPool with priority:{}",
                 static_cast<int>(itr.first));
        itr.second->ShutDown();
        LOG_INFO("Finish shutting down threadPool with priority:", itr.first);
        LOG_INFO("Finish shutting down threadPool with priority:{}",
                 static_cast<int>(itr.first));
    }
}
@ -46,6 +48,9 @@ ThreadPools::GetThreadPool(milvus::ThreadPoolPriority priority) {
case milvus::ThreadPoolPriority::MIDDLE:
coefficient = MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
break;
case milvus::ThreadPoolPriority::CHUNKCACHE:
coefficient = CHUNKCACHE_PRIORITY_THREAD_CORE_COEFFICIENT;
break;
default:
coefficient = LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
break;
@ -65,6 +70,7 @@ ThreadPools::ResizeThreadPool(milvus::ThreadPoolPriority priority,
LOG_ERROR("Failed to resize threadPool, size:{}", size);
return;
}
std::unique_lock<std::shared_mutex> lock(mutex_);
auto iter = thread_pool_map.find(priority);
if (iter == thread_pool_map.end()) {

View File

@ -18,7 +18,6 @@
#define MILVUS_THREADPOOLS_H
#include "ThreadPool.h"
#include "common/Common.h"
namespace milvus {
@ -26,6 +25,7 @@ enum ThreadPoolPriority {
HIGH = 0,
MIDDLE = 1,
LOW = 2,
CHUNKCACHE = 3,
};
class ThreadPools {
@ -36,20 +36,17 @@ class ThreadPools {
static void
ResizeThreadPool(ThreadPoolPriority priority, float ratio);
~ThreadPools() {
static void
ShutDown();
}
private:
void
ShutDown();
static std::map<ThreadPoolPriority, std::string>
name_map() {
static std::map<ThreadPoolPriority, std::string> name_map = {
{HIGH, "HIGH_SEGC_POOL"},
{MIDDLE, "MIDD_SEGC_POOL"},
{LOW, "LOW_SEGC_POOL"}};
{LOW, "LOW_SEGC_POOL"},
{CHUNKCACHE, "CHUNKCACHE_SEGC_POOL"}};
return name_map;
}

View File

@ -123,3 +123,8 @@ ResizeTheadPool(int64_t priority, float ratio) {
milvus::ThreadPools::ResizeThreadPool(
static_cast<milvus::ThreadPoolPriority>(priority), ratio);
}
// C-exported shutdown hook: stops all segcore thread pools. Invoked from
// the Go side via cgo (see the Go ShutDownThreadPools wrapper in this
// commit).
void
ShutDownThreadPools() {
    milvus::ThreadPools::ShutDown();
}

View File

@ -39,6 +39,9 @@ CleanRemoteChunkManagerSingleton();
void
ResizeTheadPool(int64_t priority, float ratio);
void
ShutDownThreadPools();
#ifdef __cplusplus
};
#endif

View File

@ -198,10 +198,20 @@ func ResizeHighPriorityPool(evt *config.Event) {
}
}
// ResizeChunkCachePool is the dynamic-config callback for
// common.threadCoreCoefficient.chunkCache: when the key changes, it
// resizes the segcore chunk-cache thread pool through cgo.
func ResizeChunkCachePool(evt *config.Event) {
	if evt.HasUpdated {
		pt := paramtable.Get()
		newRatio := pt.CommonCfg.ChunkCacheThreadCoreCoefficient.GetAsFloat()
		// 3 == milvus::ThreadPoolPriority::CHUNKCACHE (ThreadPools.h enum).
		C.ResizeTheadPool(C.int64_t(3), C.float(newRatio))
	}
}
// RegisterSegcoreConfigWatcher subscribes the config handlers that resize
// segcore thread pools when their coefficient keys change at runtime
// (high-priority pool and the new chunk-cache pool).
func (node *QueryNode) RegisterSegcoreConfigWatcher() {
	pt := paramtable.Get()
	pt.Watch(pt.CommonCfg.HighPriorityThreadCoreCoefficient.Key,
		config.NewHandler("common.threadCoreCoefficient.highPriority", ResizeHighPriorityPool))
	pt.Watch(pt.CommonCfg.ChunkCacheThreadCoreCoefficient.Key,
		config.NewHandler("common.threadCoreCoefficient.chunkCache", ResizeChunkCachePool))
}
// InitSegcore set init params of segCore, such as chunckRows, SIMD type...
@ -238,6 +248,8 @@ func (node *QueryNode) InitSegcore() error {
C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient)
cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat())
C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient)
cChunkCacheThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.ChunkCacheThreadCoreCoefficient.GetAsFloat())
C.InitChunkCacheThreadCoreCoefficient(cChunkCacheThreadCoreCoefficient)
node.RegisterSegcoreConfigWatcher()
cCPUNum := C.int(hardware.GetCPUNum())

View File

@ -239,6 +239,11 @@ func CleanGlogManager() {
C.SegcoreCloseGlog()
}
// ShutDownThreadPools stops all segcore-side thread pools via the
// C-exported ShutDownThreadPools, then logs completion.
func ShutDownThreadPools() {
	C.ShutDownThreadPools()
	log.Info("ShutDownThreadPools")
}
// HandleCStatus deals with the error returned from CGO
func HandleCStatus(status *C.CStatus, extraInfo string) error {
if status.error_code == 0 {

View File

@ -44,6 +44,7 @@ const (
DefaultHighPriorityThreadCoreCoefficient = 10
DefaultMiddlePriorityThreadCoreCoefficient = 5
DefaultLowPriorityThreadCoreCoefficient = 1
DefaultChunkCacheThreadCoreCoefficient = 10
DefaultSessionTTL = 30 // s
DefaultSessionRetryTimes = 30
@ -225,6 +226,7 @@ type commonConfig struct {
HighPriorityThreadCoreCoefficient ParamItem `refreshable:"true"`
MiddlePriorityThreadCoreCoefficient ParamItem `refreshable:"true"`
LowPriorityThreadCoreCoefficient ParamItem `refreshable:"true"`
ChunkCacheThreadCoreCoefficient ParamItem `refreshable:"true"`
EnableMaterializedView ParamItem `refreshable:"false"`
BuildIndexThreadPoolRatio ParamItem `refreshable:"false"`
MaxDegree ParamItem `refreshable:"true"`
@ -646,6 +648,16 @@ This configuration is only used by querynode and indexnode, it selects CPU instr
}
p.LowPriorityThreadCoreCoefficient.Init(base.mgr)
p.ChunkCacheThreadCoreCoefficient = ParamItem{
Key: "common.threadCoreCoefficient.chunkCache",
Version: "2.0.0",
DefaultValue: strconv.Itoa(DefaultChunkCacheThreadCoreCoefficient),
Doc: "This parameter specify how many times the number of threads " +
"is the number of cores in chunk cache pool",
Export: true,
}
p.ChunkCacheThreadCoreCoefficient.Init(base.mgr)
p.BuildIndexThreadPoolRatio = ParamItem{
Key: "common.buildIndexThreadPoolRatio",
Version: "2.4.0",