From 0d57acb13ad842ec6ef235a9c0cc32ee81bc6b04 Mon Sep 17 00:00:00 2001 From: sthuang <167743503+shaoting-huang@users.noreply.github.com> Date: Wed, 25 Jun 2025 11:08:48 +0800 Subject: [PATCH] enhance: [StorageV2] field id as meta path for wide column when load (#42863) related: #42862 #39173 Signed-off-by: shaoting-huang --- .../src/segcore/ChunkedSegmentSealedImpl.cpp | 49 +++++++++++++------ .../src/segcore/ChunkedSegmentSealedImpl.h | 6 +++ .../core/src/segcore/SegmentGrowingImpl.cpp | 32 ++++++------ .../GroupChunkTranslator.cpp | 23 ++++++--- .../GroupChunkTranslator.h | 3 +- internal/core/src/storage/Util.cpp | 18 ++++--- internal/core/src/storage/Util.h | 3 ++ .../test_chunked_segment_storage_v2.cpp | 46 ++++++++++++++--- .../unittest/test_group_chunk_translator.cpp | 4 +- internal/querynodev2/segments/segment.go | 7 +-- 10 files changed, 131 insertions(+), 60 deletions(-) diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 2e2bbe6553..7cb5da86a1 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include "Utils.h" @@ -249,6 +250,8 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal( size_t num_rows = storage::GetNumRowsForLoadInfo(load_info); ArrowSchemaPtr arrow_schema = schema_->ConvertToArrowSchema(); + auto field_ids = schema_->get_field_ids(); + for (auto& [id, info] : load_info.field_infos) { AssertInfo(info.row_count > 0, "The row count of field data is 0"); @@ -257,12 +260,6 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal( storage::SortByPath(insert_files); auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() .GetArrowFileSystem(); - auto file_reader = std::make_shared( - fs, insert_files[0], arrow_schema); - std::shared_ptr metadata = - file_reader->file_metadata(); - - auto field_id_mapping = metadata->GetFieldIDMapping(); std::vector row_group_meta_list; for (const auto& file : insert_files) { @@ -272,20 +269,20 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal( reader->file_metadata()->GetRowGroupMetadataVector()); } - milvus_storage::FieldIDList field_id_list = - metadata->GetGroupFieldIDList().GetFieldIDList( - column_group_id.get()); std::vector milvus_field_ids; + if (column_group_id == FieldId(DEFAULT_SHORT_COLUMN_GROUP_ID)) { + milvus_field_ids = narrow_column_field_ids_; + } else { + milvus_field_ids.push_back(column_group_id); + } // if multiple fields share same column group // hint for not loading certain field shall not be working for now // warmup will be disabled only when all columns are not in load list bool merged_in_load_list = false; - for (int i = 0; i < field_id_list.size(); ++i) { - milvus_field_ids.emplace_back(field_id_list.Get(i)); - merged_in_load_list = - merged_in_load_list || - schema_->ShallLoadField(FieldId(field_id_list.Get(i))); + for (int i = 0; i < milvus_field_ids.size(); ++i) { + merged_in_load_list = merged_in_load_list || + schema_->ShallLoadField(milvus_field_ids[i]); } auto column_group_info = FieldDataInfo(column_group_id.get(), @@ -307,7 +304,7 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal( insert_files, info.enable_mmap, row_group_meta_list, - field_id_list, + schema_->size(), load_info.load_priority); auto chunked_column_group = @@ -463,6 +460,9 @@ ChunkedSegmentSealedImpl::AddFieldDataInfoForSealed( const LoadFieldDataInfo& field_data_info) { // copy assignment field_data_info_ = field_data_info; + if (field_data_info_.storage_version == 2) { + init_narrow_column_field_ids(field_data_info); + } } // internal API: support scalar index only @@ -2014,4 +2014,23 @@ ChunkedSegmentSealedImpl::fill_empty_field(const FieldMeta& field_meta) { set_bit(field_data_ready_bitset_, field_id, true); } +void +ChunkedSegmentSealedImpl::init_narrow_column_field_ids( + const LoadFieldDataInfo& field_data_info) { + std::unordered_set column_group_ids; + for (auto& [id, info] : field_data_info.field_infos) { + int64_t group_id = + storage::ExtractGroupIdFromPath(info.insert_files[0]); + column_group_ids.insert(group_id); + } + + std::vector narrow_column_field_ids; + for (auto& field_id : schema_->get_field_ids()) { + if (column_group_ids.find(field_id.get()) == column_group_ids.end()) { + narrow_column_field_ids.push_back(field_id); + } + } + narrow_column_field_ids_ = narrow_column_field_ids; +} + } // namespace milvus::segcore diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h index 05cefba8db..5cf3933322 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h @@ -405,6 +405,9 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { bool enable_mmap, bool is_proxy_column); + void + init_narrow_column_field_ids(const LoadFieldDataInfo& field_data_info); + private: // InsertRecord needs to pin pk column. friend class storagev1translator::InsertRecordTranslator; @@ -437,6 +440,9 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { LoadFieldDataInfo field_data_info_; + // for storage v2 + std::vector narrow_column_field_ids_; + SchemaPtr schema_; int64_t id_; mutable std::unordered_map> diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 8914d4bbd1..2f0d9d2f3a 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -46,6 +46,7 @@ #include "milvus-storage/format/parquet/file_reader.h" #include "milvus-storage/filesystem/fs.h" +#include "milvus-storage/common/constants.h" namespace milvus::segcore { @@ -425,16 +426,6 @@ SegmentGrowingImpl::load_column_group_data_internal( storage::SortByPath(insert_files); auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() .GetArrowFileSystem(); - auto file_reader = std::make_shared( - fs, insert_files[0], arrow_schema); - std::shared_ptr metadata = - file_reader->file_metadata(); - - auto field_id_mapping = metadata->GetFieldIDMapping(); - - milvus_storage::FieldIDList field_ids = - metadata->GetGroupFieldIDList().GetFieldIDList( - column_group_id.get()); auto column_group_info = FieldDataInfo(column_group_id.get(), num_rows, infos.mmap_dir_path); @@ -475,9 +466,9 @@ SegmentGrowingImpl::load_column_group_data_internal( infos.load_priority); }); - LOG_INFO("segment {} submits load fields {} task to thread pool", + LOG_INFO("segment {} submits load column group {} task to thread pool", this->get_segment_id(), - field_ids.ToString()); + info.field_id); std::shared_ptr r; @@ -485,10 +476,19 @@ SegmentGrowingImpl::load_column_group_data_internal( while (column_group_info.arrow_reader_channel->pop(r)) { for (const auto& table : r->arrow_tables) { size_t batch_num_rows = table->num_rows(); - for (int i = 0; i < field_ids.size(); ++i) { - auto field_id = FieldId(field_ids.Get(i)); + for (int i = 0; i < table->schema()->num_fields(); ++i) { + AssertInfo(table->schema()->field(i)->metadata()->Contains( + milvus_storage::ARROW_FIELD_ID_KEY), + "field id not found in metadata for field {}", + table->schema()->field(i)->name()); + auto field_id = + std::stoll(table->schema() + ->field(i) + ->metadata() + ->Get(milvus_storage::ARROW_FIELD_ID_KEY) + ->data()); for (auto& field : schema_->get_fields()) { - if (field.second.get_id().get() != field_id.get()) { + if (field.second.get_id().get() != field_id) { continue; } auto field_data = storage::CreateFieldData( @@ -498,7 +498,7 @@ SegmentGrowingImpl::load_column_group_data_internal( : 0, batch_num_rows); field_data->FillFieldData(table->column(i)); - field_data_map[field_id].push_back(field_data); + field_data_map[FieldId(field_id)].push_back(field_data); } } } diff --git a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp index 35775d407e..4e5615577d 100644 --- a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp +++ b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp @@ -20,6 +20,7 @@ #include "common/Types.h" #include "milvus-storage/common/metadata.h" #include "milvus-storage/filesystem/fs.h" +#include "milvus-storage/common/constants.h" #include "storage/ThreadPools.h" #include "segcore/memory_planner.h" @@ -43,7 +44,7 @@ GroupChunkTranslator::GroupChunkTranslator( bool use_mmap, const std::vector& row_group_meta_list, - milvus_storage::FieldIDList field_id_list, + int64_t num_fields, milvus::proto::common::LoadPriority load_priority) : segment_id_(segment_id), key_(fmt::format("seg_{}_cg_{}", segment_id, column_group_info.field_id)), @@ -52,10 +53,9 @@ GroupChunkTranslator::GroupChunkTranslator( insert_files_(insert_files), use_mmap_(use_mmap), row_group_meta_list_(row_group_meta_list), - field_id_list_(field_id_list), load_priority_(load_priority), meta_( - field_id_list.size(), + num_fields, use_mmap ? milvus::cachinglayer::StorageType::DISK : milvus::cachinglayer::StorageType::MEMORY, // TODO(tiered storage 2): vector may be of small size and mixed with scalar, do we force it @@ -163,9 +163,9 @@ GroupChunkTranslator::get_cells(const std::vector& cids) { nullptr, load_priority_); }); - LOG_INFO("segment {} submits load fields {} task to thread pool", + LOG_INFO("segment {} submits load column group {} task to thread pool", segment_id_, - field_id_list_.ToString()); + column_group_info_.field_id); std::shared_ptr r; int64_t cid_idx = 0; @@ -194,8 +194,17 @@ GroupChunkTranslator::load_group_chunk( // Create chunks for each field in this batch std::unordered_map> chunks; // Iterate through field_id_list to get field_id and create chunk - for (size_t i = 0; i < field_id_list_.size(); ++i) { - auto field_id = field_id_list_.Get(i); + for (int i = 0; i < table->schema()->num_fields(); ++i) { + AssertInfo(table->schema()->field(i)->metadata()->Contains( + milvus_storage::ARROW_FIELD_ID_KEY), + "field id not found in metadata for field {}", + table->schema()->field(i)->name()); + auto field_id = std::stoll(table->schema() + ->field(i) + ->metadata() + ->Get(milvus_storage::ARROW_FIELD_ID_KEY) + ->data()); + auto fid = milvus::FieldId(field_id); if (fid == RowFieldID) { // ignore row id field diff --git a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h index 8e7c036f77..a2153b8cbd 100644 --- a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h +++ b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h @@ -43,7 +43,7 @@ class GroupChunkTranslator bool use_mmap, const std::vector& row_group_meta_list, - milvus_storage::FieldIDList field_id_list, + int64_t num_fields, milvus::proto::common::LoadPriority load_priority); ~GroupChunkTranslator() override; @@ -83,7 +83,6 @@ class GroupChunkTranslator FieldDataInfo column_group_info_; std::vector insert_files_; std::vector row_group_meta_list_; - milvus_storage::FieldIDList field_id_list_; SchemaPtr schema_; bool is_sorted_by_pk_; ChunkedSegmentSealedImpl* chunked_segment_; diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index 4451937225..d72dfa6756 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -1060,14 +1060,7 @@ GetFieldDatasFromStorageV2(std::vector>& remote_files, std::unordered_map> column_group_files; for (auto& remote_chunk_files : remote_files) { AssertInfo(remote_chunk_files.size() > 0, "remote files size is 0"); - - // find second last of / to get group_id - std::string path = remote_chunk_files[0]; - size_t last_slash = path.find_last_of("/"); - size_t second_last_slash = path.find_last_of("/", last_slash - 1); - int64_t group_id = std::stol(path.substr( - second_last_slash + 1, last_slash - second_last_slash - 1)); - + int64_t group_id = ExtractGroupIdFromPath(remote_chunk_files[0]); column_group_files[group_id] = remote_chunk_files; } @@ -1184,4 +1177,13 @@ CacheRawDataAndFillMissing(const MemFileManagerImplPtr& file_manager, return field_datas; } +int64_t +ExtractGroupIdFromPath(const std::string& path) { + // find second last of / to get group_id + size_t last_slash = path.find_last_of("/"); + size_t second_last_slash = path.find_last_of("/", last_slash - 1); + return std::stol( + path.substr(second_last_slash + 1, last_slash - second_last_slash - 1)); +} + } // namespace milvus::storage diff --git a/internal/core/src/storage/Util.h b/internal/core/src/storage/Util.h index 10163237ec..a779e71a83 100644 --- a/internal/core/src/storage/Util.h +++ b/internal/core/src/storage/Util.h @@ -202,6 +202,9 @@ CollectFieldDataChannel(FieldDataChannelPtr& channel); FieldDataPtr MergeFieldData(std::vector& data_array); +int64_t +ExtractGroupIdFromPath(const std::string& path); + template struct has_native_type : std::false_type {}; template diff --git a/internal/core/unittest/test_chunked_segment_storage_v2.cpp b/internal/core/unittest/test_chunked_segment_storage_v2.cpp index 752d9bcb8b..8c72a2beea 100644 --- a/internal/core/unittest/test_chunked_segment_storage_v2.cpp +++ b/internal/core/unittest/test_chunked_segment_storage_v2.cpp @@ -74,15 +74,26 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam { // Initialize file system auto conf = milvus_storage::ArrowFileSystemConfig(); conf.storage_type = "local"; - conf.root_path = "/tmp"; + conf.root_path = "test_data"; milvus_storage::ArrowFileSystemSingleton::GetInstance().Init(conf); auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() .GetArrowFileSystem(); // Prepare paths and column groups - std::vector paths = {"/tmp/0.parquet", "/tmp/1.parquet"}; + std::vector paths = {"test_data/0/10000.parquet", + "test_data/102/10001.parquet", + "test_data/103/10002.parquet"}; + + // Create directories for the parquet files + for (const auto& path : paths) { + auto dir_path = path.substr(0, path.find_last_of('/')); + auto status = fs->CreateDir(dir_path); + EXPECT_TRUE(status.ok()) + << "Failed to create directory: " << dir_path; + } + std::vector> column_groups = { - {0, 1, 2}, {3, 4}}; // short columns and long columns + {0, 4, 3}, {2}, {1}}; // narrow columns and wide columns auto writer_memory = 16 * 1024 * 1024; auto storage_config = milvus_storage::StorageConfig(); @@ -162,15 +173,38 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam { false, std::vector({paths[0]})}); load_info.field_infos.emplace( - int64_t(1), - FieldBinlogInfo{int64_t(1), + int64_t(102), + FieldBinlogInfo{int64_t(102), static_cast(row_count), std::vector(chunk_num * test_data_count), false, std::vector({paths[1]})}); + load_info.field_infos.emplace( + int64_t(103), + FieldBinlogInfo{int64_t(103), + static_cast(row_count), + std::vector(chunk_num * test_data_count), + false, + std::vector({paths[2]})}); load_info.mmap_dir_path = ""; load_info.storage_version = 2; - segment->LoadFieldData(load_info); + segment->AddFieldDataInfoForSealed(load_info); + for (auto& [id, info] : load_info.field_infos) { + LoadFieldDataInfo load_field_info; + load_field_info.storage_version = 2; + load_field_info.mmap_dir_path = ""; + load_field_info.field_infos.emplace(id, info); + segment->LoadFieldData(load_field_info); + } + } + + void + TearDown() override { + // Clean up test data directory + auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() + .GetArrowFileSystem(); + auto status = fs->DeleteDir("test_data"); + ASSERT_TRUE(status.ok()); } segcore::SegmentSealedUPtr segment; diff --git a/internal/core/unittest/test_group_chunk_translator.cpp b/internal/core/unittest/test_group_chunk_translator.cpp index 8e839757b1..c65f45c9b4 100644 --- a/internal/core/unittest/test_group_chunk_translator.cpp +++ b/internal/core/unittest/test_group_chunk_translator.cpp @@ -103,8 +103,6 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) { std::vector row_group_meta_list; auto fr = std::make_shared(fs_, paths_[0]); - auto field_id_list = - fr->file_metadata()->GetGroupFieldIDList().GetFieldIDList(0); row_group_meta_list.push_back( fr->file_metadata()->GetRowGroupMetadataVector()); @@ -115,7 +113,7 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) { paths_, use_mmap, row_group_meta_list, - field_id_list, + schema_->get_field_ids().size(), milvus::proto::common::LoadPriority::LOW); // num cells diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 7dd52674ce..2e846e5665 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -843,9 +843,10 @@ func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fie ) req := &segcore.AddFieldDataInfoRequest{ - Fields: make([]segcore.LoadFieldDataInfo, 0, len(fields)), - RowCount: rowCount, - LoadPriority: s.loadInfo.Load().GetPriority(), + Fields: make([]segcore.LoadFieldDataInfo, 0, len(fields)), + RowCount: rowCount, + LoadPriority: s.loadInfo.Load().GetPriority(), + StorageVersion: s.loadInfo.Load().GetStorageVersion(), } for _, field := range fields { req.Fields = append(req.Fields, segcore.LoadFieldDataInfo{