From 6a1ce9eee64473767ac98b165ada3885d0ca4ee4 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 9 Jan 2026 14:51:25 +0800 Subject: [PATCH] enhance: bump milvus-storage and use subtree fs to remove bucket name concatenation (#46866) Related to #46617 Bump milvus-storage version from 839a8e5 to 2ab2904, which introduces subtree filesystem support. This allows removing manual bucket name concatenation logic across the codebase as the storage layer now handles path prefixing internally. Changes: - Remove bucket name prefix logic in datanode, querynode, and storage layers - Simplify FileManager::GetRemoteIndexFilePrefixV2() - Rename CColumnGroups API to CColumnSplits to align with upstream - Update DiskFileManagerTest paths for new directory structure - Add FFI packed reader/writer unit tests Co-authored-by: Wei Liu --------- Signed-off-by: Congqi Xia Co-authored-by: Wei Liu --- .../src/common/VectorArrayStorageV2Test.cpp | 2 + .../src/index/json_stats/JsonKeyStats.cpp | 6 +- .../core/src/segcore/ColumnGroupsCTest.cpp | 10 +- .../src/segcore/PackedReaderWriterTest.cpp | 6 +- internal/core/src/segcore/column_groups_c.cpp | 10 +- internal/core/src/segcore/column_groups_c.h | 13 +- internal/core/src/segcore/packed_writer_c.cpp | 8 +- internal/core/src/segcore/packed_writer_c.h | 4 +- .../core/src/storage/DiskFileManagerTest.cpp | 19 +- internal/core/src/storage/FileManager.h | 8 +- .../thirdparty/milvus-storage/CMakeLists.txt | 2 +- .../datanode/importv2/copy_segment_utils.go | 38 +- .../importv2/copy_segment_utils_test.go | 3 + internal/datanode/util/util.go | 5 - .../querynodev2/segments/segment_loader.go | 14 - internal/storage/binlog_record_writer.go | 3 - internal/storage/record_writer.go | 19 +- internal/storage/rw.go | 5 - .../storagev2/packed/packed_reader_ffi.go | 3 - .../packed/packed_reader_ffi_test.go | 410 ++++++++++++++++++ internal/storagev2/packed/packed_writer.go | 8 +- .../packed/packed_writer_ffi_test.go | 116 +++++ 22 files changed, 588 insertions(+), 124 deletions(-) create mode 100644 internal/storagev2/packed/packed_reader_ffi_test.go create mode 100644 internal/storagev2/packed/packed_writer_ffi_test.go diff --git a/internal/core/src/common/VectorArrayStorageV2Test.cpp b/internal/core/src/common/VectorArrayStorageV2Test.cpp index e62c1a4a65..ee90b1ec9b 100644 --- a/internal/core/src/common/VectorArrayStorageV2Test.cpp +++ b/internal/core/src/common/VectorArrayStorageV2Test.cpp @@ -244,6 +244,8 @@ class TestVectorArrayStorageV2 : public testing::Test { auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() .GetArrowFileSystem(); fs->DeleteDir("/tmp/test_vector_array_for_storage_v2"); + + milvus_storage::ArrowFileSystemSingleton::GetInstance().Release(); } protected: diff --git a/internal/core/src/index/json_stats/JsonKeyStats.cpp b/internal/core/src/index/json_stats/JsonKeyStats.cpp index 6d57633c67..8764457f97 100644 --- a/internal/core/src/index/json_stats/JsonKeyStats.cpp +++ b/internal/core/src/index/json_stats/JsonKeyStats.cpp @@ -757,7 +757,7 @@ JsonKeyStats::BuildWithFieldData(const std::vector& field_datas, // for storage v2, we need to add bucket name to remote prefix auto remote_prefix = - AddBucketName(disk_file_manager_->GetRemoteJsonStatsShreddingPrefix()); + disk_file_manager_->GetRemoteJsonStatsShreddingPrefix(); LOG_INFO( "init parquet writer with shredding remote prefix: {} for segment {}", remote_prefix, @@ -905,7 +905,7 @@ JsonKeyStats::LoadShreddingMeta( } auto remote_prefix = - AddBucketName(disk_file_manager_->GetRemoteJsonStatsShreddingPrefix()); + disk_file_manager_->GetRemoteJsonStatsShreddingPrefix(); // load common meta from parquet only if key_field_map_ is not already populated // (for backward compatibility with old data that doesn't have separate meta file) @@ -938,7 +938,7 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id, int64_t num_rows = 0; auto remote_prefix = - AddBucketName(disk_file_manager_->GetRemoteJsonStatsShreddingPrefix()); + disk_file_manager_->GetRemoteJsonStatsShreddingPrefix(); std::vector files; for (const auto& file_id : file_ids) { diff --git a/internal/core/src/segcore/ColumnGroupsCTest.cpp b/internal/core/src/segcore/ColumnGroupsCTest.cpp index 7ac7f02869..10c6030850 100644 --- a/internal/core/src/segcore/ColumnGroupsCTest.cpp +++ b/internal/core/src/segcore/ColumnGroupsCTest.cpp @@ -19,8 +19,8 @@ #include #include "segcore/column_groups_c.h" -TEST(CColumnGroups, TestCColumnGroups) { - CColumnGroups cgs = NewCColumnGroups(); +TEST(CColumnSplits, TestCColumnSplits) { + CColumnSplits cgs = NewCColumnSplits(); int group1[] = {2, 4, 5}; int group2[] = {0, 1}; int group3[] = {3, 6, 7, 8}; @@ -29,10 +29,10 @@ TEST(CColumnGroups, TestCColumnGroups) { int group_sizes[] = {3, 2, 4}; for (int i = 0; i < 3; i++) { - AddCColumnGroup(cgs, test_groups[i], group_sizes[i]); + AddCColumnSplit(cgs, test_groups[i], group_sizes[i]); } - ASSERT_EQ(CColumnGroupsSize(cgs), 3); + ASSERT_EQ(CColumnSplitsSize(cgs), 3); auto vv = static_cast>*>(cgs); for (int i = 0; i < 3; i++) { @@ -42,5 +42,5 @@ TEST(CColumnGroups, TestCColumnGroups) { } } - FreeCColumnGroups(cgs); + FreeCColumnSplits(cgs); } \ No newline at end of file diff --git a/internal/core/src/segcore/PackedReaderWriterTest.cpp b/internal/core/src/segcore/PackedReaderWriterTest.cpp index 00c797ccce..df357032f7 100644 --- a/internal/core/src/segcore/PackedReaderWriterTest.cpp +++ b/internal/core/src/segcore/PackedReaderWriterTest.cpp @@ -60,9 +60,9 @@ TEST(CPackedTest, PackedWriterAndReader) { char* paths[] = {const_cast("/tmp/0")}; int64_t part_upload_size = 0; - CColumnGroups cgs = NewCColumnGroups(); + CColumnSplits cgs = NewCColumnSplits(); int group[] = {0}; - AddCColumnGroup(cgs, group, 1); + AddCColumnSplit(cgs, group, 1); auto c_status = InitLocalArrowFileSystemSingleton(path); EXPECT_EQ(c_status.error_code, 0); @@ -102,5 +102,5 @@ TEST(CPackedTest, PackedWriterAndReader) { c_status = CloseReader(c_packed_reader); EXPECT_EQ(c_status.error_code, 0); - FreeCColumnGroups(cgs); + FreeCColumnSplits(cgs); } diff --git a/internal/core/src/segcore/column_groups_c.cpp b/internal/core/src/segcore/column_groups_c.cpp index e9c51716f2..5e2b954780 100644 --- a/internal/core/src/segcore/column_groups_c.cpp +++ b/internal/core/src/segcore/column_groups_c.cpp @@ -23,8 +23,8 @@ using VecVecInt = std::vector>; extern "C" { -CColumnGroups -NewCColumnGroups() { +CColumnSplits +NewCColumnSplits() { SCOPE_CGO_CALL_METRIC(); auto vv = std::make_unique(); @@ -32,7 +32,7 @@ NewCColumnGroups() { } void -AddCColumnGroup(CColumnGroups cgs, int* group, int group_size) { +AddCColumnSplit(CColumnSplits cgs, int* group, int group_size) { SCOPE_CGO_CALL_METRIC(); if (!cgs || !group) @@ -44,7 +44,7 @@ AddCColumnGroup(CColumnGroups cgs, int* group, int group_size) { } int -CColumnGroupsSize(CColumnGroups cgs) { +CColumnSplitsSize(CColumnSplits cgs) { SCOPE_CGO_CALL_METRIC(); if (!cgs) @@ -55,7 +55,7 @@ CColumnGroupsSize(CColumnGroups cgs) { } void -FreeCColumnGroups(CColumnGroups cgs) { +FreeCColumnSplits(CColumnSplits cgs) { SCOPE_CGO_CALL_METRIC(); delete static_cast(cgs); diff --git a/internal/core/src/segcore/column_groups_c.h b/internal/core/src/segcore/column_groups_c.h index 571878b948..3678c046b2 100644 --- a/internal/core/src/segcore/column_groups_c.h +++ b/internal/core/src/segcore/column_groups_c.h @@ -20,19 +20,18 @@ extern "C" { #endif -typedef void* CColumnGroups; - -CColumnGroups -NewCColumnGroups(); +typedef void* CColumnSplits; +CColumnSplits +NewCColumnSplits(); void -AddCColumnGroup(CColumnGroups cgs, int* group, int group_size); +AddCColumnSplit(CColumnSplits cgs, int* group, int group_size); int -CColumnGroupsSize(CColumnGroups cgs); +CColumnSplitsSize(CColumnSplits cgs); void -FreeCColumnGroups(CColumnGroups cgs); +FreeCColumnSplits(CColumnSplits cgs); #ifdef __cplusplus } diff --git a/internal/core/src/segcore/packed_writer_c.cpp b/internal/core/src/segcore/packed_writer_c.cpp index f6d135849f..7d55df415e 100644 --- a/internal/core/src/segcore/packed_writer_c.cpp +++ b/internal/core/src/segcore/packed_writer_c.cpp @@ -41,7 +41,7 @@ NewPackedWriterWithStorageConfig(struct ArrowSchema* schema, char** paths, int64_t num_paths, int64_t part_upload_size, - CColumnGroups column_groups, + CColumnSplits column_splits, CStorageConfig c_storage_config, CPackedWriter* c_packed_writer, CPluginContext* c_plugin_context) { @@ -83,7 +83,7 @@ NewPackedWriterWithStorageConfig(struct ArrowSchema* schema, auto trueSchema = arrow::ImportSchema(schema).ValueOrDie(); auto columnGroups = - *static_cast>*>(column_groups); + *static_cast>*>(column_splits); parquet::WriterProperties::Builder builder; auto plugin_ptr = @@ -135,7 +135,7 @@ NewPackedWriter(struct ArrowSchema* schema, char** paths, int64_t num_paths, int64_t part_upload_size, - CColumnGroups column_groups, + CColumnSplits column_splits, CPackedWriter* c_packed_writer, CPluginContext* c_plugin_context) { SCOPE_CGO_CALL_METRIC(); @@ -157,7 +157,7 @@ NewPackedWriter(struct ArrowSchema* schema, auto trueSchema = arrow::ImportSchema(schema).ValueOrDie(); auto columnGroups = - *static_cast>*>(column_groups); + *static_cast>*>(column_splits); parquet::WriterProperties::Builder builder; auto plugin_ptr = diff --git a/internal/core/src/segcore/packed_writer_c.h b/internal/core/src/segcore/packed_writer_c.h index c0b5a7e9b9..f3ca137c7e 100644 --- a/internal/core/src/segcore/packed_writer_c.h +++ b/internal/core/src/segcore/packed_writer_c.h @@ -31,7 +31,7 @@ NewPackedWriterWithStorageConfig(struct ArrowSchema* schema, char** paths, int64_t num_paths, int64_t part_upload_size, - CColumnGroups column_groups, + CColumnSplits column_splits, CStorageConfig c_storage_config, CPackedWriter* c_packed_writer, CPluginContext* c_plugin_context); @@ -42,7 +42,7 @@ NewPackedWriter(struct ArrowSchema* schema, char** paths, int64_t num_paths, int64_t part_upload_size, - CColumnGroups column_groups, + CColumnSplits column_splits, CPackedWriter* c_packed_writer, CPluginContext* c_plugin_context); diff --git a/internal/core/src/storage/DiskFileManagerTest.cpp b/internal/core/src/storage/DiskFileManagerTest.cpp index fa94e02fa4..8619f19886 100644 --- a/internal/core/src/storage/DiskFileManagerTest.cpp +++ b/internal/core/src/storage/DiskFileManagerTest.cpp @@ -125,20 +125,21 @@ TEST_F(DiskAnnFileManagerTest, AddFilePositiveParallel) { TEST_F(DiskAnnFileManagerTest, ReadAndWriteWithStream) { auto conf = milvus_storage::ArrowFileSystemConfig(); conf.storage_type = "local"; - conf.root_path = "/tmp"; - milvus_storage::ArrowFileSystemSingleton::GetInstance().Init(conf); + conf.root_path = "/tmp/diskann"; - auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() - .GetArrowFileSystem(); + auto result = milvus_storage::CreateArrowFileSystem(conf); + EXPECT_TRUE(result.ok()); + auto fs = result.ValueOrDie(); auto lcm = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); std::string small_index_file_path = - "/tmp/diskann/index_files/1000/small_index_file"; + "/tmp/diskann/index_files/1000/1/2/3/small_index_file"; std::string large_index_file_path = - "/tmp/diskann/index_files/1000/large_index_file"; + "/tmp/diskann/index_files/1000/1/2/3/large_index_file"; auto exist = lcm->Exist(large_index_file_path); - std::string index_file_path = "/tmp/diskann/index_files/1000/index_file"; + std::string index_file_path = + "/tmp/diskann/index_files/1000/1/2/3/index_file"; boost::filesystem::path localPath(index_file_path); auto local_file_name = localPath.filename().string(); @@ -165,7 +166,7 @@ TEST_F(DiskAnnFileManagerTest, ReadAndWriteWithStream) { IndexMeta index_meta = {3, 100, 1000, 1, "index"}; auto diskAnnFileManager = std::make_shared( - storage::FileManagerContext(filed_data_meta, index_meta, cm_, fs_)); + storage::FileManagerContext(filed_data_meta, index_meta, cm_, fs)); auto os = diskAnnFileManager->OpenOutputStream(index_file_path); size_t write_offset = 0; @@ -207,7 +208,7 @@ TEST_F(DiskAnnFileManagerTest, ReadAndWriteWithStream) { EXPECT_EQ(read_small_index_size, small_index_size); EXPECT_EQ(is->Tell(), read_offset); std::string small_index_file_path_read = - "/tmp/diskann/index_files/1000/small_index_file_read"; + "/tmp/diskann/index_files/1000/1/2/3/small_index_file_read"; lcm->CreateFile(small_index_file_path_read); int fd_read = open(small_index_file_path_read.c_str(), O_WRONLY); ASSERT_NE(fd_read, -1); diff --git a/internal/core/src/storage/FileManager.h b/internal/core/src/storage/FileManager.h index 2dbf58776f..17b9c5bb09 100644 --- a/internal/core/src/storage/FileManager.h +++ b/internal/core/src/storage/FileManager.h @@ -207,13 +207,7 @@ class FileManagerImpl : public milvus::FileManager { virtual std::string GetRemoteIndexFilePrefixV2() const { - boost::filesystem::path bucket = rcm_->GetBucketName(); - std::string v1_prefix = GetRemoteIndexObjectPrefix(); - if (bucket.empty()) { - return v1_prefix; - } else { - return NormalizePath(bucket / v1_prefix); - } + return GetRemoteIndexObjectPrefixV2(); } virtual std::string diff --git a/internal/core/thirdparty/milvus-storage/CMakeLists.txt b/internal/core/thirdparty/milvus-storage/CMakeLists.txt index 24e62d0cc2..9d57bdf16c 100644 --- a/internal/core/thirdparty/milvus-storage/CMakeLists.txt +++ b/internal/core/thirdparty/milvus-storage/CMakeLists.txt @@ -14,7 +14,7 @@ # Update milvus-storage_VERSION for the first occurrence milvus_add_pkg_config("milvus-storage") set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "") -set( milvus-storage_VERSION 839a8e5) +set( milvus-storage_VERSION 2ab2904) set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git") message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}") message(STATUS "milvus-storage version: ${milvus-storage_VERSION}") diff --git a/internal/datanode/importv2/copy_segment_utils.go b/internal/datanode/importv2/copy_segment_utils.go index 2d8ae82871..66931fe0b5 100644 --- a/internal/datanode/importv2/copy_segment_utils.go +++ b/internal/datanode/importv2/copy_segment_utils.go @@ -24,6 +24,7 @@ import ( "strings" "go.uber.org/zap" + "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/storage" @@ -195,20 +196,13 @@ func transformFieldBinlogs( var totalRows int64 for _, srcFieldBinlog := range srcFieldBinlogs { - dstFieldBinlog := &datapb.FieldBinlog{ - FieldID: srcFieldBinlog.GetFieldID(), - Binlogs: make([]*datapb.Binlog, 0, len(srcFieldBinlog.GetBinlogs())), - } + dstFieldBinlog := proto.Clone(srcFieldBinlog).(*datapb.FieldBinlog) + dstFieldBinlog.Binlogs = make([]*datapb.Binlog, 0, len(srcFieldBinlog.GetBinlogs())) for _, srcBinlog := range srcFieldBinlog.GetBinlogs() { if srcPath := srcBinlog.GetLogPath(); srcPath != "" { - dstBinlog := &datapb.Binlog{ - EntriesNum: srcBinlog.GetEntriesNum(), - TimestampFrom: srcBinlog.GetTimestampFrom(), - TimestampTo: srcBinlog.GetTimestampTo(), - LogPath: mappings[srcPath], - LogSize: srcBinlog.GetLogSize(), - } + dstBinlog := proto.Clone(srcBinlog).(*datapb.Binlog) + dstBinlog.LogPath = mappings[srcPath] dstFieldBinlog.Binlogs = append(dstFieldBinlog.Binlogs, dstBinlog) if countRows { @@ -524,14 +518,9 @@ func buildIndexInfoFromSource( } } - textIndexInfos[fieldID] = &datapb.TextIndexStats{ - FieldID: srcText.GetFieldID(), - Version: srcText.GetVersion(), - BuildID: srcText.GetBuildID(), - Files: targetFiles, - LogSize: srcText.GetLogSize(), - MemorySize: srcText.GetMemorySize(), - } + dstText := proto.Clone(srcText).(*datapb.TextIndexStats) + dstText.Files = targetFiles + textIndexInfos[fieldID] = dstText } // Process JSON Key indexes - transform file paths @@ -545,14 +534,9 @@ func buildIndexInfoFromSource( } } - jsonKeyIndexInfos[fieldID] = &datapb.JsonKeyStats{ - FieldID: srcJson.GetFieldID(), - Version: srcJson.GetVersion(), - BuildID: srcJson.GetBuildID(), - Files: targetFiles, - JsonKeyStatsDataFormat: srcJson.GetJsonKeyStatsDataFormat(), - MemorySize: srcJson.GetMemorySize(), - } + dstJson := proto.Clone(srcJson).(*datapb.JsonKeyStats) + dstJson.Files = targetFiles + jsonKeyIndexInfos[fieldID] = dstJson } return indexInfos, textIndexInfos, jsonKeyIndexInfos diff --git a/internal/datanode/importv2/copy_segment_utils_test.go b/internal/datanode/importv2/copy_segment_utils_test.go index a6f323f7f4..f54bc12360 100644 --- a/internal/datanode/importv2/copy_segment_utils_test.go +++ b/internal/datanode/importv2/copy_segment_utils_test.go @@ -204,6 +204,7 @@ func TestTransformFieldBinlogs(t *testing.T) { TimestampTo: 200, LogPath: "files/insert_log/111/222/333/100/log1.log", LogSize: 1024, + MemorySize: 1024, }, }, }, @@ -216,6 +217,7 @@ func TestTransformFieldBinlogs(t *testing.T) { TimestampTo: 250, LogPath: "files/insert_log/111/222/333/101/log2.log", LogSize: 2048, + MemorySize: 2048, }, }, }, @@ -233,6 +235,7 @@ func TestTransformFieldBinlogs(t *testing.T) { assert.Equal(t, int64(1000), result[0].Binlogs[0].EntriesNum) assert.Equal(t, "files/insert_log/444/555/666/100/log1.log", result[0].Binlogs[0].LogPath) assert.Equal(t, int64(1024), result[0].Binlogs[0].LogSize) + assert.Equal(t, int64(1024), result[0].Binlogs[0].MemorySize) // Verify second field binlog assert.Equal(t, int64(101), result[1].FieldID) diff --git a/internal/datanode/util/util.go b/internal/datanode/util/util.go index 3336576f29..fef2496553 100644 --- a/internal/datanode/util/util.go +++ b/internal/datanode/util/util.go @@ -1,8 +1,6 @@ package util import ( - "path" - "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" @@ -28,9 +26,6 @@ func GetSegmentInsertFiles(fieldBinlogs []*datapb.FieldBinlog, storageConfig *in columnGroupID := insertLog.GetFieldID() for _, binlog := range insertLog.GetBinlogs() { filePath := metautil.BuildInsertLogPath(storageConfig.GetRootPath(), collectionID, partitionID, segmentID, columnGroupID, binlog.GetLogID()) - if storageConfig.StorageType != "local" { - filePath = path.Join(storageConfig.GetBucketName(), filePath) - } filePaths = append(filePaths, filePath) } insertLogs = append(insertLogs, &indexcgopb.FieldInsertFiles{ diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index ad909948c5..2e4933e42c 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -244,17 +244,6 @@ type segmentLoader struct { var _ Loader = (*segmentLoader)(nil) -func addBucketNameStorageV2(segmentInfo *querypb.SegmentLoadInfo) { - if segmentInfo.GetStorageVersion() == 2 && paramtable.Get().CommonCfg.StorageType.GetValue() != "local" { - bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue() - for _, fieldBinlog := range segmentInfo.GetBinlogPaths() { - for _, binlog := range fieldBinlog.GetBinlogs() { - binlog.LogPath = path.Join(bucketName, binlog.LogPath) - } - } - } -} - func (loader *segmentLoader) Load(ctx context.Context, collectionID int64, segmentType SegmentType, @@ -270,9 +259,6 @@ func (loader *segmentLoader) Load(ctx context.Context, log.Info("no segment to load") return nil, nil } - for _, segmentInfo := range segments { - addBucketNameStorageV2(segmentInfo) - } collection := loader.manager.Collection.Get(collectionID) if collection == nil { diff --git a/internal/storage/binlog_record_writer.go b/internal/storage/binlog_record_writer.go index 9dac879ce4..ff7c9828c1 100644 --- a/internal/storage/binlog_record_writer.go +++ b/internal/storage/binlog_record_writer.go @@ -394,9 +394,6 @@ func (pw *PackedManifestRecordWriter) initWriters(r Record) error { var err error k := metautil.JoinIDPath(pw.collectionID, pw.partitionID, pw.segmentID) basePath := path.Join(pw.storageConfig.GetRootPath(), common.SegmentInsertLogPath, k) - if pw.storageConfig.StorageType != "local" { - basePath = path.Join(pw.storageConfig.GetBucketName(), basePath) - } pw.writer, err = NewPackedRecordManifestWriter(pw.storageConfig.GetBucketName(), basePath, -1, pw.schema, pw.bufferSize, pw.multiPartUploadSize, pw.columnGroups, pw.storageConfig, pw.storagePluginContext) if err != nil { return merr.WrapErrServiceInternal(fmt.Sprintf("can not new packed record writer %s", err.Error())) diff --git a/internal/storage/record_writer.go b/internal/storage/record_writer.go index 19b88aed51..7039dbd19d 100644 --- a/internal/storage/record_writer.go +++ b/internal/storage/record_writer.go @@ -32,7 +32,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/indexcgopb" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/util/merr" - "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -124,8 +123,7 @@ func (pw *packedRecordWriter) Close() error { return err } for id, fpath := range pw.pathsMap { - truePath := path.Join(pw.bucketName, fpath) - size, err := packed.GetFileSize(truePath, pw.storageConfig) + size, err := packed.GetFileSize(fpath, pw.storageConfig) if err != nil { return err } @@ -156,20 +154,7 @@ func NewPackedRecordWriter( return nil, merr.WrapErrServiceInternal( fmt.Sprintf("can not convert collection schema %s to arrow schema: %s", schema.Name, err.Error())) } - // if storage config is not passed, use common config - storageType := paramtable.Get().CommonCfg.StorageType.GetValue() - if storageConfig != nil { - storageType = storageConfig.GetStorageType() - } - // compose true path before create packed writer here - // and returned writtenPaths shall remain untouched - truePaths := lo.Map(paths, func(p string, _ int) string { - if storageType == "local" { - return p - } - return path.Join(bucketName, p) - }) - writer, err := packed.NewPackedWriter(truePaths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups, storageConfig, storagePluginContext) + writer, err := packed.NewPackedWriter(paths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups, storageConfig, storagePluginContext) if err != nil { return nil, merr.WrapErrServiceInternal( fmt.Sprintf("can not new packed record writer %s", err.Error())) diff --git a/internal/storage/rw.go b/internal/storage/rw.go index da6ebf2d36..672ab1e8d6 100644 --- a/internal/storage/rw.go +++ b/internal/storage/rw.go @@ -21,7 +21,6 @@ import ( "encoding/base64" "fmt" sio "io" - "path" "sort" "github.com/samber/lo" @@ -299,14 +298,10 @@ func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, s binlogLists := lo.Map(binlogs, func(fieldBinlog *datapb.FieldBinlog, _ int) []*datapb.Binlog { return fieldBinlog.GetBinlogs() }) - bucketName := rwOptions.storageConfig.BucketName paths := make([][]string, len(binlogLists[0])) for _, binlogs := range binlogLists { for j, binlog := range binlogs { logPath := binlog.GetLogPath() - if rwOptions.storageConfig.StorageType != "local" { - logPath = path.Join(bucketName, logPath) - } paths[j] = append(paths[j], logPath) } } diff --git a/internal/storagev2/packed/packed_reader_ffi.go b/internal/storagev2/packed/packed_reader_ffi.go index 2dcfd9fbf0..badf62516c 100644 --- a/internal/storagev2/packed/packed_reader_ffi.go +++ b/internal/storagev2/packed/packed_reader_ffi.go @@ -177,9 +177,6 @@ func (r *FFIPackedReader) Schema() *arrow.Schema { // Retain increases the reference count func (r *FFIPackedReader) Retain() { - // if r.recordReader != nil { - // r.recordReader.Retain() - // } } // Release decreases the reference count diff --git a/internal/storagev2/packed/packed_reader_ffi_test.go b/internal/storagev2/packed/packed_reader_ffi_test.go new file mode 100644 index 0000000000..3eee2bae0b --- /dev/null +++ b/internal/storagev2/packed/packed_reader_ffi_test.go @@ -0,0 +1,410 @@ +package packed + +import ( + "fmt" + "io" + "math" + "testing" + + "github.com/apache/arrow/go/v17/arrow" + "github.com/apache/arrow/go/v17/arrow/array" + "github.com/apache/arrow/go/v17/arrow/memory" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" + + "github.com/milvus-io/milvus/internal/storagecommon" + "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" +) + +func TestPackedFFIReader(t *testing.T) { + paramtable.Init() + pt := paramtable.Get() + pt.Save(pt.CommonCfg.StorageType.Key, "local") + dir := t.TempDir() + t.Log("Case temp dir: ", dir) + pt.Save(pt.LocalStorageCfg.Path.Key, dir) + + t.Cleanup(func() { + pt.Reset(pt.CommonCfg.StorageType.Key) + pt.Reset(pt.LocalStorageCfg.Path.Key) + }) + + const ( + numRows = 1000 + dim = 128 + ) + + // Create schema: int64 primary key + 128-dim float vector + schema := arrow.NewSchema([]arrow.Field{ + { + Name: "pk", + Type: arrow.PrimitiveTypes.Int64, + Nullable: false, + Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"100"}), + }, + { + Name: "vector", + Type: &arrow.FixedSizeBinaryType{ByteWidth: dim * 4}, // float32 = 4 bytes + Nullable: false, + Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"101"}), + }, + }, nil) + + basePath := "files/packed_reader_test/1" + version := int64(0) + + // Build record batch + b := array.NewRecordBuilder(memory.DefaultAllocator, schema) + defer b.Release() + + pkBuilder := b.Field(0).(*array.Int64Builder) + vectorBuilder := b.Field(1).(*array.FixedSizeBinaryBuilder) + + // Store expected values for verification + expectedPks := make([]int64, numRows) + expectedVectors := make([][]byte, numRows) + + for i := 0; i < numRows; i++ { + // Append primary key + expectedPks[i] = int64(i) + pkBuilder.Append(expectedPks[i]) + + // Generate random float vector and convert to bytes + vectorBytes := make([]byte, dim*4) + for j := 0; j < dim; j++ { + floatVal := rand.Float32() + bits := math.Float32bits(floatVal) + common.Endian.PutUint32(vectorBytes[j*4:], bits) + } + expectedVectors[i] = vectorBytes + vectorBuilder.Append(vectorBytes) + } + + rec := b.NewRecord() + defer rec.Release() + + require.Equal(t, int64(numRows), rec.NumRows()) + + // Define column groups + columnGroups := []storagecommon.ColumnGroup{ + {Columns: []int{0, 1}, GroupID: storagecommon.DefaultShortColumnGroupID}, + } + + // Create FFI packed writer and write data + pw, err := NewFFIPackedWriter(basePath, version, schema, columnGroups, nil, nil) + require.NoError(t, err) + + err = pw.WriteRecordBatch(rec) + require.NoError(t, err) + + manifest, err := pw.Close() + require.NoError(t, err) + require.NotEmpty(t, manifest) + + t.Logf("Successfully wrote %d rows with %d-dim float vectors, manifest: %s", numRows, dim, manifest) + + // Create storage config for reader + storageConfig := &indexpb.StorageConfig{ + RootPath: dir, + StorageType: "local", + } + + // Create FFI packed reader + neededColumns := []string{"pk", "vector"} + reader, err := NewFFIPackedReader(manifest, schema, neededColumns, 8192, storageConfig, nil) + require.NoError(t, err) + require.NotNil(t, reader) + + // Verify schema + assert.Equal(t, schema, reader.Schema()) + + // Read all records and verify data + totalRowsRead := int64(0) + for { + record, err := reader.ReadNext() + if err == io.EOF { + break + } + require.NoError(t, err) + require.NotNil(t, record) + + // Verify column count + assert.Equal(t, int64(2), record.NumCols()) + + // Verify pk column + pkCol := record.Column(0).(*array.Int64) + for i := 0; i < pkCol.Len(); i++ { + expectedIdx := int(totalRowsRead) + i + assert.Equal(t, expectedPks[expectedIdx], pkCol.Value(i), fmt.Sprintf("pk mismatch at row %d", expectedIdx)) + } + + // Verify vector column + vectorCol := record.Column(1).(*array.FixedSizeBinary) + for i := 0; i < vectorCol.Len(); i++ { + expectedIdx := int(totalRowsRead) + i + assert.Equal(t, expectedVectors[expectedIdx], vectorCol.Value(i), fmt.Sprintf("vector mismatch at row %d", expectedIdx)) + } + + totalRowsRead += record.NumRows() + } + + // Verify total rows read + assert.Equal(t, int64(numRows), totalRowsRead) + + t.Logf("Successfully read %d rows", totalRowsRead) + + // Close reader + err = reader.Close() + require.NoError(t, err) +} + +func TestPackedFFIReaderPartialColumns(t *testing.T) { + paramtable.Init() + pt := paramtable.Get() + pt.Save(pt.CommonCfg.StorageType.Key, "local") + dir := t.TempDir() + pt.Save(pt.LocalStorageCfg.Path.Key, dir) + + t.Cleanup(func() { + pt.Reset(pt.CommonCfg.StorageType.Key) + pt.Reset(pt.LocalStorageCfg.Path.Key) + }) + + const ( + numRows = 500 + dim = 64 + ) + + // Create schema with 3 columns + schema := arrow.NewSchema([]arrow.Field{ + { + Name: "pk", + Type: arrow.PrimitiveTypes.Int64, + Nullable: false, + Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"100"}), + }, + { + Name: "score", + Type: arrow.PrimitiveTypes.Float64, + Nullable: false, + Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"101"}), + }, + { + Name: "vector", + Type: &arrow.FixedSizeBinaryType{ByteWidth: dim * 4}, + Nullable: false, + Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"102"}), + }, + }, nil) + + basePath := "files/packed_reader_partial_test/1" + version := int64(0) + + // Build record batch + b := array.NewRecordBuilder(memory.DefaultAllocator, schema) + defer b.Release() + + pkBuilder := b.Field(0).(*array.Int64Builder) + scoreBuilder := b.Field(1).(*array.Float64Builder) + vectorBuilder := b.Field(2).(*array.FixedSizeBinaryBuilder) + + expectedPks := make([]int64, numRows) + expectedScores := make([]float64, numRows) + + for i := 0; i < numRows; i++ { + expectedPks[i] = int64(i) + pkBuilder.Append(expectedPks[i]) + + expectedScores[i] = rand.Float64() * 100 + scoreBuilder.Append(expectedScores[i]) + + vectorBytes := make([]byte, dim*4) + for j := 0; j < dim; j++ { + floatVal := rand.Float32() + bits := math.Float32bits(floatVal) + common.Endian.PutUint32(vectorBytes[j*4:], bits) + } + vectorBuilder.Append(vectorBytes) + } + + rec := b.NewRecord() + defer rec.Release() + + // Define column groups + columnGroups := []storagecommon.ColumnGroup{ + {Columns: []int{0, 1, 2}, GroupID: storagecommon.DefaultShortColumnGroupID}, + } + + // Write data + pw, err := NewFFIPackedWriter(basePath, version, schema, columnGroups, nil, nil) + require.NoError(t, err) + + err = pw.WriteRecordBatch(rec) + require.NoError(t, err) + + manifest, err := pw.Close() + require.NoError(t, err) + + // Create storage config + storageConfig := &indexpb.StorageConfig{ + RootPath: dir, + StorageType: "local", + } + + // Read only pk and score columns (skip vector) + neededColumns := []string{"pk", "score"} + partialSchema := arrow.NewSchema([]arrow.Field{ + schema.Field(0), + schema.Field(1), + }, nil) + + reader, err := NewFFIPackedReader(manifest, partialSchema, neededColumns, 8192, storageConfig, nil) + require.NoError(t, err) + require.NotNil(t, reader) + + totalRowsRead := int64(0) + for { + record, err := reader.ReadNext() + if err == io.EOF { + break + } + require.NoError(t, err) + + // Verify only 2 columns are returned + assert.Equal(t, int64(2), record.NumCols()) + + // Verify pk column + pkCol := record.Column(0).(*array.Int64) + for i := 0; i < pkCol.Len(); i++ { + expectedIdx := int(totalRowsRead) + i + assert.Equal(t, expectedPks[expectedIdx], pkCol.Value(i)) + } + + // Verify score column + scoreCol := record.Column(1).(*array.Float64) + for i := 0; i < scoreCol.Len(); i++ { + expectedIdx := int(totalRowsRead) + i + assert.Equal(t, expectedScores[expectedIdx], scoreCol.Value(i)) + } + + totalRowsRead += record.NumRows() + } + + assert.Equal(t, int64(numRows), totalRowsRead) + t.Logf("Successfully read %d rows with partial columns", totalRowsRead) + + err = reader.Close() + require.NoError(t, err) +} + +func TestPackedFFIReaderMultipleBatches(t *testing.T) { + paramtable.Init() + pt := paramtable.Get() + pt.Save(pt.CommonCfg.StorageType.Key, "local") + dir := t.TempDir() + pt.Save(pt.LocalStorageCfg.Path.Key, dir) + + t.Cleanup(func() { + pt.Reset(pt.CommonCfg.StorageType.Key) + pt.Reset(pt.LocalStorageCfg.Path.Key) + }) + + const ( + numRows = 500 + dim = 64 + numWrites = 3 + ) + + schema := arrow.NewSchema([]arrow.Field{ + { + Name: "pk", + Type: arrow.PrimitiveTypes.Int64, + Nullable: false, + Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"100"}), + }, + { + Name: "vector", + Type: &arrow.FixedSizeBinaryType{ByteWidth: dim * 4}, + Nullable: false, + Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"101"}), + }, + }, nil) + + basePath := "files/packed_reader_multi_batch_test/1" + version := int64(0) + + columnGroups := []storagecommon.ColumnGroup{ + {Columns: []int{0, 1}, GroupID: storagecommon.DefaultShortColumnGroupID}, + } + + var manifest string + totalWrittenRows := 0 + + // Write multiple batches + for batch := 0; batch < numWrites; batch++ { + b := array.NewRecordBuilder(memory.DefaultAllocator, schema) + + pkBuilder := b.Field(0).(*array.Int64Builder) + vectorBuilder := b.Field(1).(*array.FixedSizeBinaryBuilder) + + for i := 0; i < numRows; i++ { + pkBuilder.Append(int64(totalWrittenRows + i)) + + vectorBytes := make([]byte, dim*4) + for j := 0; j < dim; j++ { + floatVal := rand.Float32() + bits := math.Float32bits(floatVal) + common.Endian.PutUint32(vectorBytes[j*4:], bits) + } + vectorBuilder.Append(vectorBytes) + } + + rec := b.NewRecord() + + pw, err := NewFFIPackedWriter(basePath, version, schema, columnGroups, nil, nil) + require.NoError(t, err) + + err = pw.WriteRecordBatch(rec) + require.NoError(t, err) + + manifest, err = pw.Close() + require.NoError(t, err) + + _, version, err = UnmarshalManfestPath(manifest) + require.NoError(t, err) + + totalWrittenRows += numRows + + b.Release() + rec.Release() + } + + // Read all data + storageConfig := &indexpb.StorageConfig{ + RootPath: dir, + StorageType: "local", + } + + neededColumns := []string{"pk", "vector"} + reader, err := NewFFIPackedReader(manifest, schema, neededColumns, 8192, storageConfig, nil) + require.NoError(t, err) + + totalRowsRead := int64(0) + for { + record, err := reader.ReadNext() + if err == io.EOF { + break + } + require.NoError(t, err) + totalRowsRead += record.NumRows() + } + + assert.Equal(t, int64(totalWrittenRows), totalRowsRead) + t.Logf("Successfully read %d rows from %d batches", totalRowsRead, numWrites) + + err = reader.Close() + require.NoError(t, err) +} diff --git a/internal/storagev2/packed/packed_writer.go b/internal/storagev2/packed/packed_writer.go index 55b8fb2474..5e1c8b765e 100644 --- a/internal/storagev2/packed/packed_writer.go +++ b/internal/storagev2/packed/packed_writer.go @@ -55,7 +55,7 @@ func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64, cMultiPartUploadSize := C.int64_t(multiPartUploadSize) - cColumnGroups := C.NewCColumnGroups() + cColumnSplits := C.NewCColumnSplits() for _, group := range columnGroups { cGroup := C.malloc(C.size_t(len(group.Columns)) * C.size_t(unsafe.Sizeof(C.int(0)))) if cGroup == nil { @@ -65,7 +65,7 @@ func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64, for i, val := range group.Columns { cGroupSlice[i] = C.int(val) } - C.AddCColumnGroup(cColumnGroups, (*C.int)(cGroup), C.int(len(group.Columns))) + C.AddCColumnSplit(cColumnSplits, (*C.int)(cGroup), C.int(len(group.Columns))) C.free(cGroup) } @@ -117,9 +117,9 @@ func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64, defer C.free(unsafe.Pointer(cStorageConfig.sslCACert)) defer C.free(unsafe.Pointer(cStorageConfig.region)) defer C.free(unsafe.Pointer(cStorageConfig.gcp_credential_json)) - status = C.NewPackedWriterWithStorageConfig(cSchema, cBufferSize, cFilePathsArray, cNumPaths, cMultiPartUploadSize, cColumnGroups, cStorageConfig, &cPackedWriter, pluginContextPtr) + status = C.NewPackedWriterWithStorageConfig(cSchema, cBufferSize, cFilePathsArray, cNumPaths, cMultiPartUploadSize, cColumnSplits, cStorageConfig, &cPackedWriter, pluginContextPtr) } else { - status = C.NewPackedWriter(cSchema, cBufferSize, cFilePathsArray, cNumPaths, cMultiPartUploadSize, cColumnGroups, &cPackedWriter, pluginContextPtr) + status = C.NewPackedWriter(cSchema, cBufferSize, cFilePathsArray, cNumPaths, cMultiPartUploadSize, cColumnSplits, &cPackedWriter, pluginContextPtr) } if err := ConsumeCStatusIntoError(&status); err != nil { return nil, err diff --git a/internal/storagev2/packed/packed_writer_ffi_test.go b/internal/storagev2/packed/packed_writer_ffi_test.go new file mode 100644 index 0000000000..16241721fa --- /dev/null +++ b/internal/storagev2/packed/packed_writer_ffi_test.go @@ -0,0 +1,116 @@ +package packed + +import ( + "math" + "testing" + + "github.com/apache/arrow/go/v17/arrow" + "github.com/apache/arrow/go/v17/arrow/array" + "github.com/apache/arrow/go/v17/arrow/memory" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" + + "github.com/milvus-io/milvus/internal/storagecommon" + "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" +) + +func TestPackedFFIWriter(t *testing.T) { + paramtable.Init() + pt := paramtable.Get() + pt.Save(pt.CommonCfg.StorageType.Key, "local") + dir := t.TempDir() + t.Log("Case temp dir: ", dir) + pt.Save(pt.LocalStorageCfg.Path.Key, dir) + + t.Cleanup(func() { + pt.Reset(pt.CommonCfg.StorageType.Key) + pt.Reset(pt.LocalStorageCfg.Path.Key) + }) + + const ( + numRows = 5000 + dim = 768 + batch = 10 + ) + + // Create schema: int64 primary key + 768-dim float vector + schema := arrow.NewSchema([]arrow.Field{ + { + Name: "pk", + Type: arrow.PrimitiveTypes.Int64, + Nullable: false, + Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"100"}), + }, + { + Name: "vector", + Type: &arrow.FixedSizeBinaryType{ByteWidth: dim * 4}, // float32 = 4 bytes + Nullable: false, + Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"101"}), + }, + }, nil) + + basePath := "files/packed_writer_test/1" + version := int64(0) + + for i := 0; i < batch; i++ { + // Build record batch with 5000 rows + b := array.NewRecordBuilder(memory.DefaultAllocator, schema) + defer b.Release() + + pkBuilder := b.Field(0).(*array.Int64Builder) + vectorBuilder := b.Field(1).(*array.FixedSizeBinaryBuilder) + + for i := 0; i < numRows; i++ { + // Append primary key + pkBuilder.Append(int64(i)) + + // Generate random float vector and convert to bytes + vectorBytes := make([]byte, dim*4) + for j := 0; j < dim; j++ { + floatVal := rand.Float32() + bits := math.Float32bits(floatVal) + common.Endian.PutUint32(vectorBytes[j*4:], bits) + } + vectorBuilder.Append(vectorBytes) + } + + rec := b.NewRecord() + defer rec.Release() + + require.Equal(t, int64(numRows), rec.NumRows()) + + // // Setup storage config for local filesystem + // storageConfig := &indexpb.StorageConfig{ + // RootPath: dir, + // StorageType: "local", + // } + + // Define column groups: pk and vector in the same group + columnGroups := []storagecommon.ColumnGroup{ + {Columns: []int{0, 1}, GroupID: storagecommon.DefaultShortColumnGroupID}, + } + + // Create FFI packed writer + pw, err := NewFFIPackedWriter(basePath, version, schema, columnGroups, nil, nil) + require.NoError(t, err) + + // Write record batch + err = pw.WriteRecordBatch(rec) + require.NoError(t, err) + + // Close writer and get manifest + manifest, err := pw.Close() + require.NoError(t, err) + require.NotEmpty(t, manifest) + + p, pv, err := UnmarshalManfestPath(manifest) + require.NoError(t, err) + assert.Equal(t, p, basePath) + assert.Equal(t, pv, version+1) + version = pv + + t.Logf("Successfully wrote %d rows with %d-dim float vectors, manifest: %s", numRows, dim, manifest) + } +}