From aa861f55e6f73ac5dba31390c5284152470e8b70 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Tue, 16 Sep 2025 10:10:00 +0800
Subject: [PATCH] enhance: [StorageV2] Reverts #44232 bucket name change (#44390)

Related to #39173

- Put bucket name concatenation logic back for azure support

This reverts commit 8f97eb355fde6b86cf37f166d2191750b4210ba3.

Signed-off-by: Congqi Xia
---
 .../thirdparty/milvus-storage/CMakeLists.txt  |  2 +-
 internal/datanode/util/util.go                |  5 +++++
 .../querynodev2/segments/segment_loader.go    | 14 ++++++++++++++
 internal/storage/rw.go                        |  5 +++++
 internal/storage/serde_events_v2.go           | 18 ++++++++++++++++--
 5 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/internal/core/thirdparty/milvus-storage/CMakeLists.txt b/internal/core/thirdparty/milvus-storage/CMakeLists.txt
index 0d106c6616..d31e76df56 100644
--- a/internal/core/thirdparty/milvus-storage/CMakeLists.txt
+++ b/internal/core/thirdparty/milvus-storage/CMakeLists.txt
@@ -14,7 +14,7 @@
 # Update milvus-storage_VERSION for the first occurrence
 milvus_add_pkg_config("milvus-storage")
 set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
-set( milvus-storage_VERSION c53f68f )
+set( milvus-storage_VERSION c27fe8e )
 set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git")
 message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}")
 message(STATUS "milvus-storage version: ${milvus-storage_VERSION}")
diff --git a/internal/datanode/util/util.go b/internal/datanode/util/util.go
index fef2496553..3336576f29 100644
--- a/internal/datanode/util/util.go
+++ b/internal/datanode/util/util.go
@@ -1,6 +1,8 @@
 package util
 
 import (
+	"path"
+
 	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
@@ -26,6 +28,9 @@ func GetSegmentInsertFiles(fieldBinlogs []*datapb.FieldBinlog, storageConfig *in
 		columnGroupID := insertLog.GetFieldID()
 		for _, binlog := range insertLog.GetBinlogs() {
 			filePath := metautil.BuildInsertLogPath(storageConfig.GetRootPath(), collectionID, partitionID, segmentID, columnGroupID, binlog.GetLogID())
+			if storageConfig.StorageType != "local" {
+				filePath = path.Join(storageConfig.GetBucketName(), filePath)
+			}
 			filePaths = append(filePaths, filePath)
 		}
 		insertLogs = append(insertLogs, &indexcgopb.FieldInsertFiles{
diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go
index d871ef5ca4..dfcc5a9ffc 100644
--- a/internal/querynodev2/segments/segment_loader.go
+++ b/internal/querynodev2/segments/segment_loader.go
@@ -235,6 +235,17 @@ type segmentLoader struct {
 
 var _ Loader = (*segmentLoader)(nil)
 
+func addBucketNameStorageV2(segmentInfo *querypb.SegmentLoadInfo) {
+	if segmentInfo.GetStorageVersion() == 2 && paramtable.Get().CommonCfg.StorageType.GetValue() != "local" {
+		bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue()
+		for _, fieldBinlog := range segmentInfo.GetBinlogPaths() {
+			for _, binlog := range fieldBinlog.GetBinlogs() {
+				binlog.LogPath = path.Join(bucketName, binlog.LogPath)
+			}
+		}
+	}
+}
+
 func (loader *segmentLoader) Load(ctx context.Context,
 	collectionID int64,
 	segmentType SegmentType,
@@ -250,6 +261,9 @@ func (loader *segmentLoader) Load(ctx context.Context,
 		log.Info("no segment to load")
 		return nil, nil
 	}
+	for _, segmentInfo := range segments {
+		addBucketNameStorageV2(segmentInfo)
+	}
 
 	collection := loader.manager.Collection.Get(collectionID)
 	if collection == nil {
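
Note on the Go hunks above: they all restore the same rule, namely that when the configured storage type is not "local", paths handed to the StorageV2 (milvus-storage) layer carry the bucket name as their leading segment, while local file paths pass through unchanged. Below is a minimal runnable sketch of that rule; prependBucket, the storage-type values, and the sample paths are illustrative stand-ins, not identifiers from the Milvus codebase.

package main

import (
	"fmt"
	"path"
)

// prependBucket mirrors the restored behaviour: remote object storage
// (minio/s3/azure) expects bucket-qualified paths, local storage does not.
func prependBucket(storageType, bucketName, logPath string) string {
	if storageType == "local" {
		return logPath
	}
	return path.Join(bucketName, logPath)
}

func main() {
	fmt.Println(prependBucket("remote", "milvus-bucket", "files/insert_log/446/447/448/100/1"))
	// milvus-bucket/files/insert_log/446/447/448/100/1
	fmt.Println(prependBucket("local", "milvus-bucket", "/var/lib/milvus/insert_log/446/447/448/100/1"))
	// /var/lib/milvus/insert_log/446/447/448/100/1
}
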
diff --git a/internal/storage/rw.go b/internal/storage/rw.go
index 27e124652b..c80fd85e10 100644
--- a/internal/storage/rw.go
+++ b/internal/storage/rw.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	sio "io"
+	"path"
 	"sort"
 
 	"github.com/samber/lo"
@@ -276,10 +277,14 @@ func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, s
 	binlogLists := lo.Map(binlogs, func(fieldBinlog *datapb.FieldBinlog, _ int) []*datapb.Binlog {
 		return fieldBinlog.GetBinlogs()
 	})
+	bucketName := rwOptions.storageConfig.BucketName
 	paths := make([][]string, len(binlogLists[0]))
 	for _, binlogs := range binlogLists {
 		for j, binlog := range binlogs {
 			logPath := binlog.GetLogPath()
+			if rwOptions.storageConfig.StorageType != "local" {
+				logPath = path.Join(bucketName, logPath)
+			}
 			paths[j] = append(paths[j], logPath)
 		}
 	}
diff --git a/internal/storage/serde_events_v2.go b/internal/storage/serde_events_v2.go
index 0117606c7a..537aa81521 100644
--- a/internal/storage/serde_events_v2.go
+++ b/internal/storage/serde_events_v2.go
@@ -19,6 +19,7 @@ package storage
 import (
 	"fmt"
 	"io"
+	"path"
 
 	"github.com/apache/arrow/go/v17/arrow"
 	"github.com/apache/arrow/go/v17/arrow/array"
@@ -37,6 +38,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
 	"github.com/milvus-io/milvus/pkg/v2/util/merr"
 	"github.com/milvus-io/milvus/pkg/v2/util/metautil"
+	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
 
@@ -221,8 +223,20 @@ func NewPackedRecordWriter(bucketName string, paths []string, schema *schemapb.C
 		return nil, merr.WrapErrServiceInternal(
 			fmt.Sprintf("can not convert collection schema %s to arrow schema: %s", schema.Name, err.Error()))
 	}
-
-	writer, err := packed.NewPackedWriter(paths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups, storageConfig, storagePluginContext)
+	// if storage config is not passed, use common config
+	storageType := paramtable.Get().CommonCfg.StorageType.GetValue()
+	if storageConfig != nil {
+		storageType = storageConfig.GetStorageType()
+	}
+	// compose true path before create packed writer here
+	// and returned writtenPaths shall remain untouched
+	truePaths := lo.Map(paths, func(p string, _ int) string {
+		if storageType == "local" {
+			return p
+		}
+		return path.Join(bucketName, p)
+	})
+	writer, err := packed.NewPackedWriter(truePaths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups, storageConfig, storagePluginContext)
 	if err != nil {
 		return nil, merr.WrapErrServiceInternal(
 			fmt.Sprintf("can not new packed record writer %s", err.Error()))
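
The serde_events_v2.go hunk keeps the bucket name out of the paths the caller sees: only the slice handed to packed.NewPackedWriter is bucket-qualified, and the original paths (the written paths that, per the patch comment, shall remain untouched) keep their bucket-free form because lo.Map allocates a new slice. A small sketch of that pattern follows, assuming the github.com/samber/lo dependency already used in these files; composeTruePaths and the sample values are illustrative, not part of the codebase.

package main

import (
	"fmt"
	"path"

	"github.com/samber/lo"
)

// composeTruePaths builds bucket-qualified copies of the input paths for the
// writer while leaving the input slice itself unmodified.
func composeTruePaths(storageType, bucketName string, paths []string) []string {
	return lo.Map(paths, func(p string, _ int) string {
		if storageType == "local" {
			return p
		}
		return path.Join(bucketName, p)
	})
}

func main() {
	paths := []string{"files/insert_log/1/2/3/0/100", "files/insert_log/1/2/3/1/100"}
	fmt.Println(composeTruePaths("minio", "milvus-bucket", paths))
	// [milvus-bucket/files/insert_log/1/2/3/0/100 milvus-bucket/files/insert_log/1/2/3/1/100]
	fmt.Println(paths) // unchanged
}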