From b6b59bd22289ad8f8400fd80c7d2e8403abc848b Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Mon, 29 Sep 2025 10:17:04 +0800 Subject: [PATCH] fix: remove redundant initialization of storage v2 (#44597) issue: #44596 - querynode already init the storage v2 and segcore, so streamingnode should not do this again. - It also fix the gcp object storage access denied. Signed-off-by: chyezh --- internal/querynodev2/server.go | 130 +---------------- internal/streamingnode/server/server.go | 16 +-- internal/util/initcore/query_node.go | 180 ++++++++++++++++++++++++ 3 files changed, 187 insertions(+), 139 deletions(-) create mode 100644 internal/util/initcore/query_node.go diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 8e5c04ec9f..0612d0e3e0 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -33,12 +33,10 @@ import ( "context" "fmt" "os" - "path" "plugin" "strings" "sync" "time" - "unsafe" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -60,7 +58,6 @@ import ( "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/hookutil" "github.com/milvus-io/milvus/internal/util/initcore" - "github.com/milvus-io/milvus/internal/util/pathutil" "github.com/milvus-io/milvus/internal/util/searchutil/optimizers" "github.com/milvus-io/milvus/internal/util/searchutil/scheduler" "github.com/milvus-io/milvus/internal/util/segcore" @@ -74,7 +71,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/util/expr" - "github.com/milvus-io/milvus/pkg/v2/util/hardware" "github.com/milvus-io/milvus/pkg/v2/util/lifetime" "github.com/milvus-io/milvus/pkg/v2/util/lock" "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo" @@ -243,129 +239,6 @@ func (node *QueryNode) RegisterSegcoreConfigWatcher() { config.NewHandler("common.diskWriteRateLimiter.lowPriorityRatio", node.ReconfigDiskFileWriterParams)) } -// InitSegcore set init params of segCore, such as chunkRows, SIMD type... -func (node *QueryNode) InitSegcore() error { - cGlogConf := C.CString(path.Join(paramtable.GetBaseTable().GetConfigDir(), paramtable.DefaultGlogConf)) - C.SegcoreInit(cGlogConf) - C.free(unsafe.Pointer(cGlogConf)) - - // update log level based on current setup - initcore.UpdateLogLevel(paramtable.Get().LogCfg.Level.GetValue()) - - // override segcore chunk size - cChunkRows := C.int64_t(paramtable.Get().QueryNodeCfg.ChunkRows.GetAsInt64()) - C.SegcoreSetChunkRows(cChunkRows) - - cKnowhereThreadPoolSize := C.uint32_t(paramtable.Get().QueryNodeCfg.KnowhereThreadPoolSize.GetAsUint32()) - C.SegcoreSetKnowhereSearchThreadPoolNum(cKnowhereThreadPoolSize) - - cKnowhereFetchThreadPoolSize := C.uint32_t(paramtable.Get().QueryNodeCfg.KnowhereFetchThreadPoolSize.GetAsUint32()) - C.SegcoreSetKnowhereFetchThreadPoolNum(cKnowhereFetchThreadPoolSize) - - // override segcore SIMD type - cSimdType := C.CString(paramtable.Get().CommonCfg.SimdType.GetValue()) - C.SegcoreSetSimdType(cSimdType) - C.free(unsafe.Pointer(cSimdType)) - - enableKnowhereScoreConsistency := paramtable.Get().QueryNodeCfg.KnowhereScoreConsistency.GetAsBool() - if enableKnowhereScoreConsistency { - C.SegcoreEnableKnowhereScoreConsistency() - } - - // override segcore index slice size - cIndexSliceSize := C.int64_t(paramtable.Get().CommonCfg.IndexSliceSize.GetAsInt64()) - C.SetIndexSliceSize(cIndexSliceSize) - - // set up thread pool for different priorities - cHighPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat()) - C.SetHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient) - cMiddlePriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat()) - C.SetMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient) - cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat()) - C.SetLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient) - - node.RegisterSegcoreConfigWatcher() - - cCPUNum := C.int(hardware.GetCPUNum()) - C.InitCpuNum(cCPUNum) - - knowhereBuildPoolSize := uint32(float32(paramtable.Get().QueryNodeCfg.InterimIndexBuildParallelRate.GetAsFloat()) * float32(hardware.GetCPUNum())) - if knowhereBuildPoolSize < uint32(1) { - knowhereBuildPoolSize = uint32(1) - } - log.Ctx(node.ctx).Info("set up knowhere build pool size", zap.Uint32("pool_size", knowhereBuildPoolSize)) - cKnowhereBuildPoolSize := C.uint32_t(knowhereBuildPoolSize) - C.SegcoreSetKnowhereBuildThreadPoolNum(cKnowhereBuildPoolSize) - - cExprBatchSize := C.int64_t(paramtable.Get().QueryNodeCfg.ExprEvalBatchSize.GetAsInt64()) - C.SetDefaultExprEvalBatchSize(cExprBatchSize) - - cDeleteDumpBatchSize := C.int64_t(paramtable.Get().QueryNodeCfg.DeleteDumpBatchSize.GetAsInt64()) - C.SetDefaultDeleteDumpBatchSize(cDeleteDumpBatchSize) - - cOptimizeExprEnabled := C.bool(paramtable.Get().CommonCfg.EnabledOptimizeExpr.GetAsBool()) - C.SetDefaultOptimizeExprEnable(cOptimizeExprEnabled) - - cGrowingJSONKeyStatsEnabled := C.bool(paramtable.Get().CommonCfg.EnabledGrowingSegmentJSONKeyStats.GetAsBool()) - C.SetDefaultGrowingJSONKeyStatsEnable(cGrowingJSONKeyStatsEnabled) - - cGpuMemoryPoolInitSize := C.uint32_t(paramtable.Get().GpuConfig.InitSize.GetAsUint32()) - cGpuMemoryPoolMaxSize := C.uint32_t(paramtable.Get().GpuConfig.MaxSize.GetAsUint32()) - C.SegcoreSetKnowhereGpuMemoryPoolSize(cGpuMemoryPoolInitSize, cGpuMemoryPoolMaxSize) - - cEnableConfigParamTypeCheck := C.bool(paramtable.Get().CommonCfg.EnableConfigParamTypeCheck.GetAsBool()) - C.SetDefaultConfigParamTypeCheck(cEnableConfigParamTypeCheck) - - cExprResCacheEnabled := C.bool(paramtable.Get().QueryNodeCfg.ExprResCacheEnabled.GetAsBool()) - C.SetExprResCacheEnable(cExprResCacheEnabled) - - cExprResCacheCapacityBytes := C.int64_t(paramtable.Get().QueryNodeCfg.ExprResCacheCapacityBytes.GetAsInt64()) - C.SetExprResCacheCapacityBytes(cExprResCacheCapacityBytes) - - localDataRootPath := pathutil.GetPath(pathutil.LocalChunkPath, node.GetNodeID()) - - initcore.InitLocalChunkManager(localDataRootPath) - - err := initcore.InitRemoteChunkManager(paramtable.Get()) - if err != nil { - return err - } - - err = initcore.InitDiskFileWriterConfig(paramtable.Get()) - if err != nil { - return err - } - - if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() { - err = initcore.InitStorageV2FileSystem(paramtable.Get()) - if err != nil { - return err - } - } - - err = initcore.InitMmapManager(paramtable.Get(), node.GetNodeID()) - if err != nil { - return err - } - - err = initcore.InitTieredStorage(paramtable.Get()) - if err != nil { - return err - } - - err = initcore.InitInterminIndexConfig(paramtable.Get()) - if err != nil { - return err - } - - initcore.InitTraceConfig(paramtable.Get()) - C.InitExecExpressionFunctionFactory() - - // init paramtable change callback for core related config - initcore.SetupCoreConfigChangelCallback() - return initcore.InitPluginLoader() -} - func getIndexEngineVersion() (minimal, current int32) { cMinimal, cCurrent := C.GetMinimalIndexVersion(), C.GetCurrentIndexVersion() return int32(cMinimal), int32(cCurrent) @@ -482,12 +355,13 @@ func (node *QueryNode) Init() error { // init pipeline manager node.pipelineManager = pipeline.NewManager(node.manager, node.dispClient, node.delegators) - err = node.InitSegcore() + err = initcore.InitQueryNode(node.ctx) if err != nil { log.Error("QueryNode init segcore failed", zap.Error(err)) initError = err return } + node.RegisterSegcoreConfigWatcher() log.Info("query node init successfully", zap.Int64("queryNodeID", node.GetNodeID()), diff --git a/internal/streamingnode/server/server.go b/internal/streamingnode/server/server.go index 1371f7f826..971627da70 100644 --- a/internal/streamingnode/server/server.go +++ b/internal/streamingnode/server/server.go @@ -1,6 +1,7 @@ package server import ( + "context" "fmt" "google.golang.org/grpc" @@ -16,7 +17,6 @@ import ( _ "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/kafka" _ "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/pulsar" _ "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/rmq" - "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) // Server is the streamingnode server. @@ -41,17 +41,11 @@ func (s *Server) init() { // init all service. s.initService() + + log.Info("init query segcore...") + initcore.InitQueryNode(context.TODO()) + log.Info("streamingnode server initialized") - - // init storage v2 file system. - if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() { - if err := initcore.InitStorageV2FileSystem(paramtable.Get()); err != nil { - panic(fmt.Sprintf("unrecoverable error happens at init storage v2 file system, %+v", err)) - } - } - - // init paramtable change callback for core related config - initcore.SetupCoreConfigChangelCallback() } // Stop stops the streamingnode server. diff --git a/internal/util/initcore/query_node.go b/internal/util/initcore/query_node.go new file mode 100644 index 0000000000..da54872104 --- /dev/null +++ b/internal/util/initcore/query_node.go @@ -0,0 +1,180 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package initcore + +/* +#cgo pkg-config: milvus_core + +#include +#include +#include "common/init_c.h" +#include "segcore/segcore_init_c.h" +#include "storage/storage_c.h" +#include "segcore/arrow_fs_c.h" +#include "common/type_c.h" +#include "segcore/collection_c.h" +#include "segcore/segment_c.h" +#include "exec/expression/function/init_c.h" +*/ +import "C" + +import ( + "context" + "path" + "sync" + "unsafe" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/util/pathutil" + "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/util/hardware" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" +) + +var initQueryNodeOnce sync.Once + +// InitQueryNode initializes query node once. +func InitQueryNode(ctx context.Context) error { + var err error + initQueryNodeOnce.Do(func() { + err = doInitQueryNodeOnce(ctx) + }) + return err +} + +// doInitQueryNodeOnce initializes query node once. +func doInitQueryNodeOnce(ctx context.Context) error { + nodeID := paramtable.GetNodeID() + + cGlogConf := C.CString(path.Join(paramtable.GetBaseTable().GetConfigDir(), paramtable.DefaultGlogConf)) + C.SegcoreInit(cGlogConf) + C.free(unsafe.Pointer(cGlogConf)) + + // update log level based on current setup + UpdateLogLevel(paramtable.Get().LogCfg.Level.GetValue()) + + // override segcore chunk size + cChunkRows := C.int64_t(paramtable.Get().QueryNodeCfg.ChunkRows.GetAsInt64()) + C.SegcoreSetChunkRows(cChunkRows) + + cKnowhereThreadPoolSize := C.uint32_t(paramtable.Get().QueryNodeCfg.KnowhereThreadPoolSize.GetAsUint32()) + C.SegcoreSetKnowhereSearchThreadPoolNum(cKnowhereThreadPoolSize) + + cKnowhereFetchThreadPoolSize := C.uint32_t(paramtable.Get().QueryNodeCfg.KnowhereFetchThreadPoolSize.GetAsUint32()) + C.SegcoreSetKnowhereFetchThreadPoolNum(cKnowhereFetchThreadPoolSize) + + // override segcore SIMD type + cSimdType := C.CString(paramtable.Get().CommonCfg.SimdType.GetValue()) + C.SegcoreSetSimdType(cSimdType) + C.free(unsafe.Pointer(cSimdType)) + + enableKnowhereScoreConsistency := paramtable.Get().QueryNodeCfg.KnowhereScoreConsistency.GetAsBool() + if enableKnowhereScoreConsistency { + C.SegcoreEnableKnowhereScoreConsistency() + } + + // override segcore index slice size + cIndexSliceSize := C.int64_t(paramtable.Get().CommonCfg.IndexSliceSize.GetAsInt64()) + C.SetIndexSliceSize(cIndexSliceSize) + + // set up thread pool for different priorities + cHighPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat()) + C.SetHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient) + cMiddlePriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat()) + C.SetMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient) + cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat()) + C.SetLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient) + + cCPUNum := C.int(hardware.GetCPUNum()) + C.InitCpuNum(cCPUNum) + + knowhereBuildPoolSize := uint32(float32(paramtable.Get().QueryNodeCfg.InterimIndexBuildParallelRate.GetAsFloat()) * float32(hardware.GetCPUNum())) + if knowhereBuildPoolSize < uint32(1) { + knowhereBuildPoolSize = uint32(1) + } + log.Ctx(ctx).Info("set up knowhere build pool size", zap.Uint32("pool_size", knowhereBuildPoolSize)) + cKnowhereBuildPoolSize := C.uint32_t(knowhereBuildPoolSize) + C.SegcoreSetKnowhereBuildThreadPoolNum(cKnowhereBuildPoolSize) + + cExprBatchSize := C.int64_t(paramtable.Get().QueryNodeCfg.ExprEvalBatchSize.GetAsInt64()) + C.SetDefaultExprEvalBatchSize(cExprBatchSize) + + cDeleteDumpBatchSize := C.int64_t(paramtable.Get().QueryNodeCfg.DeleteDumpBatchSize.GetAsInt64()) + C.SetDefaultDeleteDumpBatchSize(cDeleteDumpBatchSize) + + cOptimizeExprEnabled := C.bool(paramtable.Get().CommonCfg.EnabledOptimizeExpr.GetAsBool()) + C.SetDefaultOptimizeExprEnable(cOptimizeExprEnabled) + + cGrowingJSONKeyStatsEnabled := C.bool(paramtable.Get().CommonCfg.EnabledGrowingSegmentJSONKeyStats.GetAsBool()) + C.SetDefaultGrowingJSONKeyStatsEnable(cGrowingJSONKeyStatsEnabled) + + cGpuMemoryPoolInitSize := C.uint32_t(paramtable.Get().GpuConfig.InitSize.GetAsUint32()) + cGpuMemoryPoolMaxSize := C.uint32_t(paramtable.Get().GpuConfig.MaxSize.GetAsUint32()) + C.SegcoreSetKnowhereGpuMemoryPoolSize(cGpuMemoryPoolInitSize, cGpuMemoryPoolMaxSize) + + cEnableConfigParamTypeCheck := C.bool(paramtable.Get().CommonCfg.EnableConfigParamTypeCheck.GetAsBool()) + C.SetDefaultConfigParamTypeCheck(cEnableConfigParamTypeCheck) + + cExprResCacheEnabled := C.bool(paramtable.Get().QueryNodeCfg.ExprResCacheEnabled.GetAsBool()) + C.SetExprResCacheEnable(cExprResCacheEnabled) + + cExprResCacheCapacityBytes := C.int64_t(paramtable.Get().QueryNodeCfg.ExprResCacheCapacityBytes.GetAsInt64()) + C.SetExprResCacheCapacityBytes(cExprResCacheCapacityBytes) + + localDataRootPath := pathutil.GetPath(pathutil.LocalChunkPath, nodeID) + + InitLocalChunkManager(localDataRootPath) + + err := InitRemoteChunkManager(paramtable.Get()) + if err != nil { + return err + } + + err = InitDiskFileWriterConfig(paramtable.Get()) + if err != nil { + return err + } + + if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() { + err = InitStorageV2FileSystem(paramtable.Get()) + if err != nil { + return err + } + } + + err = InitMmapManager(paramtable.Get(), nodeID) + if err != nil { + return err + } + + err = InitTieredStorage(paramtable.Get()) + if err != nil { + return err + } + + err = InitInterminIndexConfig(paramtable.Get()) + if err != nil { + return err + } + + InitTraceConfig(paramtable.Get()) + C.InitExecExpressionFunctionFactory() + + // init paramtable change callback for core related config + SetupCoreConfigChangelCallback() + return InitPluginLoader() +}