enhance: make segcore params effective without restarting milvus (#43231)

#43230

Signed-off-by: luzhang <luzhang@zilliz.com>
Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
zhagnlu 2025-08-08 10:33:48 +08:00 committed by GitHub
parent 1561a4ae8c
commit c04d678ad4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 857 additions and 176 deletions

View File

@ -19,83 +19,104 @@
namespace milvus { namespace milvus {
int64_t FILE_SLICE_SIZE = DEFAULT_INDEX_FILE_SLICE_SIZE;
float HIGH_PRIORITY_THREAD_CORE_COEFFICIENT =
DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
float MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT =
DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
float LOW_PRIORITY_THREAD_CORE_COEFFICIENT =
DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
int CPU_NUM = DEFAULT_CPU_NUM; int CPU_NUM = DEFAULT_CPU_NUM;
int64_t EXEC_EVAL_EXPR_BATCH_SIZE = DEFAULT_EXEC_EVAL_EXPR_BATCH_SIZE;
bool OPTIMIZE_EXPR_ENABLED = DEFAULT_OPTIMIZE_EXPR_ENABLED;
int64_t JSON_KEY_STATS_COMMIT_INTERVAL = DEFAULT_JSON_KEY_STATS_COMMIT_INTERVAL; std::atomic<int64_t> FILE_SLICE_SIZE(DEFAULT_INDEX_FILE_SLICE_SIZE);
bool GROWING_JSON_KEY_STATS_ENABLED = DEFAULT_GROWING_JSON_KEY_STATS_ENABLED; std::atomic<float> HIGH_PRIORITY_THREAD_CORE_COEFFICIENT(
bool CONFIG_PARAM_TYPE_CHECK_ENABLED = DEFAULT_CONFIG_PARAM_TYPE_CHECK_ENABLED; DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT);
std::atomic<float> MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT(
DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT);
std::atomic<float> LOW_PRIORITY_THREAD_CORE_COEFFICIENT(
DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT);
std::atomic<int64_t> EXEC_EVAL_EXPR_BATCH_SIZE(
DEFAULT_EXEC_EVAL_EXPR_BATCH_SIZE);
std::atomic<bool> OPTIMIZE_EXPR_ENABLED(DEFAULT_OPTIMIZE_EXPR_ENABLED);
std::atomic<int64_t> JSON_KEY_STATS_COMMIT_INTERVAL(
DEFAULT_JSON_KEY_STATS_COMMIT_INTERVAL);
std::atomic<bool> GROWING_JSON_KEY_STATS_ENABLED(
DEFAULT_GROWING_JSON_KEY_STATS_ENABLED);
std::atomic<bool> CONFIG_PARAM_TYPE_CHECK_ENABLED(
DEFAULT_CONFIG_PARAM_TYPE_CHECK_ENABLED);
void
InitCpuNum(const int num) {
CPU_NUM = num;
LOG_INFO("set cpu num: {}", CPU_NUM);
}
void void
SetIndexSliceSize(const int64_t size) { SetIndexSliceSize(const int64_t size) {
FILE_SLICE_SIZE = size << 20; FILE_SLICE_SIZE.store(size << 20);
LOG_INFO("set config index slice size (byte): {}", FILE_SLICE_SIZE); LOG_INFO("set config index slice size (byte): {}", FILE_SLICE_SIZE.load());
} }
void void
SetHighPriorityThreadCoreCoefficient(const float coefficient) { SetHighPriorityThreadCoreCoefficient(const float coefficient) {
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient; HIGH_PRIORITY_THREAD_CORE_COEFFICIENT.store(coefficient);
LOG_INFO("set high priority thread pool core coefficient: {}", LOG_INFO("set high priority thread pool core coefficient: {}",
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT); HIGH_PRIORITY_THREAD_CORE_COEFFICIENT.load());
} }
void void
SetMiddlePriorityThreadCoreCoefficient(const float coefficient) { SetMiddlePriorityThreadCoreCoefficient(const float coefficient) {
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient; MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT.store(coefficient);
LOG_INFO("set middle priority thread pool core coefficient: {}", LOG_INFO("set middle priority thread pool core coefficient: {}",
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT); MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT.load());
} }
void void
SetLowPriorityThreadCoreCoefficient(const float coefficient) { SetLowPriorityThreadCoreCoefficient(const float coefficient) {
LOW_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient; LOW_PRIORITY_THREAD_CORE_COEFFICIENT.store(coefficient);
LOG_INFO("set low priority thread pool core coefficient: {}", LOG_INFO("set low priority thread pool core coefficient: {}",
LOW_PRIORITY_THREAD_CORE_COEFFICIENT); LOW_PRIORITY_THREAD_CORE_COEFFICIENT.load());
} }
void void
SetDefaultExecEvalExprBatchSize(int64_t val) { SetDefaultExecEvalExprBatchSize(int64_t val) {
EXEC_EVAL_EXPR_BATCH_SIZE = val; EXEC_EVAL_EXPR_BATCH_SIZE.store(val);
LOG_INFO("set default expr eval batch size: {}", EXEC_EVAL_EXPR_BATCH_SIZE); LOG_INFO("set default expr eval batch size: {}",
} EXEC_EVAL_EXPR_BATCH_SIZE.load());
void
SetCpuNum(const int num) {
CPU_NUM = num;
} }
void void
SetDefaultOptimizeExprEnable(bool val) { SetDefaultOptimizeExprEnable(bool val) {
OPTIMIZE_EXPR_ENABLED = val; OPTIMIZE_EXPR_ENABLED.store(val);
LOG_INFO("set default optimize expr enabled: {}", OPTIMIZE_EXPR_ENABLED); LOG_INFO("set default optimize expr enabled: {}",
OPTIMIZE_EXPR_ENABLED.load());
} }
void void
SetDefaultJSONKeyStatsCommitInterval(int64_t val) { SetDefaultJSONKeyStatsCommitInterval(int64_t val) {
JSON_KEY_STATS_COMMIT_INTERVAL = val; JSON_KEY_STATS_COMMIT_INTERVAL.store(val);
LOG_INFO("set default json key Stats commit interval: {}", LOG_INFO("set default json key Stats commit interval: {}",
JSON_KEY_STATS_COMMIT_INTERVAL); JSON_KEY_STATS_COMMIT_INTERVAL.load());
} }
void void
SetDefaultGrowingJSONKeyStatsEnable(bool val) { SetDefaultGrowingJSONKeyStatsEnable(bool val) {
GROWING_JSON_KEY_STATS_ENABLED = val; GROWING_JSON_KEY_STATS_ENABLED.store(val);
LOG_INFO("set default growing json key index enable: {}", LOG_INFO("set default growing json key index enable: {}",
GROWING_JSON_KEY_STATS_ENABLED); GROWING_JSON_KEY_STATS_ENABLED.load());
} }
void void
SetDefaultConfigParamTypeCheck(bool val) { SetDefaultConfigParamTypeCheck(bool val) {
CONFIG_PARAM_TYPE_CHECK_ENABLED = val; CONFIG_PARAM_TYPE_CHECK_ENABLED.store(val);
LOG_INFO("set default config param type check enabled: {}", LOG_INFO("set default config param type check enabled: {}",
CONFIG_PARAM_TYPE_CHECK_ENABLED); CONFIG_PARAM_TYPE_CHECK_ENABLED.load());
} }
void
SetLogLevel(const char* level) {
LOG_INFO("set log level: {}", level);
if (strcmp(level, "debug") == 0) {
FLAGS_v = 5;
} else if (strcmp(level, "trace") == 0) {
FLAGS_v = 6;
} else {
FLAGS_v = 4;
}
}
} // namespace milvus } // namespace milvus

View File

@ -16,6 +16,7 @@
#pragma once #pragma once
#include <atomic>
#include <iostream> #include <iostream>
#include <utility> #include <utility>
#include <variant> #include <variant>
@ -23,16 +24,20 @@
namespace milvus { namespace milvus {
extern int64_t FILE_SLICE_SIZE;
extern float HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
extern float MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
extern float LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
extern int CPU_NUM; extern int CPU_NUM;
extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE;
extern int64_t JSON_KEY_STATS_COMMIT_INTERVAL; extern std::atomic<int64_t> FILE_SLICE_SIZE;
extern bool OPTIMIZE_EXPR_ENABLED; extern std::atomic<float> HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
extern bool GROWING_JSON_KEY_STATS_ENABLED; extern std::atomic<float> MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
extern bool CONFIG_PARAM_TYPE_CHECK_ENABLED; extern std::atomic<float> LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
extern std::atomic<int64_t> EXEC_EVAL_EXPR_BATCH_SIZE;
extern std::atomic<int64_t> JSON_KEY_STATS_COMMIT_INTERVAL;
extern std::atomic<bool> OPTIMIZE_EXPR_ENABLED;
extern std::atomic<bool> GROWING_JSON_KEY_STATS_ENABLED;
extern std::atomic<bool> CONFIG_PARAM_TYPE_CHECK_ENABLED;
void
InitCpuNum(const int core);
void void
SetIndexSliceSize(const int64_t size); SetIndexSliceSize(const int64_t size);
@ -46,9 +51,6 @@ SetMiddlePriorityThreadCoreCoefficient(const float coefficient);
void void
SetLowPriorityThreadCoreCoefficient(const float coefficient); SetLowPriorityThreadCoreCoefficient(const float coefficient);
void
SetCpuNum(const int core);
void void
SetDefaultExecEvalExprBatchSize(int64_t val); SetDefaultExecEvalExprBatchSize(int64_t val);
@ -64,6 +66,9 @@ SetDefaultGrowingJSONKeyStatsEnable(bool val);
void void
SetDefaultConfigParamTypeCheck(bool val); SetDefaultConfigParamTypeCheck(bool val);
void
SetLogLevel(const char* level);
struct BufferView { struct BufferView {
struct Element { struct Element {
const char* data_; const char* data_;

View File

@ -90,13 +90,17 @@ Disassemble(BinarySet& binarySet) {
std::vector<std::string> slice_key_list; std::vector<std::string> slice_key_list;
for (auto& kv : binarySet.binary_map_) { for (auto& kv : binarySet.binary_map_) {
if (kv.second->size > FILE_SLICE_SIZE) { if (kv.second->size > FILE_SLICE_SIZE.load()) {
slice_key_list.push_back(kv.first); slice_key_list.push_back(kv.first);
} }
} }
for (auto& key : slice_key_list) { for (auto& key : slice_key_list) {
Config slice_i; Config slice_i;
Slice(key, binarySet.Erase(key), FILE_SLICE_SIZE, binarySet, slice_i); Slice(key,
binarySet.Erase(key),
FILE_SLICE_SIZE.load(),
binarySet,
slice_i);
meta_info[META].emplace_back(slice_i); meta_info[META].emplace_back(slice_i);
} }
if (!slice_key_list.empty()) { if (!slice_key_list.empty()) {

View File

@ -19,88 +19,62 @@
#include "common/Common.h" #include "common/Common.h"
#include "common/Tracer.h" #include "common/Tracer.h"
std::once_flag flag1, flag2, flag3, flag4, flag5, flag6, flag7, flag8, flag9,
flag10;
std::once_flag traceFlag; std::once_flag traceFlag;
std::once_flag cpuNumFlag;
void
InitIndexSliceSize(const int64_t size) {
std::call_once(
flag1, [](int64_t size) { milvus::SetIndexSliceSize(size); }, size);
}
void
InitHighPriorityThreadCoreCoefficient(const float value) {
std::call_once(
flag2,
[](float value) {
milvus::SetHighPriorityThreadCoreCoefficient(value);
},
value);
}
void
InitMiddlePriorityThreadCoreCoefficient(const float value) {
std::call_once(
flag4,
[](float value) {
milvus::SetMiddlePriorityThreadCoreCoefficient(value);
},
value);
}
void
InitLowPriorityThreadCoreCoefficient(const float value) {
std::call_once(
flag5,
[](float value) { milvus::SetLowPriorityThreadCoreCoefficient(value); },
value);
}
void void
InitCpuNum(const int value) { InitCpuNum(const int value) {
std::call_once( std::call_once(cpuNumFlag, [value]() { milvus::InitCpuNum(value); });
flag3, [](int value) { milvus::SetCpuNum(value); }, value);
} }
void void
InitDefaultExprEvalBatchSize(int64_t val) { SetIndexSliceSize(const int64_t size) {
std::call_once( milvus::SetIndexSliceSize(size);
flag6,
[](int val) { milvus::SetDefaultExecEvalExprBatchSize(val); },
val);
} }
void void
InitDefaultOptimizeExprEnable(bool val) { SetHighPriorityThreadCoreCoefficient(const float value) {
std::call_once( milvus::SetHighPriorityThreadCoreCoefficient(value);
flag7,
[](bool val) { milvus::SetDefaultOptimizeExprEnable(val); },
val);
} }
void void
InitDefaultJSONKeyStatsCommitInterval(int64_t val) { SetMiddlePriorityThreadCoreCoefficient(const float value) {
std::call_once( milvus::SetMiddlePriorityThreadCoreCoefficient(value);
flag8,
[](int val) { milvus::SetDefaultJSONKeyStatsCommitInterval(val); },
val);
} }
void void
InitDefaultGrowingJSONKeyStatsEnable(bool val) { SetLowPriorityThreadCoreCoefficient(const float value) {
std::call_once( milvus::SetLowPriorityThreadCoreCoefficient(value);
flag9,
[](bool val) { milvus::SetDefaultGrowingJSONKeyStatsEnable(val); },
val);
} }
void void
InitDefaultConfigParamTypeCheck(bool val) { SetDefaultExprEvalBatchSize(int64_t val) {
std::call_once( milvus::SetDefaultExecEvalExprBatchSize(val);
flag10, }
[](bool val) { milvus::SetDefaultConfigParamTypeCheck(val); },
val); void
SetDefaultOptimizeExprEnable(bool val) {
milvus::SetDefaultOptimizeExprEnable(val);
}
void
SetDefaultJSONKeyStatsCommitInterval(int64_t val) {
milvus::SetDefaultJSONKeyStatsCommitInterval(val);
}
void
SetDefaultGrowingJSONKeyStatsEnable(bool val) {
milvus::SetDefaultGrowingJSONKeyStatsEnable(val);
}
void
SetDefaultConfigParamTypeCheck(bool val) {
milvus::SetDefaultConfigParamTypeCheck(val);
}
void
SetLogLevel(const char* level) {
milvus::SetLogLevel(level);
} }
void void

View File

@ -24,42 +24,46 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include "common/type_c.h" #include "common/type_c.h"
void
InitIndexSliceSize(const int64_t);
void
InitHighPriorityThreadCoreCoefficient(const float);
void
InitMiddlePriorityThreadCoreCoefficient(const float);
void
InitLowPriorityThreadCoreCoefficient(const float);
void
InitDefaultExprEvalBatchSize(int64_t val);
void void
InitCpuNum(const int); InitCpuNum(const int);
void
SetIndexSliceSize(const int64_t);
void
SetHighPriorityThreadCoreCoefficient(const float);
void
SetMiddlePriorityThreadCoreCoefficient(const float);
void
SetLowPriorityThreadCoreCoefficient(const float);
void
SetDefaultExprEvalBatchSize(int64_t val);
void
SetDefaultOptimizeExprEnable(bool val);
void
SetDefaultJSONKeyStatsCommitInterval(int64_t val);
void
SetDefaultGrowingJSONKeyStatsEnable(bool val);
void
SetDefaultConfigParamTypeCheck(bool val);
// dynamic update segcore params
void
SetLogLevel(const char* level);
void void
InitTrace(CTraceConfig* config); InitTrace(CTraceConfig* config);
void void
SetTrace(CTraceConfig* config); SetTrace(CTraceConfig* config);
void
InitDefaultOptimizeExprEnable(bool val);
void
InitDefaultJSONKeyStatsCommitInterval(int64_t val);
void
InitDefaultGrowingJSONKeyStatsEnable(bool val);
void
InitDefaultConfigParamTypeCheck(bool val);
#ifdef __cplusplus #ifdef __cplusplus
}; };
#endif #endif

View File

@ -133,7 +133,7 @@ class QueryConfig : public MemConfig {
int64_t int64_t
get_expr_batch_size() const { get_expr_batch_size() const {
return BaseConfig::Get<int64_t>(kExprEvalBatchSize, return BaseConfig::Get<int64_t>(kExprEvalBatchSize,
EXEC_EVAL_EXPR_BATCH_SIZE); EXEC_EVAL_EXPR_BATCH_SIZE.load());
} }
}; };

View File

@ -68,7 +68,7 @@ CompileExpressions(const std::vector<expr::TypedExprPtr>& sources,
enable_constant_folding)); enable_constant_folding));
} }
if (OPTIMIZE_EXPR_ENABLED) { if (OPTIMIZE_EXPR_ENABLED.load()) {
OptimizeCompiledExprs(context, exprs); OptimizeCompiledExprs(context, exprs);
} }

View File

@ -180,7 +180,7 @@ LoadWithStrategy(const std::vector<std::string>& remote_files,
futures.reserve(blocks.size()); futures.reserve(blocks.size());
auto reader_memory_limit = std::max<int64_t>( auto reader_memory_limit = std::max<int64_t>(
memory_limit / blocks.size(), FILE_SLICE_SIZE); memory_limit / blocks.size(), FILE_SLICE_SIZE.load());
for (const auto& block : blocks) { for (const auto& block : blocks) {
futures.emplace_back(pool.Submit([block, futures.emplace_back(pool.Submit([block,

View File

@ -128,7 +128,8 @@ DiskFileManagerImpl::AddFileInternal(
local_file_offsets.clear(); local_file_offsets.clear();
} }
auto batch_size = std::min(FILE_SLICE_SIZE, int64_t(fileSize) - offset); auto batch_size =
std::min(FILE_SLICE_SIZE.load(), int64_t(fileSize) - offset);
batch_remote_files.emplace_back(get_remote_path(fileName, slice_num)); batch_remote_files.emplace_back(get_remote_path(fileName, slice_num));
remote_file_sizes.emplace_back(batch_size); remote_file_sizes.emplace_back(batch_size);
local_file_offsets.emplace_back(offset); local_file_offsets.emplace_back(offset);
@ -258,7 +259,7 @@ DiskFileManagerImpl::CacheIndexToDiskInternal(
batch_remote_files.reserve(slices.second.size()); batch_remote_files.reserve(slices.second.size());
uint64_t max_parallel_degree = uint64_t max_parallel_degree =
uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE.load());
{ {
auto file_writer = storage::FileWriter(local_index_file_name); auto file_writer = storage::FileWriter(local_index_file_name);

View File

@ -41,13 +41,13 @@ ThreadPools::GetThreadPool(milvus::ThreadPoolPriority priority) {
float coefficient = 1.0; float coefficient = 1.0;
switch (priority) { switch (priority) {
case milvus::ThreadPoolPriority::HIGH: case milvus::ThreadPoolPriority::HIGH:
coefficient = HIGH_PRIORITY_THREAD_CORE_COEFFICIENT; coefficient = HIGH_PRIORITY_THREAD_CORE_COEFFICIENT.load();
break; break;
case milvus::ThreadPoolPriority::MIDDLE: case milvus::ThreadPoolPriority::MIDDLE:
coefficient = MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT; coefficient = MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT.load();
break; break;
default: default:
coefficient = LOW_PRIORITY_THREAD_CORE_COEFFICIENT; coefficient = LOW_PRIORITY_THREAD_CORE_COEFFICIENT.load();
break; break;
} }
std::string name = name_map()[priority]; std::string name = name_map()[priority];

View File

@ -679,7 +679,7 @@ TEST_P(TaskTest, Test_reorder) {
auto query_context = std::make_shared<milvus::exec::QueryContext>( auto query_context = std::make_shared<milvus::exec::QueryContext>(
DEAFULT_QUERY_ID, segment_.get(), 100000, MAX_TIMESTAMP); DEAFULT_QUERY_ID, segment_.get(), 100000, MAX_TIMESTAMP);
ExecContext context(query_context.get()); ExecContext context(query_context.get());
OPTIMIZE_EXPR_ENABLED = false; OPTIMIZE_EXPR_ENABLED.store(false);
auto exprs = auto exprs =
milvus::exec::CompileExpressions({expr3}, &context, {}, false); milvus::exec::CompileExpressions({expr3}, &context, {}, false);
EXPECT_EQ(exprs.size(), 1); EXPECT_EQ(exprs.size(), 1);
@ -690,7 +690,7 @@ TEST_P(TaskTest, Test_reorder) {
std::cout << phy_expr->ToString() << std::endl; std::cout << phy_expr->ToString() << std::endl;
auto reorder = phy_expr->GetReorder(); auto reorder = phy_expr->GetReorder();
EXPECT_EQ(reorder.size(), 0); EXPECT_EQ(reorder.size(), 0);
OPTIMIZE_EXPR_ENABLED = true; OPTIMIZE_EXPR_ENABLED.store(true, std::memory_order_release);
} }
} }

View File

@ -4234,7 +4234,7 @@ TEST_P(ExprTest, TestMutiInConvert) {
auto plan = auto plan =
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, expr); std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, expr);
auto final1 = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP); auto final1 = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP);
OPTIMIZE_EXPR_ENABLED = false; OPTIMIZE_EXPR_ENABLED.store(false);
auto final2 = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP); auto final2 = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP);
EXPECT_EQ(final1.size(), final2.size()); EXPECT_EQ(final1.size(), final2.size());
for (auto i = 0; i < final1.size(); i++) { for (auto i = 0; i < final1.size(); i++) {
@ -5071,7 +5071,7 @@ TEST_P(ExprTest, TestGrowingSegmentGetBatchSize) {
8192, 10240, 20480, 30720, 40960, 102400, 204800, 307200}; 8192, 10240, 20480, 30720, 40960, 102400, 204800, 307200};
for (const auto& batch_size : test_batch_size) { for (const auto& batch_size : test_batch_size) {
EXEC_EVAL_EXPR_BATCH_SIZE = batch_size; EXEC_EVAL_EXPR_BATCH_SIZE.store(batch_size);
auto plan = plan::PlanFragment(plan_node); auto plan = plan::PlanFragment(plan_node);
auto query_context = std::make_shared<milvus::exec::QueryContext>( auto query_context = std::make_shared<milvus::exec::QueryContext>(
"query id", seg.get(), N, MAX_TIMESTAMP); "query id", seg.get(), N, MAX_TIMESTAMP);

View File

@ -50,15 +50,15 @@ func InitSegcore() {
// override segcore index slice size // override segcore index slice size
cIndexSliceSize := C.int64_t(paramtable.Get().CommonCfg.IndexSliceSize.GetAsInt64()) cIndexSliceSize := C.int64_t(paramtable.Get().CommonCfg.IndexSliceSize.GetAsInt64())
C.InitIndexSliceSize(cIndexSliceSize) C.SetIndexSliceSize(cIndexSliceSize)
// set up thread pool for different priorities // set up thread pool for different priorities
cHighPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat()) cHighPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat())
C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient) C.SetHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient)
cMiddlePriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat()) cMiddlePriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat())
C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient) C.SetMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient)
cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat()) cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat())
C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient) C.SetLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient)
cCPUNum := C.int(hardware.GetCPUNum()) cCPUNum := C.int(hardware.GetCPUNum())
C.InitCpuNum(cCPUNum) C.InitCpuNum(cCPUNum)

View File

@ -250,15 +250,15 @@ func (node *QueryNode) InitSegcore() error {
// override segcore index slice size // override segcore index slice size
cIndexSliceSize := C.int64_t(paramtable.Get().CommonCfg.IndexSliceSize.GetAsInt64()) cIndexSliceSize := C.int64_t(paramtable.Get().CommonCfg.IndexSliceSize.GetAsInt64())
C.InitIndexSliceSize(cIndexSliceSize) C.SetIndexSliceSize(cIndexSliceSize)
// set up thread pool for different priorities // set up thread pool for different priorities
cHighPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat()) cHighPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat())
C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient) C.SetHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient)
cMiddlePriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat()) cMiddlePriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat())
C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient) C.SetMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient)
cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat()) cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat())
C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient) C.SetLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient)
node.RegisterSegcoreConfigWatcher() node.RegisterSegcoreConfigWatcher()
@ -274,22 +274,22 @@ func (node *QueryNode) InitSegcore() error {
C.SegcoreSetKnowhereBuildThreadPoolNum(cKnowhereBuildPoolSize) C.SegcoreSetKnowhereBuildThreadPoolNum(cKnowhereBuildPoolSize)
cExprBatchSize := C.int64_t(paramtable.Get().QueryNodeCfg.ExprEvalBatchSize.GetAsInt64()) cExprBatchSize := C.int64_t(paramtable.Get().QueryNodeCfg.ExprEvalBatchSize.GetAsInt64())
C.InitDefaultExprEvalBatchSize(cExprBatchSize) C.SetDefaultExprEvalBatchSize(cExprBatchSize)
cOptimizeExprEnabled := C.bool(paramtable.Get().CommonCfg.EnabledOptimizeExpr.GetAsBool()) cOptimizeExprEnabled := C.bool(paramtable.Get().CommonCfg.EnabledOptimizeExpr.GetAsBool())
C.InitDefaultOptimizeExprEnable(cOptimizeExprEnabled) C.SetDefaultOptimizeExprEnable(cOptimizeExprEnabled)
cJSONKeyStatsCommitInterval := C.int64_t(paramtable.Get().QueryNodeCfg.JSONKeyStatsCommitInterval.GetAsInt64()) cJSONKeyStatsCommitInterval := C.int64_t(paramtable.Get().QueryNodeCfg.JSONKeyStatsCommitInterval.GetAsInt64())
C.InitDefaultJSONKeyStatsCommitInterval(cJSONKeyStatsCommitInterval) C.SetDefaultJSONKeyStatsCommitInterval(cJSONKeyStatsCommitInterval)
cGrowingJSONKeyStatsEnabled := C.bool(paramtable.Get().CommonCfg.EnabledGrowingSegmentJSONKeyStats.GetAsBool()) cGrowingJSONKeyStatsEnabled := C.bool(paramtable.Get().CommonCfg.EnabledGrowingSegmentJSONKeyStats.GetAsBool())
C.InitDefaultGrowingJSONKeyStatsEnable(cGrowingJSONKeyStatsEnabled) C.SetDefaultGrowingJSONKeyStatsEnable(cGrowingJSONKeyStatsEnabled)
cGpuMemoryPoolInitSize := C.uint32_t(paramtable.Get().GpuConfig.InitSize.GetAsUint32()) cGpuMemoryPoolInitSize := C.uint32_t(paramtable.Get().GpuConfig.InitSize.GetAsUint32())
cGpuMemoryPoolMaxSize := C.uint32_t(paramtable.Get().GpuConfig.MaxSize.GetAsUint32()) cGpuMemoryPoolMaxSize := C.uint32_t(paramtable.Get().GpuConfig.MaxSize.GetAsUint32())
C.SegcoreSetKnowhereGpuMemoryPoolSize(cGpuMemoryPoolInitSize, cGpuMemoryPoolMaxSize) C.SegcoreSetKnowhereGpuMemoryPoolSize(cGpuMemoryPoolInitSize, cGpuMemoryPoolMaxSize)
cEnableConfigParamTypeCheck := C.bool(paramtable.Get().CommonCfg.EnableConfigParamTypeCheck.GetAsBool()) cEnableConfigParamTypeCheck := C.bool(paramtable.Get().CommonCfg.EnableConfigParamTypeCheck.GetAsBool())
C.InitDefaultConfigParamTypeCheck(cEnableConfigParamTypeCheck) C.SetDefaultConfigParamTypeCheck(cEnableConfigParamTypeCheck)
localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole)
initcore.InitLocalChunkManager(localDataRootPath) initcore.InitLocalChunkManager(localDataRootPath)

View File

@ -138,10 +138,9 @@ func Test_autoIndexConfig_panicIfNotValid(t *testing.T) {
Formatter: GetBuildParamFormatter(FloatVectorDefaultMetricType, "autoIndex.params.build"), Formatter: GetBuildParamFormatter(FloatVectorDefaultMetricType, "autoIndex.params.build"),
}, },
} }
p.IndexParams.Init(mgr)
assert.Panics(t, func() { assert.Panics(t, func() {
p.IndexParams.GetAsJSONMap() p.IndexParams.Init(mgr)
}) })
}) })
} }

View File

@ -143,7 +143,14 @@ func (bt *BaseTable) init() {
ret = strings.ReplaceAll(ret, ".", "") ret = strings.ReplaceAll(ret, ".", "")
return ret return ret
} }
bt.mgr, _ = config.Init()
var err error
bt.mgr, err = config.Init()
if err != nil {
log.Error("failed to initialize config manager", zap.Error(err))
panic(err)
}
if !bt.config.skipEnv { if !bt.config.skipEnv {
err := bt.mgr.AddSource(config.NewEnvSource(formatter)) err := bt.mgr.AddSource(config.NewEnvSource(formatter))
if err != nil { if err != nil {

View File

@ -17,6 +17,7 @@
package paramtable package paramtable
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"path" "path"
@ -530,6 +531,14 @@ This configuration is only used by querynode and indexnode, it selects CPU instr
Export: true, Export: true,
} }
p.IndexSliceSize.Init(base.mgr) p.IndexSliceSize.Init(base.mgr)
p.IndexSliceSize.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error {
size, err := strconv.Atoi(newValue)
if err != nil {
return err
}
UpdateIndexSliceSize(size)
return nil
})
p.EnableMaterializedView = ParamItem{ p.EnableMaterializedView = ParamItem{
Key: "common.materializedView.enabled", Key: "common.materializedView.enabled",
@ -631,6 +640,14 @@ This configuration is only used by querynode and indexnode, it selects CPU instr
Export: true, Export: true,
} }
p.HighPriorityThreadCoreCoefficient.Init(base.mgr) p.HighPriorityThreadCoreCoefficient.Init(base.mgr)
p.HighPriorityThreadCoreCoefficient.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error {
coefficient, err := strconv.ParseFloat(newValue, 64)
if err != nil {
return err
}
UpdateHighPriorityThreadCoreCoefficient(coefficient)
return nil
})
p.MiddlePriorityThreadCoreCoefficient = ParamItem{ p.MiddlePriorityThreadCoreCoefficient = ParamItem{
Key: "common.threadCoreCoefficient.middlePriority", Key: "common.threadCoreCoefficient.middlePriority",
@ -641,6 +658,14 @@ This configuration is only used by querynode and indexnode, it selects CPU instr
Export: true, Export: true,
} }
p.MiddlePriorityThreadCoreCoefficient.Init(base.mgr) p.MiddlePriorityThreadCoreCoefficient.Init(base.mgr)
p.MiddlePriorityThreadCoreCoefficient.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error {
coefficient, err := strconv.ParseFloat(newValue, 64)
if err != nil {
return err
}
UpdateMiddlePriorityThreadCoreCoefficient(coefficient)
return nil
})
p.LowPriorityThreadCoreCoefficient = ParamItem{ p.LowPriorityThreadCoreCoefficient = ParamItem{
Key: "common.threadCoreCoefficient.lowPriority", Key: "common.threadCoreCoefficient.lowPriority",
@ -651,6 +676,14 @@ This configuration is only used by querynode and indexnode, it selects CPU instr
Export: true, Export: true,
} }
p.LowPriorityThreadCoreCoefficient.Init(base.mgr) p.LowPriorityThreadCoreCoefficient.Init(base.mgr)
p.LowPriorityThreadCoreCoefficient.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error {
coefficient, err := strconv.ParseFloat(newValue, 64)
if err != nil {
return err
}
UpdateLowPriorityThreadCoreCoefficient(coefficient)
return nil
})
p.DiskWriteMode = ParamItem{ p.DiskWriteMode = ParamItem{
Key: "common.diskWriteMode", Key: "common.diskWriteMode",
@ -1064,6 +1097,15 @@ This helps Milvus-CDC synchronize incremental data`,
Export: true, Export: true,
} }
p.EnabledOptimizeExpr.Init(base.mgr) p.EnabledOptimizeExpr.Init(base.mgr)
p.EnabledOptimizeExpr.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error {
enable, err := strconv.ParseBool(newValue)
if err != nil {
return err
}
UpdateDefaultOptimizeExprEnable(enable)
return nil
})
p.EnabledJSONKeyStats = ParamItem{ p.EnabledJSONKeyStats = ParamItem{
Key: "common.enabledJSONKeyStats", Key: "common.enabledJSONKeyStats",
Version: "2.5.5", Version: "2.5.5",
@ -1081,6 +1123,14 @@ This helps Milvus-CDC synchronize incremental data`,
Export: true, Export: true,
} }
p.EnabledGrowingSegmentJSONKeyStats.Init(base.mgr) p.EnabledGrowingSegmentJSONKeyStats.Init(base.mgr)
p.EnabledGrowingSegmentJSONKeyStats.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error {
enable, err := strconv.ParseBool(newValue)
if err != nil {
return err
}
UpdateDefaultGrowingJSONKeyStatsEnable(enable)
return nil
})
p.EnableConfigParamTypeCheck = ParamItem{ p.EnableConfigParamTypeCheck = ParamItem{
Key: "common.enableConfigParamTypeCheck", Key: "common.enableConfigParamTypeCheck",
@ -1090,6 +1140,14 @@ This helps Milvus-CDC synchronize incremental data`,
Export: true, Export: true,
} }
p.EnableConfigParamTypeCheck.Init(base.mgr) p.EnableConfigParamTypeCheck.Init(base.mgr)
p.EnableConfigParamTypeCheck.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error {
enable, err := strconv.ParseBool(newValue)
if err != nil {
return err
}
UpdateDefaultConfigParamTypeCheck(enable)
return nil
})
} }
type gpuConfig struct { type gpuConfig struct {
@ -1330,6 +1388,9 @@ It is recommended to use debug level under test and development environments, an
Export: true, Export: true,
} }
l.Level.Init(base.mgr) l.Level.Init(base.mgr)
l.Level.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error {
return UpdateLogLevel(newValue)
})
l.RootPath = ParamItem{ l.RootPath = ParamItem{
Key: "log.file.rootPath", Key: "log.file.rootPath",
@ -3223,6 +3284,15 @@ If set to 0, time based eviction is disabled.`,
} }
p.CacheCellUnaccessedSurvivalTime.Init(base.mgr) p.CacheCellUnaccessedSurvivalTime.Init(base.mgr)
p.EnableDisk = ParamItem{
Key: "queryNode.enableDisk",
Version: "2.2.0",
DefaultValue: "false",
Doc: "enable querynode load disk index, and search on disk index",
Export: true,
}
p.EnableDisk.Init(base.mgr)
p.KnowhereThreadPoolSize = ParamItem{ p.KnowhereThreadPoolSize = ParamItem{
Key: "queryNode.segcore.knowhereThreadPoolNumRatio", Key: "queryNode.segcore.knowhereThreadPoolNumRatio",
Version: "2.0.0", Version: "2.0.0",
@ -3671,15 +3741,6 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
} }
p.CPURatio.Init(base.mgr) p.CPURatio.Init(base.mgr)
p.EnableDisk = ParamItem{
Key: "queryNode.enableDisk",
Version: "2.2.0",
DefaultValue: "false",
Doc: "enable querynode load disk index, and search on disk index",
Export: true,
}
p.EnableDisk.Init(base.mgr)
p.IndexOffsetCacheEnabled = ParamItem{ p.IndexOffsetCacheEnabled = ParamItem{
Key: "queryNode.indexOffsetCacheEnabled", Key: "queryNode.indexOffsetCacheEnabled",
Version: "2.5.0", Version: "2.5.0",
@ -3882,6 +3943,14 @@ user-task-polling:
Doc: "expr eval batch size for getnext interface", Doc: "expr eval batch size for getnext interface",
} }
p.ExprEvalBatchSize.Init(base.mgr) p.ExprEvalBatchSize.Init(base.mgr)
p.ExprEvalBatchSize.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error {
size, err := strconv.Atoi(newValue)
if err != nil {
return err
}
UpdateDefaultExprEvalBatchSize(size)
return nil
})
p.JSONKeyStatsCommitInterval = ParamItem{ p.JSONKeyStatsCommitInterval = ParamItem{
Key: "queryNode.segcore.jsonKeyStatsCommitInterval", Key: "queryNode.segcore.jsonKeyStatsCommitInterval",
@ -3891,6 +3960,14 @@ user-task-polling:
Export: true, Export: true,
} }
p.JSONKeyStatsCommitInterval.Init(base.mgr) p.JSONKeyStatsCommitInterval.Init(base.mgr)
p.JSONKeyStatsCommitInterval.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error {
interval, err := strconv.Atoi(newValue)
if err != nil {
return err
}
UpdateDefaultJSONKeyStatsCommitInterval(interval)
return nil
})
p.CleanExcludeSegInterval = ParamItem{ p.CleanExcludeSegInterval = ParamItem{
Key: "queryCoord.cleanExcludeSegmentInterval", Key: "queryCoord.cleanExcludeSegmentInterval",

View File

@ -17,6 +17,7 @@
package paramtable package paramtable
import ( import (
"context"
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
@ -24,11 +25,15 @@ import (
"github.com/samber/lo" "github.com/samber/lo"
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/config" "github.com/milvus-io/milvus/pkg/v2/config"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/funcutil"
) )
type ParamChangeCallback func(ctx context.Context, key, oldValue, newValue string) error
type ParamItem struct { type ParamItem struct {
Key string // which should be named as "A.B.C" Key string // which should be named as "A.B.C"
Version string Version string
@ -45,6 +50,9 @@ type ParamItem struct {
// for unittest. // for unittest.
tempValue atomic.Pointer[string] tempValue atomic.Pointer[string]
callback ParamChangeCallback
lastValue atomic.Pointer[string]
} }
func (pi *ParamItem) Init(manager *config.Manager) { func (pi *ParamItem) Init(manager *config.Manager) {
@ -52,6 +60,58 @@ func (pi *ParamItem) Init(manager *config.Manager) {
if pi.Forbidden { if pi.Forbidden {
pi.manager.ForbidUpdate(pi.Key) pi.manager.ForbidUpdate(pi.Key)
} }
currentValue := pi.GetValue()
pi.lastValue.Store(&currentValue)
if manager != nil && manager.Dispatcher != nil {
handler := config.NewHandler(pi.Key, func(event *config.Event) {
if event.Key == strings.ToLower(pi.Key) && event.EventType == config.UpdateType {
pi.handleConfigChange(event)
}
})
manager.Dispatcher.Register(pi.Key, handler)
}
}
func (pi *ParamItem) RegisterCallback(callback ParamChangeCallback) {
pi.callback = callback
}
func (pi *ParamItem) UnregisterCallback() {
pi.callback = nil
}
func (pi *ParamItem) handleConfigChange(event *config.Event) {
if pi.callback == nil {
return
}
oldValue := ""
if lastVal := pi.lastValue.Load(); lastVal != nil {
oldValue = *lastVal
}
newValue := event.Value
if oldValue == newValue {
return
}
if err := pi.callback(context.Background(), pi.Key, oldValue, newValue); err != nil {
log.Error("param change callback failed",
zap.String("key", pi.Key),
zap.String("oldValue", oldValue),
zap.String("newValue", newValue),
zap.Error(err))
} else {
log.Info("param value changed",
zap.String("key", pi.Key),
zap.String("oldValue", oldValue),
zap.String("newValue", newValue))
}
pi.lastValue.Store(&newValue)
} }
// Get original value with error // Get original value with error

View File

@ -0,0 +1,458 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package paramtable
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/v2/config"
)
func TestParamItem_RegisterCallback(t *testing.T) {
manager := config.NewManager()
param := &ParamItem{
Key: "test.param",
DefaultValue: "default",
}
param.Init(manager)
callback := func(ctx context.Context, key, oldValue, newValue string) error {
return nil
}
param.RegisterCallback(callback)
assert.NotNil(t, param.callback)
param.UnregisterCallback()
assert.Nil(t, param.callback)
}
func TestParamItem_CallbackTriggered(t *testing.T) {
manager := config.NewManager()
param := &ParamItem{
Key: "test.param",
DefaultValue: "default",
}
param.Init(manager)
var callbackCalled bool
var capturedOldValue, capturedNewValue string
var capturedKey string
callback := func(ctx context.Context, key, oldValue, newValue string) error {
callbackCalled = true
capturedKey = key
capturedOldValue = oldValue
capturedNewValue = newValue
return nil
}
param.RegisterCallback(callback)
assert.Equal(t, "default", param.GetValue())
event := &config.Event{
EventType: config.UpdateType,
Key: "test.param",
Value: "new-value",
}
param.handleConfigChange(event)
assert.True(t, callbackCalled)
assert.Equal(t, "test.param", capturedKey)
assert.Equal(t, "default", capturedOldValue)
assert.Equal(t, "new-value", capturedNewValue)
}
func TestParamItem_CallbackErrorHandling(t *testing.T) {
manager := config.NewManager()
param := &ParamItem{
Key: "test.param",
DefaultValue: "default",
}
param.Init(manager)
var callbackCalled bool
errorCallback := func(ctx context.Context, key, oldValue, newValue string) error {
callbackCalled = true
return fmt.Errorf("callback error")
}
param.RegisterCallback(errorCallback)
event := &config.Event{
EventType: config.UpdateType,
Key: "test.param",
Value: "new-value",
}
param.handleConfigChange(event)
assert.True(t, callbackCalled)
}
func TestParamItem_NoValueChange(t *testing.T) {
manager := config.NewManager()
param := &ParamItem{
Key: "test.param",
DefaultValue: "default",
}
param.Init(manager)
var callbackCalled bool
callback := func(ctx context.Context, key, oldValue, newValue string) error {
callbackCalled = true
return nil
}
param.RegisterCallback(callback)
event := &config.Event{
EventType: config.UpdateType,
Key: "test.param",
Value: "default",
}
param.handleConfigChange(event)
assert.False(t, callbackCalled)
}
func TestParamItem_CallbackWithContext(t *testing.T) {
manager := config.NewManager()
param := &ParamItem{
Key: "test.param",
DefaultValue: "default",
}
param.Init(manager)
var callbackCalled bool
callback := func(ctx context.Context, key, oldValue, newValue string) error {
callbackCalled = true
assert.NotNil(t, ctx)
return nil
}
param.RegisterCallback(callback)
event := &config.Event{
EventType: config.UpdateType,
Key: "test.param",
Value: "new-value",
}
param.handleConfigChange(event)
assert.True(t, callbackCalled)
}
func TestParamItem_CallbackCleanup(t *testing.T) {
manager := config.NewManager()
param := &ParamItem{
Key: "test.param",
DefaultValue: "default",
}
param.Init(manager)
var callbackCalled bool
callback := func(ctx context.Context, key, oldValue, newValue string) error {
callbackCalled = true
return nil
}
param.RegisterCallback(callback)
param.UnregisterCallback()
event := &config.Event{
EventType: config.UpdateType,
Key: "test.param",
Value: "new-value",
}
param.handleConfigChange(event)
assert.False(t, callbackCalled)
}
func TestParamItem_ManagerNil(t *testing.T) {
param := &ParamItem{
Key: "test.param",
DefaultValue: "default",
}
callback := func(ctx context.Context, key, oldValue, newValue string) error {
return nil
}
param.RegisterCallback(callback)
assert.NotNil(t, param.callback)
param.UnregisterCallback()
assert.Nil(t, param.callback)
}
func TestParamItem_DispatcherNil(t *testing.T) {
manager := config.NewManager()
manager.Dispatcher = nil
param := &ParamItem{
Key: "test.param",
DefaultValue: "default",
}
param.Init(manager)
callback := func(ctx context.Context, key, oldValue, newValue string) error {
return nil
}
param.RegisterCallback(callback)
assert.NotNil(t, param.callback)
}
func TestParamItem_StressTest(t *testing.T) {
manager := config.NewManager()
param := &ParamItem{
Key: "test.param",
DefaultValue: "default",
}
param.Init(manager)
var callbackCount int
callback := func(ctx context.Context, key, oldValue, newValue string) error {
callbackCount++
return nil
}
param.RegisterCallback(callback)
for i := 0; i < 10; i++ {
event := &config.Event{
EventType: config.UpdateType,
Key: "test.param",
Value: fmt.Sprintf("value-%d", i),
}
param.handleConfigChange(event)
}
assert.Equal(t, 10, callbackCount)
}
func TestParamItem_FormatterWithCallback(t *testing.T) {
manager := config.NewManager()
param := &ParamItem{
Key: "test.param",
DefaultValue: "100",
Formatter: func(v string) string {
if v == "100" {
return "formatted-100"
}
return v
},
}
param.Init(manager)
var callbackCalled bool
var callbackNewValue string
callback := func(ctx context.Context, key, oldValue, newValue string) error {
callbackCalled = true
callbackNewValue = newValue
return nil
}
param.RegisterCallback(callback)
event := &config.Event{
EventType: config.UpdateType,
Key: "test.param",
Value: "200",
}
param.handleConfigChange(event)
assert.True(t, callbackCalled)
assert.Equal(t, "200", callbackNewValue)
}
func TestParamItem_InitWithManager(t *testing.T) {
manager := config.NewManager()
param := &ParamItem{
Key: "test.param",
DefaultValue: "default",
}
assert.Nil(t, param.manager)
param.Init(manager)
assert.NotNil(t, param.manager)
assert.Equal(t, manager, param.manager)
assert.NotNil(t, manager.Dispatcher)
}
func TestParamItem_LastValueTracking_Change(t *testing.T) {
manager := config.NewManager()
param := &ParamItem{
Key: "test.param",
DefaultValue: "default",
}
param.Init(manager)
initialValue := param.GetValue()
assert.Equal(t, "default", initialValue)
lastVal := param.lastValue.Load()
assert.NotNil(t, lastVal)
assert.Equal(t, "default", *lastVal)
var callbackNewValue string
callback := func(ctx context.Context, key, oldValue, newValue string) error {
callbackNewValue = newValue
return nil
}
param.RegisterCallback(callback)
event := &config.Event{
EventType: config.UpdateType,
Key: "test.param",
Value: "new-value",
}
param.handleConfigChange(event)
lastVal = param.lastValue.Load()
assert.NotNil(t, lastVal)
assert.Equal(t, "new-value", *lastVal)
assert.Equal(t, "new-value", callbackNewValue)
}
func TestParamItem_LastValueTracking_NoChange(t *testing.T) {
manager := config.NewManager()
param := &ParamItem{
Key: "test.param",
DefaultValue: "default",
}
param.Init(manager)
initialValue := param.GetValue()
assert.Equal(t, "default", initialValue)
lastVal := param.lastValue.Load()
assert.NotNil(t, lastVal)
assert.Equal(t, "default", *lastVal)
event := &config.Event{
EventType: config.UpdateType,
Key: "test.param",
Value: "new-value",
}
param.handleConfigChange(event)
lastVal = param.lastValue.Load()
assert.NotNil(t, lastVal)
assert.Equal(t, "default", *lastVal)
}
func TestParamItem_EventTypeFiltering(t *testing.T) {
manager := config.NewManager()
param := &ParamItem{
Key: "test.param",
DefaultValue: "default",
}
param.Init(manager)
var callbackCalled bool
callback := func(ctx context.Context, key, oldValue, newValue string) error {
callbackCalled = true
return nil
}
param.RegisterCallback(callback)
createEvent := &config.Event{
EventType: config.CreateType,
Key: "test.param",
Value: "new-value",
}
param.handleConfigChange(createEvent)
assert.True(t, callbackCalled)
callbackCalled = false
updateEvent := &config.Event{
EventType: config.UpdateType,
Key: "test.param",
Value: "another-value",
}
param.handleConfigChange(updateEvent)
assert.True(t, callbackCalled)
}
func TestParamItem_NilCallback(t *testing.T) {
manager := config.NewManager()
param := &ParamItem{
Key: "test.param",
DefaultValue: "default",
}
param.Init(manager)
event := &config.Event{
EventType: config.UpdateType,
Key: "test.param",
Value: "new-value",
}
assert.NotPanics(t, func() {
param.handleConfigChange(event)
})
}
func TestParamItem_CallbackNilManager(t *testing.T) {
param := &ParamItem{
Key: "test.param",
DefaultValue: "default",
}
event := &config.Event{
EventType: config.UpdateType,
Key: "test.param",
Value: "new-value",
}
assert.NotPanics(t, func() {
param.handleConfigChange(event)
})
}

View File

@ -0,0 +1,71 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package paramtable
/*
#cgo pkg-config: milvus_core
#include <stdlib.h>
#include <stdint.h>
#include <stdbool.h>
#include "common/init_c.h"
*/
import "C"
import "unsafe"
func UpdateLogLevel(level string) error {
cvalue := C.CString(level)
C.SetLogLevel(cvalue)
C.free(unsafe.Pointer(cvalue))
return nil
}
func UpdateIndexSliceSize(size int) {
C.SetIndexSliceSize(C.int64_t(size))
}
func UpdateHighPriorityThreadCoreCoefficient(coefficient float64) {
C.SetHighPriorityThreadCoreCoefficient(C.float(coefficient))
}
func UpdateMiddlePriorityThreadCoreCoefficient(coefficient float64) {
C.SetMiddlePriorityThreadCoreCoefficient(C.float(coefficient))
}
func UpdateLowPriorityThreadCoreCoefficient(coefficient float64) {
C.SetLowPriorityThreadCoreCoefficient(C.float(coefficient))
}
func UpdateDefaultExprEvalBatchSize(size int) {
C.SetDefaultExprEvalBatchSize(C.int64_t(size))
}
func UpdateDefaultOptimizeExprEnable(enable bool) {
C.SetDefaultOptimizeExprEnable(C.bool(enable))
}
func UpdateDefaultJSONKeyStatsCommitInterval(interval int) {
C.SetDefaultJSONKeyStatsCommitInterval(C.int64_t(interval))
}
func UpdateDefaultGrowingJSONKeyStatsEnable(enable bool) {
C.SetDefaultGrowingJSONKeyStatsEnable(C.bool(enable))
}
func UpdateDefaultConfigParamTypeCheck(enable bool) {
C.SetDefaultConfigParamTypeCheck(C.bool(enable))
}