From 8f97eb355fde6b86cf37f166d2191750b4210ba3 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Mon, 8 Sep 2025 10:15:55 +0800
Subject: [PATCH] enhance: [StorageV2] Make bucket name concatenation transparent to user (#44232)

Related to #39173

This PR:
- Bump milvus-storage commit to handle bucket name concatenation logic in multipart s3 fs
- Remove all user-side bucket name concatenation code
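A minimal sketch of the call-site pattern being removed (not code from this patch): the helper name `buildObjectPath` and the sample log path are made up for illustration, while the `!= "local"` guard and `path.Join` mirror the logic deleted from internal/datanode/util/util.go and internal/storage/rw.go.

```go
package main

import (
	"fmt"
	"path"
)

// buildObjectPath mimics the old user-side behavior: each caller prefixed the
// bucket name itself whenever the storage type was not "local".
func buildObjectPath(storageType, bucketName, logPath string) string {
	if storageType != "local" {
		return path.Join(bucketName, logPath)
	}
	return logPath
}

func main() {
	// Before: callers composed bucket-qualified paths like this.
	fmt.Println(buildObjectPath("remote", "a-bucket", "insert_log/1/2/3/100/401"))
	// After: callers hand the raw log path to milvus-storage and the
	// multipart S3 filesystem resolves the bucket internally.
	fmt.Println("insert_log/1/2/3/100/401")
}
```

With the bump to milvus-storage 7f8121b this prefixing happens inside the library's multipart S3 filesystem, so callers pass root-relative paths for both local and remote storage.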

Signed-off-by: Congqi Xia
---
 .../thirdparty/milvus-storage/CMakeLists.txt |  2 +-
 internal/datanode/util/util.go               |  5 -----
 .../querynodev2/segments/segment_loader.go   | 14 --------------
 internal/storage/rw.go                       |  5 -----
 internal/storage/serde_events_v2.go          | 18 ++----------------
 5 files changed, 3 insertions(+), 41 deletions(-)

diff --git a/internal/core/thirdparty/milvus-storage/CMakeLists.txt b/internal/core/thirdparty/milvus-storage/CMakeLists.txt
index f6809bba7a..39096990b8 100644
--- a/internal/core/thirdparty/milvus-storage/CMakeLists.txt
+++ b/internal/core/thirdparty/milvus-storage/CMakeLists.txt
@@ -14,7 +14,7 @@
 # Update milvus-storage_VERSION for the first occurrence
 milvus_add_pkg_config("milvus-storage")
 set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
-set( milvus-storage_VERSION 15b7fc7 )
+set( milvus-storage_VERSION 7f8121b )
 set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git")
 message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}")
 message(STATUS "milvus-storage version: ${milvus-storage_VERSION}")
diff --git a/internal/datanode/util/util.go b/internal/datanode/util/util.go
index 3336576f29..fef2496553 100644
--- a/internal/datanode/util/util.go
+++ b/internal/datanode/util/util.go
@@ -1,8 +1,6 @@
 package util
 
 import (
-	"path"
-
 	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
@@ -28,9 +26,6 @@ func GetSegmentInsertFiles(fieldBinlogs []*datapb.FieldBinlog, storageConfig *in
 		columnGroupID := insertLog.GetFieldID()
 		for _, binlog := range insertLog.GetBinlogs() {
 			filePath := metautil.BuildInsertLogPath(storageConfig.GetRootPath(), collectionID, partitionID, segmentID, columnGroupID, binlog.GetLogID())
-			if storageConfig.StorageType != "local" {
-				filePath = path.Join(storageConfig.GetBucketName(), filePath)
-			}
 			filePaths = append(filePaths, filePath)
 		}
 		insertLogs = append(insertLogs, &indexcgopb.FieldInsertFiles{
diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go
index 985e152181..15da65d24d 100644
--- a/internal/querynodev2/segments/segment_loader.go
+++ b/internal/querynodev2/segments/segment_loader.go
@@ -235,17 +235,6 @@ type segmentLoader struct {
 
 var _ Loader = (*segmentLoader)(nil)
 
-func addBucketNameStorageV2(segmentInfo *querypb.SegmentLoadInfo) {
-	if segmentInfo.GetStorageVersion() == 2 && paramtable.Get().CommonCfg.StorageType.GetValue() != "local" {
-		bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue()
-		for _, fieldBinlog := range segmentInfo.GetBinlogPaths() {
-			for _, binlog := range fieldBinlog.GetBinlogs() {
-				binlog.LogPath = path.Join(bucketName, binlog.LogPath)
-			}
-		}
-	}
-}
-
 func (loader *segmentLoader) Load(ctx context.Context,
 	collectionID int64,
 	segmentType SegmentType,
@@ -261,9 +250,6 @@ func (loader *segmentLoader) Load(ctx context.Context,
 		log.Info("no segment to load")
 		return nil, nil
 	}
-	for _, segmentInfo := range segments {
-		addBucketNameStorageV2(segmentInfo)
-	}
 
 	collection := loader.manager.Collection.Get(collectionID)
 	if collection == nil {
diff --git a/internal/storage/rw.go b/internal/storage/rw.go
index c80fd85e10..27e124652b 100644
--- a/internal/storage/rw.go
+++ b/internal/storage/rw.go
@@ -20,7 +20,6 @@ import (
 	"context"
 	"fmt"
 	sio "io"
-	"path"
 	"sort"
 
 	"github.com/samber/lo"
@@ -277,14 +276,10 @@ func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, s
 	binlogLists := lo.Map(binlogs, func(fieldBinlog *datapb.FieldBinlog, _ int) []*datapb.Binlog {
 		return fieldBinlog.GetBinlogs()
 	})
-	bucketName := rwOptions.storageConfig.BucketName
 	paths := make([][]string, len(binlogLists[0]))
 	for _, binlogs := range binlogLists {
 		for j, binlog := range binlogs {
 			logPath := binlog.GetLogPath()
-			if rwOptions.storageConfig.StorageType != "local" {
-				logPath = path.Join(bucketName, logPath)
-			}
 			paths[j] = append(paths[j], logPath)
 		}
 	}
diff --git a/internal/storage/serde_events_v2.go b/internal/storage/serde_events_v2.go
index 5e92cbeb4c..e98a737afe 100644
--- a/internal/storage/serde_events_v2.go
+++ b/internal/storage/serde_events_v2.go
@@ -19,7 +19,6 @@ package storage
 import (
 	"fmt"
 	"io"
-	"path"
 
 	"github.com/apache/arrow/go/v17/arrow"
 	"github.com/apache/arrow/go/v17/arrow/array"
@@ -38,7 +37,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
 	"github.com/milvus-io/milvus/pkg/v2/util/merr"
 	"github.com/milvus-io/milvus/pkg/v2/util/metautil"
-	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
 
@@ -223,20 +221,8 @@ func NewPackedRecordWriter(bucketName string, paths []string, schema *schemapb.C
 		return nil, merr.WrapErrServiceInternal(
 			fmt.Sprintf("can not convert collection schema %s to arrow schema: %s", schema.Name, err.Error()))
 	}
-	// if storage config is not passed, use common config
-	storageType := paramtable.Get().CommonCfg.StorageType.GetValue()
-	if storageConfig != nil {
-		storageType = storageConfig.GetStorageType()
-	}
-	// compose true path before create packed writer here
-	// and returned writtenPaths shall remain untouched
-	truePaths := lo.Map(paths, func(p string, _ int) string {
-		if storageType == "local" {
-			return p
-		}
-		return path.Join(bucketName, p)
-	})
-	writer, err := packed.NewPackedWriter(truePaths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups, storageConfig, storagePluginContext)
+
+	writer, err := packed.NewPackedWriter(paths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups, storageConfig, storagePluginContext)
 	if err != nil {
 		return nil, merr.WrapErrServiceInternal(
 			fmt.Sprintf("can not new packed record writer %s", err.Error()))