From 6c5f5f1e32b7ddea69a15367910fb3eb43e5b0f3 Mon Sep 17 00:00:00 2001 From: sthuang <167743503+shaoting-huang@users.noreply.github.com> Date: Mon, 21 Jul 2025 19:46:53 +0800 Subject: [PATCH] enhance: [StorageV2] refactor group chunk translator (#43406) related: #43372 Signed-off-by: shaoting-huang --- .../src/segcore/ChunkedSegmentSealedImpl.cpp | 14 ----- internal/core/src/segcore/memory_planner.cpp | 56 ++++++++----------- .../GroupChunkTranslator.cpp | 40 +++++++++---- .../GroupChunkTranslator.h | 2 - .../unittest/test_group_chunk_translator.cpp | 30 ++++------ 5 files changed, 64 insertions(+), 78 deletions(-) diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 681e6c92b4..6f028f7de8 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -270,19 +270,6 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal( milvus_storage::FieldIDList field_id_list = storage::GetFieldIDList( column_group_id, insert_files[0], arrow_schema, fs); - std::vector row_group_meta_list; - for (const auto& file : insert_files) { - auto reader = - std::make_shared(fs, file); - row_group_meta_list.push_back( - reader->file_metadata()->GetRowGroupMetadataVector()); - auto status = reader->Close(); - AssertInfo(status.ok(), - "failed to close file reader when get row group " - "metadata from file: " + - file + " with error: " + status.ToString()); - } - // if multiple fields share same column group // hint for not loading certain field shall not be working for now // warmup will be disabled only when all columns are not in load list @@ -316,7 +303,6 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal( column_group_info, insert_files, info.enable_mmap, - row_group_meta_list, milvus_field_ids.size(), load_info.load_priority); diff --git a/internal/core/src/segcore/memory_planner.cpp b/internal/core/src/segcore/memory_planner.cpp index 87a4ea2671..93ae341a13 100644 --- a/internal/core/src/segcore/memory_planner.cpp +++ b/internal/core/src/segcore/memory_planner.cpp @@ -104,54 +104,42 @@ ParallelDegreeSplitStrategy::split( return blocks; } - // If row group size is less than parallel degree, split non-continuous groups - if (sorted_row_groups.size() <= actual_parallel_degree) { + // Helper function to create continuous blocks + auto create_continuous_blocks = [&](size_t max_block_size = SIZE_MAX) { + std::vector continuous_blocks; int64_t current_start = sorted_row_groups[0]; int64_t current_count = 1; for (size_t i = 1; i < sorted_row_groups.size(); ++i) { int64_t next_row_group = sorted_row_groups[i]; - if (next_row_group == current_start + current_count) { + if (next_row_group == current_start + current_count && + current_count < max_block_size) { current_count++; continue; } - blocks.push_back({current_start, current_count}); + continuous_blocks.push_back({current_start, current_count}); current_start = next_row_group; current_count = 1; } if (current_count > 0) { - blocks.push_back({current_start, current_count}); + continuous_blocks.push_back({current_start, current_count}); } - return blocks; + return continuous_blocks; + }; + + // If row group size is less than parallel degree, split non-continuous groups + if (sorted_row_groups.size() <= actual_parallel_degree) { + return create_continuous_blocks(); } // Otherwise, split based on parallel degree size_t avg_block_size = (sorted_row_groups.size() + actual_parallel_degree - 1) / actual_parallel_degree; - int64_t current_start = sorted_row_groups[0]; - int64_t current_count = 1; - for (size_t i = 1; i < sorted_row_groups.size(); ++i) { - int64_t next_row_group = sorted_row_groups[i]; - - if (next_row_group == current_start + current_count && - current_count < avg_block_size) { - current_count++; - } else { - blocks.push_back({current_start, current_count}); - current_start = next_row_group; - current_count = 1; - } - } - - if (current_count > 0) { - blocks.push_back({current_start, current_count}); - } - - return blocks; + return create_continuous_blocks(avg_block_size); } void @@ -182,31 +170,33 @@ LoadWithStrategy(const std::vector& remote_files, // Use provided strategy to split row groups auto blocks = strategy->split(row_groups); + LOG_INFO("split row groups into blocks: {} for file {}", + blocks.size(), file); + // Create and submit tasks for each block std::vector>> futures; futures.reserve(blocks.size()); - // split memory limit for each block, check if it's greater than 0 - auto reader_memory_limit = memory_limit / blocks.size(); - if (reader_memory_limit < FILE_SLICE_SIZE) { - reader_memory_limit = FILE_SLICE_SIZE; - } + auto reader_memory_limit = std::max( + memory_limit / blocks.size(), FILE_SLICE_SIZE); for (const auto& block : blocks) { futures.emplace_back(pool.Submit([block, fs, file, schema, - memory_limit]() { + reader_memory_limit]() { AssertInfo(fs != nullptr, "file system is nullptr"); auto row_group_reader = std::make_shared( - fs, file, schema, memory_limit); + fs, file, schema, reader_memory_limit); AssertInfo(row_group_reader != nullptr, "row group reader is nullptr"); row_group_reader->SetRowGroupOffsetAndCount(block.offset, block.count); + LOG_INFO("read row groups from file {} with offset {} and count {}", + file, block.offset, block.count); auto ret = std::make_shared(); for (int64_t i = 0; i < block.count; ++i) { std::shared_ptr table; diff --git a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp index c02fe03cec..146d8d63a5 100644 --- a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp +++ b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp @@ -21,6 +21,7 @@ #include "milvus-storage/common/metadata.h" #include "milvus-storage/filesystem/fs.h" #include "milvus-storage/common/constants.h" +#include "milvus-storage/format/parquet/file_reader.h" #include "storage/ThreadPools.h" #include "segcore/memory_planner.h" @@ -42,8 +43,6 @@ GroupChunkTranslator::GroupChunkTranslator( FieldDataInfo column_group_info, std::vector insert_files, bool use_mmap, - const std::vector& - row_group_meta_list, int64_t num_fields, milvus::proto::common::LoadPriority load_priority) : segment_id_(segment_id), @@ -52,7 +51,6 @@ GroupChunkTranslator::GroupChunkTranslator( column_group_info_(column_group_info), insert_files_(insert_files), use_mmap_(use_mmap), - row_group_meta_list_(row_group_meta_list), load_priority_(load_priority), meta_( num_fields, @@ -64,8 +62,22 @@ GroupChunkTranslator::GroupChunkTranslator( milvus::segcore::getCacheWarmupPolicy(/* is_vector */ false, /* is_index */ false), /* support_eviction */ true) { - AssertInfo(insert_files_.size() == row_group_meta_list_.size(), - "Number of insert files must match number of row group metas"); + auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() + .GetArrowFileSystem(); + + // Get row group metadata from files + for (const auto& file : insert_files_) { + auto reader = + std::make_shared(fs, file); + row_group_meta_list_.push_back( + reader->file_metadata()->GetRowGroupMetadataVector()); + auto status = reader->Close(); + AssertInfo(status.ok(), + "failed to close file reader when get row group " + "metadata from file: " + + file + " with error: " + status.ToString()); + } + meta_.num_rows_until_chunk_.push_back(0); for (const auto& row_group_meta : row_group_meta_list_) { for (int i = 0; i < row_group_meta.size(); ++i) { @@ -126,7 +138,17 @@ GroupChunkTranslator::get_file_and_row_group_index( remaining_cid -= file_metas.size(); } - return {0, 0}; // Default to first file and first row group if not found + // cid is out of range, this should not happen + AssertInfo(false, + fmt::format("cid {} is out of range. Total row groups across all files: {}", + cid, + [this]() { + size_t total = 0; + for (const auto& file_metas : row_group_meta_list_) { + total += file_metas.size(); + } + return total; + }())); } std::vector>> @@ -137,11 +159,7 @@ GroupChunkTranslator::get_cells(const std::vector& cids) { cells.reserve(cids.size()); // Create row group lists for requested cids - std::vector> row_group_lists; - row_group_lists.reserve(insert_files_.size()); - for (size_t i = 0; i < insert_files_.size(); ++i) { - row_group_lists.emplace_back(); - } + std::vector> row_group_lists(insert_files_.size()); for (auto cid : cids) { auto [file_idx, row_group_idx] = get_file_and_row_group_index(cid); diff --git a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h index a2153b8cbd..274339e59d 100644 --- a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h +++ b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h @@ -41,8 +41,6 @@ class GroupChunkTranslator FieldDataInfo column_group_info, std::vector insert_files, bool use_mmap, - const std::vector& - row_group_meta_list, int64_t num_fields, milvus::proto::common::LoadPriority load_priority); diff --git a/internal/core/unittest/test_group_chunk_translator.cpp b/internal/core/unittest/test_group_chunk_translator.cpp index 3c70a9529d..0eec55832f 100644 --- a/internal/core/unittest/test_group_chunk_translator.cpp +++ b/internal/core/unittest/test_group_chunk_translator.cpp @@ -100,30 +100,22 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) { auto use_mmap = GetParam(); std::unordered_map field_metas = schema_->get_fields(); auto column_group_info = FieldDataInfo(0, 3000, ""); - // Get row group metadata - std::vector row_group_meta_list; - auto fr = - std::make_shared(fs_, paths_[0]); - - row_group_meta_list.push_back( - fr->file_metadata()->GetRowGroupMetadataVector()); - auto status = fr->Close(); - AssertInfo( - status.ok(), - "failed to close file reader when get row group metadata from file: " + - paths_[0] + " with error: " + status.ToString()); auto translator = std::make_unique(segment_id_, field_metas, column_group_info, paths_, use_mmap, - row_group_meta_list, schema_->get_field_ids().size(), milvus::proto::common::LoadPriority::LOW); - // num cells - EXPECT_EQ(translator->num_cells(), row_group_meta_list[0].size()); + // num cells - get the expected number from the file directly + auto fr = std::make_shared(fs_, paths_[0]); + auto expected_num_cells = fr->file_metadata()->GetRowGroupMetadataVector().size(); + auto row_group_metadata_vector = fr->file_metadata()->GetRowGroupMetadataVector(); + auto status = fr->Close(); + AssertInfo(status.ok(), "failed to close file reader"); + EXPECT_EQ(translator->num_cells(), expected_num_cells); // cell id of for (size_t i = 0; i < translator->num_cells(); ++i) { @@ -133,14 +125,16 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) { // key EXPECT_EQ(translator->key(), "seg_0_cg_0"); + + // estimated byte size for (size_t i = 0; i < translator->num_cells(); ++i) { auto [file_idx, row_group_idx] = translator->get_file_and_row_group_index(i); - auto& row_group_meta = row_group_meta_list[file_idx].Get(row_group_idx); + // Get the expected memory size from the file directly + auto expected_memory_size = static_cast(row_group_metadata_vector.Get(row_group_idx).memory_size()); auto usage = translator->estimated_byte_size_of_cell(i); - EXPECT_EQ(usage.memory_bytes, - static_cast(row_group_meta.memory_size())); + EXPECT_EQ(usage.memory_bytes, expected_memory_size); } // getting cells