fix: remove redundant initialization of storage v2 (#44597)

issue: #44596

- The querynode already initializes storage v2 and segcore, so the
streamingnode should not do so again.
- It also fixes the GCP object storage "access denied" error.

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-09-29 10:17:04 +08:00 committed by GitHub
parent f342f49b32
commit b6b59bd222
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 187 additions and 139 deletions

View File

@ -33,12 +33,10 @@ import (
"context" "context"
"fmt" "fmt"
"os" "os"
"path"
"plugin" "plugin"
"strings" "strings"
"sync" "sync"
"time" "time"
"unsafe"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"github.com/samber/lo" "github.com/samber/lo"
@ -60,7 +58,6 @@ import (
"github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/hookutil" "github.com/milvus-io/milvus/internal/util/hookutil"
"github.com/milvus-io/milvus/internal/util/initcore" "github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/internal/util/pathutil"
"github.com/milvus-io/milvus/internal/util/searchutil/optimizers" "github.com/milvus-io/milvus/internal/util/searchutil/optimizers"
"github.com/milvus-io/milvus/internal/util/searchutil/scheduler" "github.com/milvus-io/milvus/internal/util/searchutil/scheduler"
"github.com/milvus-io/milvus/internal/util/segcore" "github.com/milvus-io/milvus/internal/util/segcore"
@ -74,7 +71,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/util/expr" "github.com/milvus-io/milvus/pkg/v2/util/expr"
"github.com/milvus-io/milvus/pkg/v2/util/hardware"
"github.com/milvus-io/milvus/pkg/v2/util/lifetime" "github.com/milvus-io/milvus/pkg/v2/util/lifetime"
"github.com/milvus-io/milvus/pkg/v2/util/lock" "github.com/milvus-io/milvus/pkg/v2/util/lock"
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo" "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
@ -243,129 +239,6 @@ func (node *QueryNode) RegisterSegcoreConfigWatcher() {
config.NewHandler("common.diskWriteRateLimiter.lowPriorityRatio", node.ReconfigDiskFileWriterParams)) config.NewHandler("common.diskWriteRateLimiter.lowPriorityRatio", node.ReconfigDiskFileWriterParams))
} }
// InitSegcore sets the init params of segcore, such as chunkRows, SIMD type
// and thread pool sizes, reading every value from paramtable. After the plain
// setters it runs the initcore initializers (chunk managers, disk file writer,
// optional storage v2 file system, mmap manager, tiered storage, interim index
// config, trace config, plugin loader).
//
// It returns the first non-nil error produced by one of the initcore
// initializers; the remaining steps are skipped in that case.
func (node *QueryNode) InitSegcore() error {
// The C string holding the glog config path is freed right after SegcoreInit returns.
cGlogConf := C.CString(path.Join(paramtable.GetBaseTable().GetConfigDir(), paramtable.DefaultGlogConf))
C.SegcoreInit(cGlogConf)
C.free(unsafe.Pointer(cGlogConf))
// update log level based on current setup
initcore.UpdateLogLevel(paramtable.Get().LogCfg.Level.GetValue())
// override segcore chunk size
cChunkRows := C.int64_t(paramtable.Get().QueryNodeCfg.ChunkRows.GetAsInt64())
C.SegcoreSetChunkRows(cChunkRows)
// knowhere search and fetch thread pool sizes
cKnowhereThreadPoolSize := C.uint32_t(paramtable.Get().QueryNodeCfg.KnowhereThreadPoolSize.GetAsUint32())
C.SegcoreSetKnowhereSearchThreadPoolNum(cKnowhereThreadPoolSize)
cKnowhereFetchThreadPoolSize := C.uint32_t(paramtable.Get().QueryNodeCfg.KnowhereFetchThreadPoolSize.GetAsUint32())
C.SegcoreSetKnowhereFetchThreadPoolNum(cKnowhereFetchThreadPoolSize)
// override segcore SIMD type
cSimdType := C.CString(paramtable.Get().CommonCfg.SimdType.GetValue())
C.SegcoreSetSimdType(cSimdType)
C.free(unsafe.Pointer(cSimdType))
// score consistency is only switched on; there is no call for the disabled case
enableKnowhereScoreConsistency := paramtable.Get().QueryNodeCfg.KnowhereScoreConsistency.GetAsBool()
if enableKnowhereScoreConsistency {
C.SegcoreEnableKnowhereScoreConsistency()
}
// override segcore index slice size
cIndexSliceSize := C.int64_t(paramtable.Get().CommonCfg.IndexSliceSize.GetAsInt64())
C.SetIndexSliceSize(cIndexSliceSize)
// set up thread pool for different priorities
cHighPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat())
C.SetHighPriorityThreadCoreCoefficient(cHighPriorityThreadCoreCoefficient)
cMiddlePriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat())
C.SetMiddlePriorityThreadCoreCoefficient(cMiddlePriorityThreadCoreCoefficient)
cLowPriorityThreadCoreCoefficient := C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat())
C.SetLowPriorityThreadCoreCoefficient(cLowPriorityThreadCoreCoefficient)
// register the config watchers of this node before the remaining setup
node.RegisterSegcoreConfigWatcher()
cCPUNum := C.int(hardware.GetCPUNum())
C.InitCpuNum(cCPUNum)
// build pool size = InterimIndexBuildParallelRate * CPU count, truncated to uint32 and clamped to at least 1
knowhereBuildPoolSize := uint32(float32(paramtable.Get().QueryNodeCfg.InterimIndexBuildParallelRate.GetAsFloat()) * float32(hardware.GetCPUNum()))
if knowhereBuildPoolSize < uint32(1) {
knowhereBuildPoolSize = uint32(1)
}
log.Ctx(node.ctx).Info("set up knowhere build pool size", zap.Uint32("pool_size", knowhereBuildPoolSize))
cKnowhereBuildPoolSize := C.uint32_t(knowhereBuildPoolSize)
C.SegcoreSetKnowhereBuildThreadPoolNum(cKnowhereBuildPoolSize)
// expression evaluation and delete dump defaults
cExprBatchSize := C.int64_t(paramtable.Get().QueryNodeCfg.ExprEvalBatchSize.GetAsInt64())
C.SetDefaultExprEvalBatchSize(cExprBatchSize)
cDeleteDumpBatchSize := C.int64_t(paramtable.Get().QueryNodeCfg.DeleteDumpBatchSize.GetAsInt64())
C.SetDefaultDeleteDumpBatchSize(cDeleteDumpBatchSize)
cOptimizeExprEnabled := C.bool(paramtable.Get().CommonCfg.EnabledOptimizeExpr.GetAsBool())
C.SetDefaultOptimizeExprEnable(cOptimizeExprEnabled)
cGrowingJSONKeyStatsEnabled := C.bool(paramtable.Get().CommonCfg.EnabledGrowingSegmentJSONKeyStats.GetAsBool())
C.SetDefaultGrowingJSONKeyStatsEnable(cGrowingJSONKeyStatsEnabled)
// knowhere GPU memory pool: initial size and maximum size
cGpuMemoryPoolInitSize := C.uint32_t(paramtable.Get().GpuConfig.InitSize.GetAsUint32())
cGpuMemoryPoolMaxSize := C.uint32_t(paramtable.Get().GpuConfig.MaxSize.GetAsUint32())
C.SegcoreSetKnowhereGpuMemoryPoolSize(cGpuMemoryPoolInitSize, cGpuMemoryPoolMaxSize)
cEnableConfigParamTypeCheck := C.bool(paramtable.Get().CommonCfg.EnableConfigParamTypeCheck.GetAsBool())
C.SetDefaultConfigParamTypeCheck(cEnableConfigParamTypeCheck)
// expression result cache switch and capacity
cExprResCacheEnabled := C.bool(paramtable.Get().QueryNodeCfg.ExprResCacheEnabled.GetAsBool())
C.SetExprResCacheEnable(cExprResCacheEnabled)
cExprResCacheCapacityBytes := C.int64_t(paramtable.Get().QueryNodeCfg.ExprResCacheCapacityBytes.GetAsInt64())
C.SetExprResCacheCapacityBytes(cExprResCacheCapacityBytes)
// the local chunk manager is rooted at a path derived from this node's ID
localDataRootPath := pathutil.GetPath(pathutil.LocalChunkPath, node.GetNodeID())
initcore.InitLocalChunkManager(localDataRootPath)
err := initcore.InitRemoteChunkManager(paramtable.Get())
if err != nil {
return err
}
err = initcore.InitDiskFileWriterConfig(paramtable.Get())
if err != nil {
return err
}
// the storage v2 file system is initialized only when EnableStorageV2 is set
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
err = initcore.InitStorageV2FileSystem(paramtable.Get())
if err != nil {
return err
}
}
err = initcore.InitMmapManager(paramtable.Get(), node.GetNodeID())
if err != nil {
return err
}
err = initcore.InitTieredStorage(paramtable.Get())
if err != nil {
return err
}
err = initcore.InitInterminIndexConfig(paramtable.Get())
if err != nil {
return err
}
initcore.InitTraceConfig(paramtable.Get())
C.InitExecExpressionFunctionFactory()
// init paramtable change callback for core related config
initcore.SetupCoreConfigChangelCallback()
// the plugin loader result is the return value of the whole init
return initcore.InitPluginLoader()
}
func getIndexEngineVersion() (minimal, current int32) { func getIndexEngineVersion() (minimal, current int32) {
cMinimal, cCurrent := C.GetMinimalIndexVersion(), C.GetCurrentIndexVersion() cMinimal, cCurrent := C.GetMinimalIndexVersion(), C.GetCurrentIndexVersion()
return int32(cMinimal), int32(cCurrent) return int32(cMinimal), int32(cCurrent)
@ -482,12 +355,13 @@ func (node *QueryNode) Init() error {
// init pipeline manager // init pipeline manager
node.pipelineManager = pipeline.NewManager(node.manager, node.dispClient, node.delegators) node.pipelineManager = pipeline.NewManager(node.manager, node.dispClient, node.delegators)
err = node.InitSegcore() err = initcore.InitQueryNode(node.ctx)
if err != nil { if err != nil {
log.Error("QueryNode init segcore failed", zap.Error(err)) log.Error("QueryNode init segcore failed", zap.Error(err))
initError = err initError = err
return return
} }
node.RegisterSegcoreConfigWatcher()
log.Info("query node init successfully", log.Info("query node init successfully",
zap.Int64("queryNodeID", node.GetNodeID()), zap.Int64("queryNodeID", node.GetNodeID()),

View File

@ -1,6 +1,7 @@
package server package server
import ( import (
"context"
"fmt" "fmt"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -16,7 +17,6 @@ import (
_ "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/kafka" _ "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/kafka"
_ "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/pulsar" _ "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/pulsar"
_ "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/rmq" _ "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/rmq"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
) )
// Server is the streamingnode server. // Server is the streamingnode server.
@ -41,17 +41,11 @@ func (s *Server) init() {
// init all service. // init all service.
s.initService() s.initService()
log.Info("init query segcore...")
initcore.InitQueryNode(context.TODO())
log.Info("streamingnode server initialized") log.Info("streamingnode server initialized")
// init storage v2 file system.
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
if err := initcore.InitStorageV2FileSystem(paramtable.Get()); err != nil {
panic(fmt.Sprintf("unrecoverable error happens at init storage v2 file system, %+v", err))
}
}
// init paramtable change callback for core related config
initcore.SetupCoreConfigChangelCallback()
} }
// Stop stops the streamingnode server. // Stop stops the streamingnode server.

View File

@ -0,0 +1,180 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package initcore
/*
#cgo pkg-config: milvus_core
#include <stdlib.h>
#include <stdint.h>
#include "common/init_c.h"
#include "segcore/segcore_init_c.h"
#include "storage/storage_c.h"
#include "segcore/arrow_fs_c.h"
#include "common/type_c.h"
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
#include "exec/expression/function/init_c.h"
*/
import "C"
import (
"context"
"path"
"sync"
"unsafe"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/pathutil"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/hardware"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
var (
	// initQueryNodeOnce guards the one-time query node initialization.
	initQueryNodeOnce sync.Once
	// initQueryNodeErr holds the result of that one-time initialization so
	// that every caller of InitQueryNode observes it, not only the first one.
	// It is written inside initQueryNodeOnce.Do and read only after Do
	// returns, which sync.Once orders correctly.
	initQueryNodeErr error
)

// InitQueryNode initializes the query node core exactly once per process.
//
// The first call runs doInitQueryNodeOnce with ctx; later calls do not run it
// again (their ctx is unused) and return the result recorded by the first
// call. A failed initialization is therefore reported to every caller instead
// of being silently turned into a nil error on the second call.
func InitQueryNode(ctx context.Context) error {
	initQueryNodeOnce.Do(func() {
		initQueryNodeErr = doInitQueryNodeOnce(ctx)
	})
	return initQueryNodeErr
}
// doInitQueryNodeOnce pushes the paramtable configuration into segcore through
// the C setters and then runs the chunk manager, storage, mmap, tiered
// storage, interim index, trace and plugin loader initializers in order.
// It returns the first error reported by one of those initializers.
func doInitQueryNodeOnce(ctx context.Context) error {
	nodeID := paramtable.GetNodeID()

	// The glog config path is copied into C memory and released right after use.
	glogConfPath := C.CString(path.Join(paramtable.GetBaseTable().GetConfigDir(), paramtable.DefaultGlogConf))
	C.SegcoreInit(glogConfPath)
	C.free(unsafe.Pointer(glogConfPath))

	// Align the core log level with the configured one.
	UpdateLogLevel(paramtable.Get().LogCfg.Level.GetValue())

	// Chunk size and knowhere search/fetch thread pools.
	C.SegcoreSetChunkRows(C.int64_t(paramtable.Get().QueryNodeCfg.ChunkRows.GetAsInt64()))
	C.SegcoreSetKnowhereSearchThreadPoolNum(C.uint32_t(paramtable.Get().QueryNodeCfg.KnowhereThreadPoolSize.GetAsUint32()))
	C.SegcoreSetKnowhereFetchThreadPoolNum(C.uint32_t(paramtable.Get().QueryNodeCfg.KnowhereFetchThreadPoolSize.GetAsUint32()))

	// SIMD type.
	simdType := C.CString(paramtable.Get().CommonCfg.SimdType.GetValue())
	C.SegcoreSetSimdType(simdType)
	C.free(unsafe.Pointer(simdType))

	if paramtable.Get().QueryNodeCfg.KnowhereScoreConsistency.GetAsBool() {
		C.SegcoreEnableKnowhereScoreConsistency()
	}

	// Index slice size.
	C.SetIndexSliceSize(C.int64_t(paramtable.Get().CommonCfg.IndexSliceSize.GetAsInt64()))

	// Thread pool coefficients per priority.
	C.SetHighPriorityThreadCoreCoefficient(C.float(paramtable.Get().CommonCfg.HighPriorityThreadCoreCoefficient.GetAsFloat()))
	C.SetMiddlePriorityThreadCoreCoefficient(C.float(paramtable.Get().CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsFloat()))
	C.SetLowPriorityThreadCoreCoefficient(C.float(paramtable.Get().CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat()))

	C.InitCpuNum(C.int(hardware.GetCPUNum()))

	// Build pool size is the parallel rate times the CPU count, never below one.
	buildParallelRate := float32(paramtable.Get().QueryNodeCfg.InterimIndexBuildParallelRate.GetAsFloat())
	buildPoolSize := uint32(buildParallelRate * float32(hardware.GetCPUNum()))
	if buildPoolSize == 0 {
		buildPoolSize = 1
	}
	log.Ctx(ctx).Info("set up knowhere build pool size", zap.Uint32("pool_size", buildPoolSize))
	C.SegcoreSetKnowhereBuildThreadPoolNum(C.uint32_t(buildPoolSize))

	// Expression evaluation, delete dump and JSON key stats defaults.
	C.SetDefaultExprEvalBatchSize(C.int64_t(paramtable.Get().QueryNodeCfg.ExprEvalBatchSize.GetAsInt64()))
	C.SetDefaultDeleteDumpBatchSize(C.int64_t(paramtable.Get().QueryNodeCfg.DeleteDumpBatchSize.GetAsInt64()))
	C.SetDefaultOptimizeExprEnable(C.bool(paramtable.Get().CommonCfg.EnabledOptimizeExpr.GetAsBool()))
	C.SetDefaultGrowingJSONKeyStatsEnable(C.bool(paramtable.Get().CommonCfg.EnabledGrowingSegmentJSONKeyStats.GetAsBool()))

	// GPU memory pool: initial size first, maximum size second.
	C.SegcoreSetKnowhereGpuMemoryPoolSize(
		C.uint32_t(paramtable.Get().GpuConfig.InitSize.GetAsUint32()),
		C.uint32_t(paramtable.Get().GpuConfig.MaxSize.GetAsUint32()),
	)

	C.SetDefaultConfigParamTypeCheck(C.bool(paramtable.Get().CommonCfg.EnableConfigParamTypeCheck.GetAsBool()))

	// Expression result cache.
	C.SetExprResCacheEnable(C.bool(paramtable.Get().QueryNodeCfg.ExprResCacheEnabled.GetAsBool()))
	C.SetExprResCacheCapacityBytes(C.int64_t(paramtable.Get().QueryNodeCfg.ExprResCacheCapacityBytes.GetAsInt64()))

	// Chunk managers; the local one is rooted at a path derived from the node ID.
	InitLocalChunkManager(pathutil.GetPath(pathutil.LocalChunkPath, nodeID))
	if err := InitRemoteChunkManager(paramtable.Get()); err != nil {
		return err
	}
	if err := InitDiskFileWriterConfig(paramtable.Get()); err != nil {
		return err
	}

	// The storage v2 file system is set up only when it is enabled.
	if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
		if err := InitStorageV2FileSystem(paramtable.Get()); err != nil {
			return err
		}
	}

	if err := InitMmapManager(paramtable.Get(), nodeID); err != nil {
		return err
	}
	if err := InitTieredStorage(paramtable.Get()); err != nil {
		return err
	}
	if err := InitInterminIndexConfig(paramtable.Get()); err != nil {
		return err
	}

	InitTraceConfig(paramtable.Get())
	C.InitExecExpressionFunctionFactory()

	// Hook the paramtable change callback for core related config.
	SetupCoreConfigChangelCallback()

	return InitPluginLoader()
}