diff --git a/internal/core/src/common/Common.cpp b/internal/core/src/common/Common.cpp index 74169d97d5..9d68be97cb 100644 --- a/internal/core/src/common/Common.cpp +++ b/internal/core/src/common/Common.cpp @@ -19,83 +19,104 @@ namespace milvus { -int64_t FILE_SLICE_SIZE = DEFAULT_INDEX_FILE_SLICE_SIZE; -float HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = - DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT; -float MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = - DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT; -float LOW_PRIORITY_THREAD_CORE_COEFFICIENT = - DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT; int CPU_NUM = DEFAULT_CPU_NUM; -int64_t EXEC_EVAL_EXPR_BATCH_SIZE = DEFAULT_EXEC_EVAL_EXPR_BATCH_SIZE; -bool OPTIMIZE_EXPR_ENABLED = DEFAULT_OPTIMIZE_EXPR_ENABLED; -int64_t JSON_KEY_STATS_COMMIT_INTERVAL = DEFAULT_JSON_KEY_STATS_COMMIT_INTERVAL; -bool GROWING_JSON_KEY_STATS_ENABLED = DEFAULT_GROWING_JSON_KEY_STATS_ENABLED; -bool CONFIG_PARAM_TYPE_CHECK_ENABLED = DEFAULT_CONFIG_PARAM_TYPE_CHECK_ENABLED; +std::atomic FILE_SLICE_SIZE(DEFAULT_INDEX_FILE_SLICE_SIZE); +std::atomic HIGH_PRIORITY_THREAD_CORE_COEFFICIENT( + DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT); +std::atomic MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT( + DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT); +std::atomic LOW_PRIORITY_THREAD_CORE_COEFFICIENT( + DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT); +std::atomic EXEC_EVAL_EXPR_BATCH_SIZE( + DEFAULT_EXEC_EVAL_EXPR_BATCH_SIZE); +std::atomic OPTIMIZE_EXPR_ENABLED(DEFAULT_OPTIMIZE_EXPR_ENABLED); + +std::atomic JSON_KEY_STATS_COMMIT_INTERVAL( + DEFAULT_JSON_KEY_STATS_COMMIT_INTERVAL); +std::atomic GROWING_JSON_KEY_STATS_ENABLED( + DEFAULT_GROWING_JSON_KEY_STATS_ENABLED); +std::atomic CONFIG_PARAM_TYPE_CHECK_ENABLED( + DEFAULT_CONFIG_PARAM_TYPE_CHECK_ENABLED); + +void +InitCpuNum(const int num) { + CPU_NUM = num; + LOG_INFO("set cpu num: {}", CPU_NUM); +} void SetIndexSliceSize(const int64_t size) { - FILE_SLICE_SIZE = size << 20; - LOG_INFO("set config index slice size (byte): {}", FILE_SLICE_SIZE); + FILE_SLICE_SIZE.store(size << 20); + LOG_INFO("set config index slice size (byte): {}", FILE_SLICE_SIZE.load()); } void SetHighPriorityThreadCoreCoefficient(const float coefficient) { - HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient; + HIGH_PRIORITY_THREAD_CORE_COEFFICIENT.store(coefficient); LOG_INFO("set high priority thread pool core coefficient: {}", - HIGH_PRIORITY_THREAD_CORE_COEFFICIENT); + HIGH_PRIORITY_THREAD_CORE_COEFFICIENT.load()); } void SetMiddlePriorityThreadCoreCoefficient(const float coefficient) { - MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient; + MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT.store(coefficient); LOG_INFO("set middle priority thread pool core coefficient: {}", - MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT); + MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT.load()); } void SetLowPriorityThreadCoreCoefficient(const float coefficient) { - LOW_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient; + LOW_PRIORITY_THREAD_CORE_COEFFICIENT.store(coefficient); LOG_INFO("set low priority thread pool core coefficient: {}", - LOW_PRIORITY_THREAD_CORE_COEFFICIENT); + LOW_PRIORITY_THREAD_CORE_COEFFICIENT.load()); } void SetDefaultExecEvalExprBatchSize(int64_t val) { - EXEC_EVAL_EXPR_BATCH_SIZE = val; - LOG_INFO("set default expr eval batch size: {}", EXEC_EVAL_EXPR_BATCH_SIZE); -} - -void -SetCpuNum(const int num) { - CPU_NUM = num; + EXEC_EVAL_EXPR_BATCH_SIZE.store(val); + LOG_INFO("set default expr eval batch size: {}", + EXEC_EVAL_EXPR_BATCH_SIZE.load()); } void SetDefaultOptimizeExprEnable(bool val) { - OPTIMIZE_EXPR_ENABLED = val; - LOG_INFO("set default optimize expr enabled: {}", OPTIMIZE_EXPR_ENABLED); + OPTIMIZE_EXPR_ENABLED.store(val); + LOG_INFO("set default optimize expr enabled: {}", + OPTIMIZE_EXPR_ENABLED.load()); } void SetDefaultJSONKeyStatsCommitInterval(int64_t val) { - JSON_KEY_STATS_COMMIT_INTERVAL = val; + JSON_KEY_STATS_COMMIT_INTERVAL.store(val); LOG_INFO("set default json key Stats commit interval: {}", - JSON_KEY_STATS_COMMIT_INTERVAL); + JSON_KEY_STATS_COMMIT_INTERVAL.load()); } void SetDefaultGrowingJSONKeyStatsEnable(bool val) { - GROWING_JSON_KEY_STATS_ENABLED = val; + GROWING_JSON_KEY_STATS_ENABLED.store(val); LOG_INFO("set default growing json key index enable: {}", - GROWING_JSON_KEY_STATS_ENABLED); + GROWING_JSON_KEY_STATS_ENABLED.load()); } void SetDefaultConfigParamTypeCheck(bool val) { - CONFIG_PARAM_TYPE_CHECK_ENABLED = val; + CONFIG_PARAM_TYPE_CHECK_ENABLED.store(val); LOG_INFO("set default config param type check enabled: {}", - CONFIG_PARAM_TYPE_CHECK_ENABLED); + CONFIG_PARAM_TYPE_CHECK_ENABLED.load()); } + +void +SetLogLevel(const char* level) { + LOG_INFO("set log level: {}", level); + if (strcmp(level, "debug") == 0) { + FLAGS_v = 5; + } else if (strcmp(level, "trace") == 0) { + FLAGS_v = 6; + } else { + FLAGS_v = 4; + } +} + } // namespace milvus diff --git a/internal/core/src/common/Common.h b/internal/core/src/common/Common.h index b3501f06f4..698fbfda42 100644 --- a/internal/core/src/common/Common.h +++ b/internal/core/src/common/Common.h @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -23,16 +24,20 @@ namespace milvus { -extern int64_t FILE_SLICE_SIZE; -extern float HIGH_PRIORITY_THREAD_CORE_COEFFICIENT; -extern float MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT; -extern float LOW_PRIORITY_THREAD_CORE_COEFFICIENT; extern int CPU_NUM; -extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE; -extern int64_t JSON_KEY_STATS_COMMIT_INTERVAL; -extern bool OPTIMIZE_EXPR_ENABLED; -extern bool GROWING_JSON_KEY_STATS_ENABLED; -extern bool CONFIG_PARAM_TYPE_CHECK_ENABLED; + +extern std::atomic FILE_SLICE_SIZE; +extern std::atomic HIGH_PRIORITY_THREAD_CORE_COEFFICIENT; +extern std::atomic MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT; +extern std::atomic LOW_PRIORITY_THREAD_CORE_COEFFICIENT; +extern std::atomic EXEC_EVAL_EXPR_BATCH_SIZE; +extern std::atomic JSON_KEY_STATS_COMMIT_INTERVAL; +extern std::atomic OPTIMIZE_EXPR_ENABLED; +extern std::atomic GROWING_JSON_KEY_STATS_ENABLED; +extern std::atomic CONFIG_PARAM_TYPE_CHECK_ENABLED; + +void +InitCpuNum(const int core); void SetIndexSliceSize(const int64_t size); @@ -46,9 +51,6 @@ SetMiddlePriorityThreadCoreCoefficient(const float coefficient); void SetLowPriorityThreadCoreCoefficient(const float coefficient); -void -SetCpuNum(const int core); - void SetDefaultExecEvalExprBatchSize(int64_t val); @@ -64,6 +66,9 @@ SetDefaultGrowingJSONKeyStatsEnable(bool val); void SetDefaultConfigParamTypeCheck(bool val); +void +SetLogLevel(const char* level); + struct BufferView { struct Element { const char* data_; diff --git a/internal/core/src/common/Slice.cpp b/internal/core/src/common/Slice.cpp index 362615c9ef..b46e6bcb86 100644 --- a/internal/core/src/common/Slice.cpp +++ b/internal/core/src/common/Slice.cpp @@ -90,13 +90,17 @@ Disassemble(BinarySet& binarySet) { std::vector slice_key_list; for (auto& kv : binarySet.binary_map_) { - if (kv.second->size > FILE_SLICE_SIZE) { + if (kv.second->size > FILE_SLICE_SIZE.load()) { slice_key_list.push_back(kv.first); } } for (auto& key : slice_key_list) { Config slice_i; - Slice(key, binarySet.Erase(key), FILE_SLICE_SIZE, binarySet, slice_i); + Slice(key, + binarySet.Erase(key), + FILE_SLICE_SIZE.load(), + binarySet, + slice_i); meta_info[META].emplace_back(slice_i); } if (!slice_key_list.empty()) { diff --git a/internal/core/src/common/init_c.cpp b/internal/core/src/common/init_c.cpp index e0db9f3353..f948a6fc43 100644 --- a/internal/core/src/common/init_c.cpp +++ b/internal/core/src/common/init_c.cpp @@ -19,88 +19,62 @@ #include "common/Common.h" #include "common/Tracer.h" -std::once_flag flag1, flag2, flag3, flag4, flag5, flag6, flag7, flag8, flag9, - flag10; std::once_flag traceFlag; - -void -InitIndexSliceSize(const int64_t size) { - std::call_once( - flag1, [](int64_t size) { milvus::SetIndexSliceSize(size); }, size); -} - -void -InitHighPriorityThreadCoreCoefficient(const float value) { - std::call_once( - flag2, - [](float value) { - milvus::SetHighPriorityThreadCoreCoefficient(value); - }, - value); -} - -void -InitMiddlePriorityThreadCoreCoefficient(const float value) { - std::call_once( - flag4, - [](float value) { - milvus::SetMiddlePriorityThreadCoreCoefficient(value); - }, - value); -} - -void -InitLowPriorityThreadCoreCoefficient(const float value) { - std::call_once( - flag5, - [](float value) { milvus::SetLowPriorityThreadCoreCoefficient(value); }, - value); -} +std::once_flag cpuNumFlag; void InitCpuNum(const int value) { - std::call_once( - flag3, [](int value) { milvus::SetCpuNum(value); }, value); + std::call_once(cpuNumFlag, [value]() { milvus::InitCpuNum(value); }); } void -InitDefaultExprEvalBatchSize(int64_t val) { - std::call_once( - flag6, - [](int val) { milvus::SetDefaultExecEvalExprBatchSize(val); }, - val); +SetIndexSliceSize(const int64_t size) { + milvus::SetIndexSliceSize(size); } void -InitDefaultOptimizeExprEnable(bool val) { - std::call_once( - flag7, - [](bool val) { milvus::SetDefaultOptimizeExprEnable(val); }, - val); +SetHighPriorityThreadCoreCoefficient(const float value) { + milvus::SetHighPriorityThreadCoreCoefficient(value); } void -InitDefaultJSONKeyStatsCommitInterval(int64_t val) { - std::call_once( - flag8, - [](int val) { milvus::SetDefaultJSONKeyStatsCommitInterval(val); }, - val); +SetMiddlePriorityThreadCoreCoefficient(const float value) { + milvus::SetMiddlePriorityThreadCoreCoefficient(value); } void -InitDefaultGrowingJSONKeyStatsEnable(bool val) { - std::call_once( - flag9, - [](bool val) { milvus::SetDefaultGrowingJSONKeyStatsEnable(val); }, - val); +SetLowPriorityThreadCoreCoefficient(const float value) { + milvus::SetLowPriorityThreadCoreCoefficient(value); } void -InitDefaultConfigParamTypeCheck(bool val) { - std::call_once( - flag10, - [](bool val) { milvus::SetDefaultConfigParamTypeCheck(val); }, - val); +SetDefaultExprEvalBatchSize(int64_t val) { + milvus::SetDefaultExecEvalExprBatchSize(val); +} + +void +SetDefaultOptimizeExprEnable(bool val) { + milvus::SetDefaultOptimizeExprEnable(val); +} + +void +SetDefaultJSONKeyStatsCommitInterval(int64_t val) { + milvus::SetDefaultJSONKeyStatsCommitInterval(val); +} + +void +SetDefaultGrowingJSONKeyStatsEnable(bool val) { + milvus::SetDefaultGrowingJSONKeyStatsEnable(val); +} + +void +SetDefaultConfigParamTypeCheck(bool val) { + milvus::SetDefaultConfigParamTypeCheck(val); +} + +void +SetLogLevel(const char* level) { + milvus::SetLogLevel(level); } void diff --git a/internal/core/src/common/init_c.h b/internal/core/src/common/init_c.h index c4223d34d7..48c9b45c97 100644 --- a/internal/core/src/common/init_c.h +++ b/internal/core/src/common/init_c.h @@ -24,42 +24,46 @@ extern "C" { #include #include "common/type_c.h" -void -InitIndexSliceSize(const int64_t); - -void -InitHighPriorityThreadCoreCoefficient(const float); - -void -InitMiddlePriorityThreadCoreCoefficient(const float); - -void -InitLowPriorityThreadCoreCoefficient(const float); - -void -InitDefaultExprEvalBatchSize(int64_t val); - void InitCpuNum(const int); +void +SetIndexSliceSize(const int64_t); + +void +SetHighPriorityThreadCoreCoefficient(const float); + +void +SetMiddlePriorityThreadCoreCoefficient(const float); + +void +SetLowPriorityThreadCoreCoefficient(const float); + +void +SetDefaultExprEvalBatchSize(int64_t val); + +void +SetDefaultOptimizeExprEnable(bool val); + +void +SetDefaultJSONKeyStatsCommitInterval(int64_t val); + +void +SetDefaultGrowingJSONKeyStatsEnable(bool val); + +void +SetDefaultConfigParamTypeCheck(bool val); + +// dynamic update segcore params +void +SetLogLevel(const char* level); + void InitTrace(CTraceConfig* config); void SetTrace(CTraceConfig* config); -void -InitDefaultOptimizeExprEnable(bool val); - -void -InitDefaultJSONKeyStatsCommitInterval(int64_t val); - -void -InitDefaultGrowingJSONKeyStatsEnable(bool val); - -void -InitDefaultConfigParamTypeCheck(bool val); - #ifdef __cplusplus }; #endif diff --git a/internal/core/src/exec/QueryContext.h b/internal/core/src/exec/QueryContext.h index f97a19dd4d..15a022525a 100644 --- a/internal/core/src/exec/QueryContext.h +++ b/internal/core/src/exec/QueryContext.h @@ -133,7 +133,7 @@ class QueryConfig : public MemConfig { int64_t get_expr_batch_size() const { return BaseConfig::Get(kExprEvalBatchSize, - EXEC_EVAL_EXPR_BATCH_SIZE); + EXEC_EVAL_EXPR_BATCH_SIZE.load()); } }; diff --git a/internal/core/src/exec/expression/Expr.cpp b/internal/core/src/exec/expression/Expr.cpp index ce1ac57fdc..99220c6413 100644 --- a/internal/core/src/exec/expression/Expr.cpp +++ b/internal/core/src/exec/expression/Expr.cpp @@ -68,7 +68,7 @@ CompileExpressions(const std::vector& sources, enable_constant_folding)); } - if (OPTIMIZE_EXPR_ENABLED) { + if (OPTIMIZE_EXPR_ENABLED.load()) { OptimizeCompiledExprs(context, exprs); } diff --git a/internal/core/src/segcore/memory_planner.cpp b/internal/core/src/segcore/memory_planner.cpp index 039445c66c..448b90d5b6 100644 --- a/internal/core/src/segcore/memory_planner.cpp +++ b/internal/core/src/segcore/memory_planner.cpp @@ -180,7 +180,7 @@ LoadWithStrategy(const std::vector& remote_files, futures.reserve(blocks.size()); auto reader_memory_limit = std::max( - memory_limit / blocks.size(), FILE_SLICE_SIZE); + memory_limit / blocks.size(), FILE_SLICE_SIZE.load()); for (const auto& block : blocks) { futures.emplace_back(pool.Submit([block, diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index 6eedc8320c..c658049601 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -128,7 +128,8 @@ DiskFileManagerImpl::AddFileInternal( local_file_offsets.clear(); } - auto batch_size = std::min(FILE_SLICE_SIZE, int64_t(fileSize) - offset); + auto batch_size = + std::min(FILE_SLICE_SIZE.load(), int64_t(fileSize) - offset); batch_remote_files.emplace_back(get_remote_path(fileName, slice_num)); remote_file_sizes.emplace_back(batch_size); local_file_offsets.emplace_back(offset); @@ -258,7 +259,7 @@ DiskFileManagerImpl::CacheIndexToDiskInternal( batch_remote_files.reserve(slices.second.size()); uint64_t max_parallel_degree = - uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); + uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE.load()); { auto file_writer = storage::FileWriter(local_index_file_name); diff --git a/internal/core/src/storage/ThreadPools.cpp b/internal/core/src/storage/ThreadPools.cpp index f0b6513726..81cd78b54a 100644 --- a/internal/core/src/storage/ThreadPools.cpp +++ b/internal/core/src/storage/ThreadPools.cpp @@ -41,13 +41,13 @@ ThreadPools::GetThreadPool(milvus::ThreadPoolPriority priority) { float coefficient = 1.0; switch (priority) { case milvus::ThreadPoolPriority::HIGH: - coefficient = HIGH_PRIORITY_THREAD_CORE_COEFFICIENT; + coefficient = HIGH_PRIORITY_THREAD_CORE_COEFFICIENT.load(); break; case milvus::ThreadPoolPriority::MIDDLE: - coefficient = MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT; + coefficient = MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT.load(); break; default: - coefficient = LOW_PRIORITY_THREAD_CORE_COEFFICIENT; + coefficient = LOW_PRIORITY_THREAD_CORE_COEFFICIENT.load(); break; } std::string name = name_map()[priority]; diff --git a/internal/core/unittest/test_exec.cpp b/internal/core/unittest/test_exec.cpp index a3b200bb85..d18a5677cb 100644 --- a/internal/core/unittest/test_exec.cpp +++ b/internal/core/unittest/test_exec.cpp @@ -679,7 +679,7 @@ TEST_P(TaskTest, Test_reorder) { auto query_context = std::make_shared( DEAFULT_QUERY_ID, segment_.get(), 100000, MAX_TIMESTAMP); ExecContext context(query_context.get()); - OPTIMIZE_EXPR_ENABLED = false; + OPTIMIZE_EXPR_ENABLED.store(false); auto exprs = milvus::exec::CompileExpressions({expr3}, &context, {}, false); EXPECT_EQ(exprs.size(), 1); @@ -690,7 +690,7 @@ TEST_P(TaskTest, Test_reorder) { std::cout << phy_expr->ToString() << std::endl; auto reorder = phy_expr->GetReorder(); EXPECT_EQ(reorder.size(), 0); - OPTIMIZE_EXPR_ENABLED = true; + OPTIMIZE_EXPR_ENABLED.store(true, std::memory_order_release); } } diff --git a/internal/core/unittest/test_expr.cpp b/internal/core/unittest/test_expr.cpp index d1c505d2ef..ab3a053661 100644 --- a/internal/core/unittest/test_expr.cpp +++ b/internal/core/unittest/test_expr.cpp @@ -4234,7 +4234,7 @@ TEST_P(ExprTest, TestMutiInConvert) { auto plan = std::make_shared(DEFAULT_PLANNODE_ID, expr); auto final1 = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP); - OPTIMIZE_EXPR_ENABLED = false; + OPTIMIZE_EXPR_ENABLED.store(false); auto final2 = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP); EXPECT_EQ(final1.size(), final2.size()); for (auto i = 0; i < final1.size(); i++) { @@ -5071,7 +5071,7 @@ TEST_P(ExprTest, TestGrowingSegmentGetBatchSize) { 8192, 10240, 20480, 30720, 40960, 102400, 204800, 307200}; for (const auto& batch_size : test_batch_size) { - EXEC_EVAL_EXPR_BATCH_SIZE = batch_size; + EXEC_EVAL_EXPR_BATCH_SIZE.store(batch_size); auto plan = plan::PlanFragment(plan_node); auto query_context = std::make_shared( "query id", seg.get(), N, MAX_TIMESTAMP); diff --git a/internal/datanode/index/init_segcore.go b/internal/datanode/index/init_segcore.go index fa263507b6..a6e634c9b5 100644 --- a/internal/datanode/index/init_segcore.go +++ b/internal/datanode/index/init_segcore.go @@ -50,15 +50,15 @@ func InitSegcore() { // override segcore index slice size cIndexSliceSize := C.int64_t(paramtable.Get().CommonCfg.IndexSliceSize.GetAsInt64()) - C.InitIndexSliceSize(cIndexSliceSize) + C.SetIndexSliceSize(cIndexSliceSize) // set up thread pool for different priorities cHighPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat()) - C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient) + C.SetHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient) cMiddlePriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat()) - C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient) + C.SetMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient) cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat()) - C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient) + C.SetLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient) cCPUNum := C.int(hardware.GetCPUNum()) C.InitCpuNum(cCPUNum) diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 2ca32fc507..553850c7b8 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -250,15 +250,15 @@ func (node *QueryNode) InitSegcore() error { // override segcore index slice size cIndexSliceSize := C.int64_t(paramtable.Get().CommonCfg.IndexSliceSize.GetAsInt64()) - C.InitIndexSliceSize(cIndexSliceSize) + C.SetIndexSliceSize(cIndexSliceSize) // set up thread pool for different priorities cHighPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat()) - C.InitHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient) + C.SetHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient) cMiddlePriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat()) - C.InitMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient) + C.SetMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient) cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat()) - C.InitLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient) + C.SetLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient) node.RegisterSegcoreConfigWatcher() @@ -274,22 +274,22 @@ func (node *QueryNode) InitSegcore() error { C.SegcoreSetKnowhereBuildThreadPoolNum(cKnowhereBuildPoolSize) cExprBatchSize := C.int64_t(paramtable.Get().QueryNodeCfg.ExprEvalBatchSize.GetAsInt64()) - C.InitDefaultExprEvalBatchSize(cExprBatchSize) + C.SetDefaultExprEvalBatchSize(cExprBatchSize) cOptimizeExprEnabled := C.bool(paramtable.Get().CommonCfg.EnabledOptimizeExpr.GetAsBool()) - C.InitDefaultOptimizeExprEnable(cOptimizeExprEnabled) + C.SetDefaultOptimizeExprEnable(cOptimizeExprEnabled) cJSONKeyStatsCommitInterval := C.int64_t(paramtable.Get().QueryNodeCfg.JSONKeyStatsCommitInterval.GetAsInt64()) - C.InitDefaultJSONKeyStatsCommitInterval(cJSONKeyStatsCommitInterval) + C.SetDefaultJSONKeyStatsCommitInterval(cJSONKeyStatsCommitInterval) cGrowingJSONKeyStatsEnabled := C.bool(paramtable.Get().CommonCfg.EnabledGrowingSegmentJSONKeyStats.GetAsBool()) - C.InitDefaultGrowingJSONKeyStatsEnable(cGrowingJSONKeyStatsEnabled) + C.SetDefaultGrowingJSONKeyStatsEnable(cGrowingJSONKeyStatsEnabled) cGpuMemoryPoolInitSize := C.uint32_t(paramtable.Get().GpuConfig.InitSize.GetAsUint32()) cGpuMemoryPoolMaxSize := C.uint32_t(paramtable.Get().GpuConfig.MaxSize.GetAsUint32()) C.SegcoreSetKnowhereGpuMemoryPoolSize(cGpuMemoryPoolInitSize, cGpuMemoryPoolMaxSize) cEnableConfigParamTypeCheck := C.bool(paramtable.Get().CommonCfg.EnableConfigParamTypeCheck.GetAsBool()) - C.InitDefaultConfigParamTypeCheck(cEnableConfigParamTypeCheck) + C.SetDefaultConfigParamTypeCheck(cEnableConfigParamTypeCheck) localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) initcore.InitLocalChunkManager(localDataRootPath) diff --git a/pkg/util/paramtable/autoindex_param_test.go b/pkg/util/paramtable/autoindex_param_test.go index 425336b563..f625a01770 100644 --- a/pkg/util/paramtable/autoindex_param_test.go +++ b/pkg/util/paramtable/autoindex_param_test.go @@ -138,10 +138,9 @@ func Test_autoIndexConfig_panicIfNotValid(t *testing.T) { Formatter: GetBuildParamFormatter(FloatVectorDefaultMetricType, "autoIndex.params.build"), }, } - p.IndexParams.Init(mgr) assert.Panics(t, func() { - p.IndexParams.GetAsJSONMap() + p.IndexParams.Init(mgr) }) }) } diff --git a/pkg/util/paramtable/base_table.go b/pkg/util/paramtable/base_table.go index 9f46d8a3f5..09d7ec820a 100644 --- a/pkg/util/paramtable/base_table.go +++ b/pkg/util/paramtable/base_table.go @@ -143,7 +143,14 @@ func (bt *BaseTable) init() { ret = strings.ReplaceAll(ret, ".", "") return ret } - bt.mgr, _ = config.Init() + + var err error + bt.mgr, err = config.Init() + if err != nil { + log.Error("failed to initialize config manager", zap.Error(err)) + panic(err) + } + if !bt.config.skipEnv { err := bt.mgr.AddSource(config.NewEnvSource(formatter)) if err != nil { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 6f6792d088..450c080bcf 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -17,6 +17,7 @@ package paramtable import ( + "context" "fmt" "os" "path" @@ -530,6 +531,14 @@ This configuration is only used by querynode and indexnode, it selects CPU instr Export: true, } p.IndexSliceSize.Init(base.mgr) + p.IndexSliceSize.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + size, err := strconv.Atoi(newValue) + if err != nil { + return err + } + UpdateIndexSliceSize(size) + return nil + }) p.EnableMaterializedView = ParamItem{ Key: "common.materializedView.enabled", @@ -631,6 +640,14 @@ This configuration is only used by querynode and indexnode, it selects CPU instr Export: true, } p.HighPriorityThreadCoreCoefficient.Init(base.mgr) + p.HighPriorityThreadCoreCoefficient.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + coefficient, err := strconv.ParseFloat(newValue, 64) + if err != nil { + return err + } + UpdateHighPriorityThreadCoreCoefficient(coefficient) + return nil + }) p.MiddlePriorityThreadCoreCoefficient = ParamItem{ Key: "common.threadCoreCoefficient.middlePriority", @@ -641,6 +658,14 @@ This configuration is only used by querynode and indexnode, it selects CPU instr Export: true, } p.MiddlePriorityThreadCoreCoefficient.Init(base.mgr) + p.MiddlePriorityThreadCoreCoefficient.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + coefficient, err := strconv.ParseFloat(newValue, 64) + if err != nil { + return err + } + UpdateMiddlePriorityThreadCoreCoefficient(coefficient) + return nil + }) p.LowPriorityThreadCoreCoefficient = ParamItem{ Key: "common.threadCoreCoefficient.lowPriority", @@ -651,6 +676,14 @@ This configuration is only used by querynode and indexnode, it selects CPU instr Export: true, } p.LowPriorityThreadCoreCoefficient.Init(base.mgr) + p.LowPriorityThreadCoreCoefficient.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + coefficient, err := strconv.ParseFloat(newValue, 64) + if err != nil { + return err + } + UpdateLowPriorityThreadCoreCoefficient(coefficient) + return nil + }) p.DiskWriteMode = ParamItem{ Key: "common.diskWriteMode", @@ -1064,6 +1097,15 @@ This helps Milvus-CDC synchronize incremental data`, Export: true, } p.EnabledOptimizeExpr.Init(base.mgr) + p.EnabledOptimizeExpr.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + enable, err := strconv.ParseBool(newValue) + if err != nil { + return err + } + UpdateDefaultOptimizeExprEnable(enable) + return nil + }) + p.EnabledJSONKeyStats = ParamItem{ Key: "common.enabledJSONKeyStats", Version: "2.5.5", @@ -1081,6 +1123,14 @@ This helps Milvus-CDC synchronize incremental data`, Export: true, } p.EnabledGrowingSegmentJSONKeyStats.Init(base.mgr) + p.EnabledGrowingSegmentJSONKeyStats.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + enable, err := strconv.ParseBool(newValue) + if err != nil { + return err + } + UpdateDefaultGrowingJSONKeyStatsEnable(enable) + return nil + }) p.EnableConfigParamTypeCheck = ParamItem{ Key: "common.enableConfigParamTypeCheck", @@ -1090,6 +1140,14 @@ This helps Milvus-CDC synchronize incremental data`, Export: true, } p.EnableConfigParamTypeCheck.Init(base.mgr) + p.EnableConfigParamTypeCheck.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + enable, err := strconv.ParseBool(newValue) + if err != nil { + return err + } + UpdateDefaultConfigParamTypeCheck(enable) + return nil + }) } type gpuConfig struct { @@ -1330,6 +1388,9 @@ It is recommended to use debug level under test and development environments, an Export: true, } l.Level.Init(base.mgr) + l.Level.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + return UpdateLogLevel(newValue) + }) l.RootPath = ParamItem{ Key: "log.file.rootPath", @@ -3223,6 +3284,15 @@ If set to 0, time based eviction is disabled.`, } p.CacheCellUnaccessedSurvivalTime.Init(base.mgr) + p.EnableDisk = ParamItem{ + Key: "queryNode.enableDisk", + Version: "2.2.0", + DefaultValue: "false", + Doc: "enable querynode load disk index, and search on disk index", + Export: true, + } + p.EnableDisk.Init(base.mgr) + p.KnowhereThreadPoolSize = ParamItem{ Key: "queryNode.segcore.knowhereThreadPoolNumRatio", Version: "2.0.0", @@ -3671,15 +3741,6 @@ Max read concurrency must greater than or equal to 1, and less than or equal to } p.CPURatio.Init(base.mgr) - p.EnableDisk = ParamItem{ - Key: "queryNode.enableDisk", - Version: "2.2.0", - DefaultValue: "false", - Doc: "enable querynode load disk index, and search on disk index", - Export: true, - } - p.EnableDisk.Init(base.mgr) - p.IndexOffsetCacheEnabled = ParamItem{ Key: "queryNode.indexOffsetCacheEnabled", Version: "2.5.0", @@ -3882,6 +3943,14 @@ user-task-polling: Doc: "expr eval batch size for getnext interface", } p.ExprEvalBatchSize.Init(base.mgr) + p.ExprEvalBatchSize.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + size, err := strconv.Atoi(newValue) + if err != nil { + return err + } + UpdateDefaultExprEvalBatchSize(size) + return nil + }) p.JSONKeyStatsCommitInterval = ParamItem{ Key: "queryNode.segcore.jsonKeyStatsCommitInterval", @@ -3891,6 +3960,14 @@ user-task-polling: Export: true, } p.JSONKeyStatsCommitInterval.Init(base.mgr) + p.JSONKeyStatsCommitInterval.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + interval, err := strconv.Atoi(newValue) + if err != nil { + return err + } + UpdateDefaultJSONKeyStatsCommitInterval(interval) + return nil + }) p.CleanExcludeSegInterval = ParamItem{ Key: "queryCoord.cleanExcludeSegmentInterval", diff --git a/pkg/util/paramtable/param_item.go b/pkg/util/paramtable/param_item.go index c98a92b3fd..b65bbdb389 100644 --- a/pkg/util/paramtable/param_item.go +++ b/pkg/util/paramtable/param_item.go @@ -17,6 +17,7 @@ package paramtable import ( + "context" "fmt" "strconv" "strings" @@ -24,11 +25,15 @@ import ( "github.com/samber/lo" "go.uber.org/atomic" + "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/v2/config" + "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" ) +type ParamChangeCallback func(ctx context.Context, key, oldValue, newValue string) error + type ParamItem struct { Key string // which should be named as "A.B.C" Version string @@ -45,6 +50,9 @@ type ParamItem struct { // for unittest. tempValue atomic.Pointer[string] + + callback ParamChangeCallback + lastValue atomic.Pointer[string] } func (pi *ParamItem) Init(manager *config.Manager) { @@ -52,6 +60,58 @@ func (pi *ParamItem) Init(manager *config.Manager) { if pi.Forbidden { pi.manager.ForbidUpdate(pi.Key) } + + currentValue := pi.GetValue() + pi.lastValue.Store(¤tValue) + + if manager != nil && manager.Dispatcher != nil { + handler := config.NewHandler(pi.Key, func(event *config.Event) { + if event.Key == strings.ToLower(pi.Key) && event.EventType == config.UpdateType { + pi.handleConfigChange(event) + } + }) + manager.Dispatcher.Register(pi.Key, handler) + } +} + +func (pi *ParamItem) RegisterCallback(callback ParamChangeCallback) { + pi.callback = callback +} + +func (pi *ParamItem) UnregisterCallback() { + pi.callback = nil +} + +func (pi *ParamItem) handleConfigChange(event *config.Event) { + if pi.callback == nil { + return + } + + oldValue := "" + if lastVal := pi.lastValue.Load(); lastVal != nil { + oldValue = *lastVal + } + + newValue := event.Value + + if oldValue == newValue { + return + } + + if err := pi.callback(context.Background(), pi.Key, oldValue, newValue); err != nil { + log.Error("param change callback failed", + zap.String("key", pi.Key), + zap.String("oldValue", oldValue), + zap.String("newValue", newValue), + zap.Error(err)) + } else { + log.Info("param value changed", + zap.String("key", pi.Key), + zap.String("oldValue", oldValue), + zap.String("newValue", newValue)) + } + + pi.lastValue.Store(&newValue) } // Get original value with error diff --git a/pkg/util/paramtable/param_item_callback_test.go b/pkg/util/paramtable/param_item_callback_test.go new file mode 100644 index 0000000000..439447c645 --- /dev/null +++ b/pkg/util/paramtable/param_item_callback_test.go @@ -0,0 +1,458 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package paramtable + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/v2/config" +) + +func TestParamItem_RegisterCallback(t *testing.T) { + manager := config.NewManager() + param := &ParamItem{ + Key: "test.param", + DefaultValue: "default", + } + param.Init(manager) + + callback := func(ctx context.Context, key, oldValue, newValue string) error { + return nil + } + + param.RegisterCallback(callback) + assert.NotNil(t, param.callback) + + param.UnregisterCallback() + assert.Nil(t, param.callback) +} + +func TestParamItem_CallbackTriggered(t *testing.T) { + manager := config.NewManager() + param := &ParamItem{ + Key: "test.param", + DefaultValue: "default", + } + param.Init(manager) + + var callbackCalled bool + var capturedOldValue, capturedNewValue string + var capturedKey string + + callback := func(ctx context.Context, key, oldValue, newValue string) error { + callbackCalled = true + capturedKey = key + capturedOldValue = oldValue + capturedNewValue = newValue + return nil + } + + param.RegisterCallback(callback) + + assert.Equal(t, "default", param.GetValue()) + + event := &config.Event{ + EventType: config.UpdateType, + Key: "test.param", + Value: "new-value", + } + + param.handleConfigChange(event) + + assert.True(t, callbackCalled) + assert.Equal(t, "test.param", capturedKey) + assert.Equal(t, "default", capturedOldValue) + assert.Equal(t, "new-value", capturedNewValue) +} + +func TestParamItem_CallbackErrorHandling(t *testing.T) { + manager := config.NewManager() + param := &ParamItem{ + Key: "test.param", + DefaultValue: "default", + } + param.Init(manager) + + var callbackCalled bool + + errorCallback := func(ctx context.Context, key, oldValue, newValue string) error { + callbackCalled = true + return fmt.Errorf("callback error") + } + + param.RegisterCallback(errorCallback) + + event := &config.Event{ + EventType: config.UpdateType, + Key: "test.param", + Value: "new-value", + } + + param.handleConfigChange(event) + + assert.True(t, callbackCalled) +} + +func TestParamItem_NoValueChange(t *testing.T) { + manager := config.NewManager() + param := &ParamItem{ + Key: "test.param", + DefaultValue: "default", + } + param.Init(manager) + + var callbackCalled bool + + callback := func(ctx context.Context, key, oldValue, newValue string) error { + callbackCalled = true + return nil + } + + param.RegisterCallback(callback) + + event := &config.Event{ + EventType: config.UpdateType, + Key: "test.param", + Value: "default", + } + + param.handleConfigChange(event) + + assert.False(t, callbackCalled) +} + +func TestParamItem_CallbackWithContext(t *testing.T) { + manager := config.NewManager() + param := &ParamItem{ + Key: "test.param", + DefaultValue: "default", + } + param.Init(manager) + + var callbackCalled bool + + callback := func(ctx context.Context, key, oldValue, newValue string) error { + callbackCalled = true + assert.NotNil(t, ctx) + return nil + } + + param.RegisterCallback(callback) + + event := &config.Event{ + EventType: config.UpdateType, + Key: "test.param", + Value: "new-value", + } + + param.handleConfigChange(event) + + assert.True(t, callbackCalled) +} + +func TestParamItem_CallbackCleanup(t *testing.T) { + manager := config.NewManager() + param := &ParamItem{ + Key: "test.param", + DefaultValue: "default", + } + param.Init(manager) + + var callbackCalled bool + + callback := func(ctx context.Context, key, oldValue, newValue string) error { + callbackCalled = true + return nil + } + + param.RegisterCallback(callback) + + param.UnregisterCallback() + + event := &config.Event{ + EventType: config.UpdateType, + Key: "test.param", + Value: "new-value", + } + + param.handleConfigChange(event) + + assert.False(t, callbackCalled) +} + +func TestParamItem_ManagerNil(t *testing.T) { + param := &ParamItem{ + Key: "test.param", + DefaultValue: "default", + } + + callback := func(ctx context.Context, key, oldValue, newValue string) error { + return nil + } + + param.RegisterCallback(callback) + assert.NotNil(t, param.callback) + + param.UnregisterCallback() + assert.Nil(t, param.callback) +} + +func TestParamItem_DispatcherNil(t *testing.T) { + manager := config.NewManager() + manager.Dispatcher = nil + + param := &ParamItem{ + Key: "test.param", + DefaultValue: "default", + } + + param.Init(manager) + + callback := func(ctx context.Context, key, oldValue, newValue string) error { + return nil + } + + param.RegisterCallback(callback) + assert.NotNil(t, param.callback) +} + +func TestParamItem_StressTest(t *testing.T) { + manager := config.NewManager() + param := &ParamItem{ + Key: "test.param", + DefaultValue: "default", + } + param.Init(manager) + + var callbackCount int + + callback := func(ctx context.Context, key, oldValue, newValue string) error { + callbackCount++ + return nil + } + + param.RegisterCallback(callback) + + for i := 0; i < 10; i++ { + event := &config.Event{ + EventType: config.UpdateType, + Key: "test.param", + Value: fmt.Sprintf("value-%d", i), + } + param.handleConfigChange(event) + } + + assert.Equal(t, 10, callbackCount) +} + +func TestParamItem_FormatterWithCallback(t *testing.T) { + manager := config.NewManager() + param := &ParamItem{ + Key: "test.param", + DefaultValue: "100", + Formatter: func(v string) string { + if v == "100" { + return "formatted-100" + } + return v + }, + } + param.Init(manager) + + var callbackCalled bool + var callbackNewValue string + + callback := func(ctx context.Context, key, oldValue, newValue string) error { + callbackCalled = true + callbackNewValue = newValue + return nil + } + + param.RegisterCallback(callback) + + event := &config.Event{ + EventType: config.UpdateType, + Key: "test.param", + Value: "200", + } + + param.handleConfigChange(event) + + assert.True(t, callbackCalled) + assert.Equal(t, "200", callbackNewValue) +} + +func TestParamItem_InitWithManager(t *testing.T) { + manager := config.NewManager() + param := &ParamItem{ + Key: "test.param", + DefaultValue: "default", + } + + assert.Nil(t, param.manager) + + param.Init(manager) + + assert.NotNil(t, param.manager) + assert.Equal(t, manager, param.manager) + + assert.NotNil(t, manager.Dispatcher) +} + +func TestParamItem_LastValueTracking_Change(t *testing.T) { + manager := config.NewManager() + param := &ParamItem{ + Key: "test.param", + DefaultValue: "default", + } + param.Init(manager) + + initialValue := param.GetValue() + assert.Equal(t, "default", initialValue) + + lastVal := param.lastValue.Load() + assert.NotNil(t, lastVal) + assert.Equal(t, "default", *lastVal) + + var callbackNewValue string + + callback := func(ctx context.Context, key, oldValue, newValue string) error { + callbackNewValue = newValue + return nil + } + + param.RegisterCallback(callback) + + event := &config.Event{ + EventType: config.UpdateType, + Key: "test.param", + Value: "new-value", + } + + param.handleConfigChange(event) + + lastVal = param.lastValue.Load() + assert.NotNil(t, lastVal) + assert.Equal(t, "new-value", *lastVal) + assert.Equal(t, "new-value", callbackNewValue) +} + +func TestParamItem_LastValueTracking_NoChange(t *testing.T) { + manager := config.NewManager() + param := &ParamItem{ + Key: "test.param", + DefaultValue: "default", + } + param.Init(manager) + + initialValue := param.GetValue() + assert.Equal(t, "default", initialValue) + + lastVal := param.lastValue.Load() + assert.NotNil(t, lastVal) + assert.Equal(t, "default", *lastVal) + + event := &config.Event{ + EventType: config.UpdateType, + Key: "test.param", + Value: "new-value", + } + + param.handleConfigChange(event) + + lastVal = param.lastValue.Load() + assert.NotNil(t, lastVal) + assert.Equal(t, "default", *lastVal) +} + +func TestParamItem_EventTypeFiltering(t *testing.T) { + manager := config.NewManager() + param := &ParamItem{ + Key: "test.param", + DefaultValue: "default", + } + param.Init(manager) + + var callbackCalled bool + + callback := func(ctx context.Context, key, oldValue, newValue string) error { + callbackCalled = true + return nil + } + + param.RegisterCallback(callback) + + createEvent := &config.Event{ + EventType: config.CreateType, + Key: "test.param", + Value: "new-value", + } + + param.handleConfigChange(createEvent) + assert.True(t, callbackCalled) + + callbackCalled = false + + updateEvent := &config.Event{ + EventType: config.UpdateType, + Key: "test.param", + Value: "another-value", + } + + param.handleConfigChange(updateEvent) + assert.True(t, callbackCalled) +} + +func TestParamItem_NilCallback(t *testing.T) { + manager := config.NewManager() + param := &ParamItem{ + Key: "test.param", + DefaultValue: "default", + } + param.Init(manager) + + event := &config.Event{ + EventType: config.UpdateType, + Key: "test.param", + Value: "new-value", + } + + assert.NotPanics(t, func() { + param.handleConfigChange(event) + }) +} + +func TestParamItem_CallbackNilManager(t *testing.T) { + param := &ParamItem{ + Key: "test.param", + DefaultValue: "default", + } + + event := &config.Event{ + EventType: config.UpdateType, + Key: "test.param", + Value: "new-value", + } + + assert.NotPanics(t, func() { + param.handleConfigChange(event) + }) +} diff --git a/pkg/util/paramtable/util.go b/pkg/util/paramtable/util.go new file mode 100644 index 0000000000..b23f12f099 --- /dev/null +++ b/pkg/util/paramtable/util.go @@ -0,0 +1,71 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package paramtable + +/* +#cgo pkg-config: milvus_core + +#include +#include +#include +#include "common/init_c.h" +*/ +import "C" +import "unsafe" + +func UpdateLogLevel(level string) error { + cvalue := C.CString(level) + C.SetLogLevel(cvalue) + C.free(unsafe.Pointer(cvalue)) + return nil +} + +func UpdateIndexSliceSize(size int) { + C.SetIndexSliceSize(C.int64_t(size)) +} + +func UpdateHighPriorityThreadCoreCoefficient(coefficient float64) { + C.SetHighPriorityThreadCoreCoefficient(C.float(coefficient)) +} + +func UpdateMiddlePriorityThreadCoreCoefficient(coefficient float64) { + C.SetMiddlePriorityThreadCoreCoefficient(C.float(coefficient)) +} + +func UpdateLowPriorityThreadCoreCoefficient(coefficient float64) { + C.SetLowPriorityThreadCoreCoefficient(C.float(coefficient)) +} + +func UpdateDefaultExprEvalBatchSize(size int) { + C.SetDefaultExprEvalBatchSize(C.int64_t(size)) +} + +func UpdateDefaultOptimizeExprEnable(enable bool) { + C.SetDefaultOptimizeExprEnable(C.bool(enable)) +} + +func UpdateDefaultJSONKeyStatsCommitInterval(interval int) { + C.SetDefaultJSONKeyStatsCommitInterval(C.int64_t(interval)) +} + +func UpdateDefaultGrowingJSONKeyStatsEnable(enable bool) { + C.SetDefaultGrowingJSONKeyStatsEnable(C.bool(enable)) +} + +func UpdateDefaultConfigParamTypeCheck(enable bool) { + C.SetDefaultConfigParamTypeCheck(C.bool(enable)) +}