From f032044125b1f95e46569cecd45fc6b894f786c5 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 13 Aug 2025 19:31:44 +0800 Subject: [PATCH] enhance: Refine segcore param change callback (#43838) Related to #43230 This PR - Move segcore setup function to `initcore` package to remove cgo dependency from pkg - Register core callback only for components depends on segcore - Rectify `UpdateLogLevel` implementation Signed-off-by: Congqi Xia --- internal/core/src/common/Common.cpp | 16 +++- internal/datanode/index/init_segcore.go | 6 ++ internal/querynodev2/server.go | 6 ++ internal/streamingnode/server/server.go | 3 + internal/util/initcore/init_core.go | 94 +++++++++++++++++++ .../util/initcore}/util.go | 10 +- pkg/util/paramtable/component_param.go | 76 --------------- 7 files changed, 130 insertions(+), 81 deletions(-) rename {pkg/util/paramtable => internal/util/initcore}/util.go (95%) diff --git a/internal/core/src/common/Common.cpp b/internal/core/src/common/Common.cpp index ad1df562dd..8013ea8e2d 100644 --- a/internal/core/src/common/Common.cpp +++ b/internal/core/src/common/Common.cpp @@ -15,6 +15,7 @@ // limitations under the License. #include "common/Common.h" +#include "gflags/gflags.h" #include "log/Log.h" namespace milvus { @@ -76,11 +77,20 @@ void SetLogLevel(const char* level) { LOG_INFO("set log level: {}", level); if (strcmp(level, "debug") == 0) { - FLAGS_v = 5; + gflags::SetCommandLineOption("minloglevel", "0"); + gflags::SetCommandLineOption("v", "5"); } else if (strcmp(level, "trace") == 0) { - FLAGS_v = 6; + gflags::SetCommandLineOption("minloglevel", "0"); + gflags::SetCommandLineOption("v", "6"); } else { - FLAGS_v = 4; + gflags::SetCommandLineOption("v", "4"); + if (strcmp(level, "info") == 0) { + gflags::SetCommandLineOption("minloglevel", "0"); + } else if (strcmp(level, "warn") == 0) { + gflags::SetCommandLineOption("minloglevel", "1"); + } else if (strcmp(level, "error") == 0) { + gflags::SetCommandLineOption("minloglevel", "2"); + } } } diff --git a/internal/datanode/index/init_segcore.go b/internal/datanode/index/init_segcore.go index a6e634c9b5..3566133d8a 100644 --- a/internal/datanode/index/init_segcore.go +++ b/internal/datanode/index/init_segcore.go @@ -43,6 +43,9 @@ func InitSegcore() { C.IndexBuilderInit(cGlogConf) C.free(unsafe.Pointer(cGlogConf)) + // update log level based on current setup + initcore.UpdateLogLevel(paramtable.Get().LogCfg.Level.GetValue()) + // override index builder SIMD type cSimdType := C.CString(paramtable.Get().CommonCfg.SimdType.GetValue()) C.IndexBuilderSetSimdType(cSimdType) @@ -78,6 +81,9 @@ func InitSegcore() { cGpuMemoryPoolInitSize := C.uint32_t(paramtable.Get().GpuConfig.InitSize.GetAsUint32()) cGpuMemoryPoolMaxSize := C.uint32_t(paramtable.Get().GpuConfig.MaxSize.GetAsUint32()) C.SegcoreSetKnowhereGpuMemoryPoolSize(cGpuMemoryPoolInitSize, cGpuMemoryPoolMaxSize) + + // init paramtable change callback for core related config + initcore.SetupCoreConfigChangelCallback() } func CloseSegcore() { diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 553850c7b8..39c054e4cf 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -231,6 +231,9 @@ func (node *QueryNode) InitSegcore() error { C.SegcoreInit(cGlogConf) C.free(unsafe.Pointer(cGlogConf)) + // update log level based on current setup + initcore.UpdateLogLevel(paramtable.Get().LogCfg.Level.GetValue()) + // override segcore chunk size cChunkRows := C.int64_t(paramtable.Get().QueryNodeCfg.ChunkRows.GetAsInt64()) C.SegcoreSetChunkRows(cChunkRows) @@ -412,6 +415,9 @@ func (node *QueryNode) InitSegcore() error { initcore.InitTraceConfig(paramtable.Get()) C.InitExecExpressionFunctionFactory() + + // init paramtable change callback for core related config + initcore.SetupCoreConfigChangelCallback() return nil } diff --git a/internal/streamingnode/server/server.go b/internal/streamingnode/server/server.go index 1d65b4788b..ede73a7e1f 100644 --- a/internal/streamingnode/server/server.go +++ b/internal/streamingnode/server/server.go @@ -47,6 +47,9 @@ func (s *Server) init() { if err := initcore.InitStorageV2FileSystem(paramtable.Get()); err != nil { panic(fmt.Sprintf("unrecoverable error happens at init storage v2 file system, %+v", err)) } + + // init paramtable change callback for core related config + initcore.SetupCoreConfigChangelCallback() } // Stop stops the streamingnode server. diff --git a/internal/util/initcore/init_core.go b/internal/util/initcore/init_core.go index fdf6b10857..eded83bbc8 100644 --- a/internal/util/initcore/init_core.go +++ b/internal/util/initcore/init_core.go @@ -29,9 +29,12 @@ package initcore import "C" import ( + "context" "encoding/base64" "fmt" "path" + "strconv" + "sync" "time" "unsafe" @@ -281,6 +284,97 @@ func InitFileWriterConfig(params *paramtable.ComponentParam) error { return HandleCStatus(&status, "InitFileWriterConfig failed") } +var coreParamCallbackInitOnce sync.Once + +func SetupCoreConfigChangelCallback() { + coreParamCallbackInitOnce.Do(func() { + paramtable.Get().CommonCfg.IndexSliceSize.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + size, err := strconv.Atoi(newValue) + if err != nil { + return err + } + UpdateIndexSliceSize(size) + return nil + }) + + paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + coefficient, err := strconv.ParseFloat(newValue, 64) + if err != nil { + return err + } + UpdateHighPriorityThreadCoreCoefficient(coefficient) + return nil + }) + + paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + coefficient, err := strconv.ParseFloat(newValue, 64) + if err != nil { + return err + } + UpdateMiddlePriorityThreadCoreCoefficient(coefficient) + return nil + }) + + paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + coefficient, err := strconv.ParseFloat(newValue, 64) + if err != nil { + return err + } + UpdateLowPriorityThreadCoreCoefficient(coefficient) + return nil + }) + + paramtable.Get().CommonCfg.EnabledOptimizeExpr.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + enable, err := strconv.ParseBool(newValue) + if err != nil { + return err + } + UpdateDefaultOptimizeExprEnable(enable) + return nil + }) + + paramtable.Get().CommonCfg.EnabledGrowingSegmentJSONKeyStats.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + enable, err := strconv.ParseBool(newValue) + if err != nil { + return err + } + UpdateDefaultGrowingJSONKeyStatsEnable(enable) + return nil + }) + + paramtable.Get().CommonCfg.EnableConfigParamTypeCheck.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + enable, err := strconv.ParseBool(newValue) + if err != nil { + return err + } + UpdateDefaultConfigParamTypeCheck(enable) + return nil + }) + + paramtable.Get().LogCfg.Level.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + return UpdateLogLevel(newValue) + }) + + paramtable.Get().QueryNodeCfg.ExprEvalBatchSize.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + size, err := strconv.Atoi(newValue) + if err != nil { + return err + } + UpdateDefaultExprEvalBatchSize(size) + return nil + }) + + paramtable.Get().QueryNodeCfg.JSONKeyStatsCommitInterval.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { + interval, err := strconv.Atoi(newValue) + if err != nil { + return err + } + UpdateDefaultJSONKeyStatsCommitInterval(interval) + return nil + }) + }) +} + func InitInterminIndexConfig(params *paramtable.ComponentParam) error { enableInterminIndex := C.bool(params.QueryNodeCfg.EnableInterminSegmentIndex.GetAsBool()) C.SegcoreSetEnableInterminSegmentIndex(enableInterminIndex) diff --git a/pkg/util/paramtable/util.go b/internal/util/initcore/util.go similarity index 95% rename from pkg/util/paramtable/util.go rename to internal/util/initcore/util.go index b23f12f099..6f31bf76e5 100644 --- a/pkg/util/paramtable/util.go +++ b/internal/util/initcore/util.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package paramtable +package initcore /* #cgo pkg-config: milvus_core @@ -25,9 +25,15 @@ package paramtable #include "common/init_c.h" */ import "C" -import "unsafe" + +import ( + "strings" + "unsafe" +) func UpdateLogLevel(level string) error { + // always use lower case + level = strings.ToLower(level) cvalue := C.CString(level) C.SetLogLevel(cvalue) C.free(unsafe.Pointer(cvalue)) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 450c080bcf..4605538a1b 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -17,7 +17,6 @@ package paramtable import ( - "context" "fmt" "os" "path" @@ -531,14 +530,6 @@ This configuration is only used by querynode and indexnode, it selects CPU instr Export: true, } p.IndexSliceSize.Init(base.mgr) - p.IndexSliceSize.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { - size, err := strconv.Atoi(newValue) - if err != nil { - return err - } - UpdateIndexSliceSize(size) - return nil - }) p.EnableMaterializedView = ParamItem{ Key: "common.materializedView.enabled", @@ -640,14 +631,6 @@ This configuration is only used by querynode and indexnode, it selects CPU instr Export: true, } p.HighPriorityThreadCoreCoefficient.Init(base.mgr) - p.HighPriorityThreadCoreCoefficient.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { - coefficient, err := strconv.ParseFloat(newValue, 64) - if err != nil { - return err - } - UpdateHighPriorityThreadCoreCoefficient(coefficient) - return nil - }) p.MiddlePriorityThreadCoreCoefficient = ParamItem{ Key: "common.threadCoreCoefficient.middlePriority", @@ -658,14 +641,6 @@ This configuration is only used by querynode and indexnode, it selects CPU instr Export: true, } p.MiddlePriorityThreadCoreCoefficient.Init(base.mgr) - p.MiddlePriorityThreadCoreCoefficient.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { - coefficient, err := strconv.ParseFloat(newValue, 64) - if err != nil { - return err - } - UpdateMiddlePriorityThreadCoreCoefficient(coefficient) - return nil - }) p.LowPriorityThreadCoreCoefficient = ParamItem{ Key: "common.threadCoreCoefficient.lowPriority", @@ -676,14 +651,6 @@ This configuration is only used by querynode and indexnode, it selects CPU instr Export: true, } p.LowPriorityThreadCoreCoefficient.Init(base.mgr) - p.LowPriorityThreadCoreCoefficient.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { - coefficient, err := strconv.ParseFloat(newValue, 64) - if err != nil { - return err - } - UpdateLowPriorityThreadCoreCoefficient(coefficient) - return nil - }) p.DiskWriteMode = ParamItem{ Key: "common.diskWriteMode", @@ -1097,14 +1064,6 @@ This helps Milvus-CDC synchronize incremental data`, Export: true, } p.EnabledOptimizeExpr.Init(base.mgr) - p.EnabledOptimizeExpr.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { - enable, err := strconv.ParseBool(newValue) - if err != nil { - return err - } - UpdateDefaultOptimizeExprEnable(enable) - return nil - }) p.EnabledJSONKeyStats = ParamItem{ Key: "common.enabledJSONKeyStats", @@ -1123,14 +1082,6 @@ This helps Milvus-CDC synchronize incremental data`, Export: true, } p.EnabledGrowingSegmentJSONKeyStats.Init(base.mgr) - p.EnabledGrowingSegmentJSONKeyStats.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { - enable, err := strconv.ParseBool(newValue) - if err != nil { - return err - } - UpdateDefaultGrowingJSONKeyStatsEnable(enable) - return nil - }) p.EnableConfigParamTypeCheck = ParamItem{ Key: "common.enableConfigParamTypeCheck", @@ -1140,14 +1091,6 @@ This helps Milvus-CDC synchronize incremental data`, Export: true, } p.EnableConfigParamTypeCheck.Init(base.mgr) - p.EnableConfigParamTypeCheck.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { - enable, err := strconv.ParseBool(newValue) - if err != nil { - return err - } - UpdateDefaultConfigParamTypeCheck(enable) - return nil - }) } type gpuConfig struct { @@ -1388,9 +1331,6 @@ It is recommended to use debug level under test and development environments, an Export: true, } l.Level.Init(base.mgr) - l.Level.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { - return UpdateLogLevel(newValue) - }) l.RootPath = ParamItem{ Key: "log.file.rootPath", @@ -3943,14 +3883,6 @@ user-task-polling: Doc: "expr eval batch size for getnext interface", } p.ExprEvalBatchSize.Init(base.mgr) - p.ExprEvalBatchSize.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { - size, err := strconv.Atoi(newValue) - if err != nil { - return err - } - UpdateDefaultExprEvalBatchSize(size) - return nil - }) p.JSONKeyStatsCommitInterval = ParamItem{ Key: "queryNode.segcore.jsonKeyStatsCommitInterval", @@ -3960,14 +3892,6 @@ user-task-polling: Export: true, } p.JSONKeyStatsCommitInterval.Init(base.mgr) - p.JSONKeyStatsCommitInterval.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error { - interval, err := strconv.Atoi(newValue) - if err != nil { - return err - } - UpdateDefaultJSONKeyStatsCommitInterval(interval) - return nil - }) p.CleanExcludeSegInterval = ParamItem{ Key: "queryCoord.cleanExcludeSegmentInterval",