diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 82fe47e7bf..5a4b9a22eb 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -2862,6 +2862,7 @@ ChunkedSegmentSealedImpl::LoadColumnGroup( auto translator = std::make_unique( get_segment_id(), + GroupChunkType::DEFAULT, index, std::move(chunk_reader), field_metas, diff --git a/internal/core/src/segcore/storagev2translator/GroupCTMeta.h b/internal/core/src/segcore/storagev2translator/GroupCTMeta.h index a49c7151a4..b941a10b95 100644 --- a/internal/core/src/segcore/storagev2translator/GroupCTMeta.h +++ b/internal/core/src/segcore/storagev2translator/GroupCTMeta.h @@ -19,10 +19,23 @@ namespace milvus::segcore::storagev2translator { +// Number of row groups (parquet row groups) merged into one cache cell, +// for now it is a constant. +// hierarchy: 1 group chunk <-> 1 cache cell <-> kRowGroupsPerCell row groups +constexpr size_t kRowGroupsPerCell = 4; +static_assert(kRowGroupsPerCell > 0, + "kRowGroupsPerCell must be greater than 0"); + struct GroupCTMeta : public milvus::cachinglayer::Meta { + // num_rows_until_chunk_[i] = total rows(prefix sum) in cells [0, i-1] + // the size of num_rows_until_chunk_ is num_cells + 1 std::vector num_rows_until_chunk_; + // memory size for each group chunk(cache cell) std::vector chunk_memory_size_; size_t num_fields_; + // total number of row groups + size_t total_row_groups_; + GroupCTMeta(size_t num_fields, milvus::cachinglayer::StorageType storage_type, milvus::cachinglayer::CellIdMappingMode cell_id_mapping_mode, @@ -34,7 +47,16 @@ struct GroupCTMeta : public milvus::cachinglayer::Meta { cell_data_type, cache_warmup_policy, support_eviction), - num_fields_(num_fields) { + num_fields_(num_fields), + total_row_groups_(0) { + } + + // Get the range of row groups for a cell [start, end) + std::pair + get_row_group_range(size_t cid) const { + size_t start = cid * kRowGroupsPerCell; + size_t end = std::min(start + kRowGroupsPerCell, total_row_groups_); + return {start, end}; } }; diff --git a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp index d2844a6479..0465f07ec1 100644 --- a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp +++ b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp @@ -144,19 +144,40 @@ GroupChunkTranslator::GroupChunkTranslator( file_metas.size()); } - meta_.num_rows_until_chunk_.reserve(total_row_groups + 1); - meta_.chunk_memory_size_.reserve(total_row_groups); - - meta_.num_rows_until_chunk_.push_back(0); + // Collect row group sizes and row counts + std::vector row_group_row_counts; + std::vector row_group_sizes; + row_group_sizes.reserve(total_row_groups); + row_group_row_counts.reserve(total_row_groups); for (const auto& row_group_meta : row_group_meta_list_) { for (int i = 0; i < row_group_meta.size(); ++i) { - meta_.num_rows_until_chunk_.push_back( - meta_.num_rows_until_chunk_.back() + - row_group_meta.Get(i).row_num()); - meta_.chunk_memory_size_.push_back( - row_group_meta.Get(i).memory_size()); + row_group_sizes.push_back(row_group_meta.Get(i).memory_size()); + row_group_row_counts.push_back(row_group_meta.Get(i).row_num()); } } + + // Build cell mapping: each cell contains up to kRowGroupsPerCell row groups + meta_.total_row_groups_ = total_row_groups; + size_t num_cells = + (total_row_groups + kRowGroupsPerCell - 1) / kRowGroupsPerCell; + + // Merge row groups into group chunks(cache cells) + meta_.num_rows_until_chunk_.reserve(num_cells + 1); + meta_.num_rows_until_chunk_.push_back(0); + meta_.chunk_memory_size_.reserve(num_cells); + + int64_t cumulative_rows = 0; + for (size_t cell_id = 0; cell_id < num_cells; ++cell_id) { + auto [start, end] = meta_.get_row_group_range(cell_id); + int64_t cell_size = 0; + for (size_t i = start; i < end; ++i) { + cumulative_rows += row_group_row_counts[i]; + cell_size += row_group_sizes[i]; + } + meta_.num_rows_until_chunk_.push_back(cumulative_rows); + meta_.chunk_memory_size_.push_back(cell_size); + } + AssertInfo( meta_.num_rows_until_chunk_.back() == column_group_info_.row_count, fmt::format( @@ -165,6 +186,14 @@ GroupChunkTranslator::GroupChunkTranslator( column_group_info_.field_id, meta_.num_rows_until_chunk_.back(), column_group_info_.row_count)); + + LOG_INFO( + "[StorageV2] translator {} merged {} row groups into {} cells ({} " + "row groups per cell)", + key_, + total_row_groups, + num_cells, + kRowGroupsPerCell); } GroupChunkTranslator::~GroupChunkTranslator() { @@ -184,10 +213,8 @@ std::pair GroupChunkTranslator::estimated_byte_size_of_cell( milvus::cachinglayer::cid_t cid) const { - auto [file_idx, row_group_idx] = get_file_and_row_group_index(cid); - auto& row_group_meta = row_group_meta_list_[file_idx].Get(row_group_idx); - - auto cell_sz = static_cast(row_group_meta.memory_size()); + assert(cid < meta_.chunk_memory_size_.size()); + auto cell_sz = meta_.chunk_memory_size_[cid]; if (use_mmap_) { // why double the disk size for loading? @@ -205,26 +232,29 @@ GroupChunkTranslator::key() const { } std::pair -GroupChunkTranslator::get_file_and_row_group_index( - milvus::cachinglayer::cid_t cid) const { +GroupChunkTranslator::get_file_and_row_group_offset( + size_t global_row_group_idx) const { for (size_t file_idx = 0; file_idx < file_row_group_prefix_sum_.size() - 1; ++file_idx) { - if (cid < file_row_group_prefix_sum_[file_idx + 1]) { - return {file_idx, cid - file_row_group_prefix_sum_[file_idx]}; + if (global_row_group_idx < file_row_group_prefix_sum_[file_idx + 1]) { + return { + file_idx, + global_row_group_idx - file_row_group_prefix_sum_[file_idx]}; } } - AssertInfo(false, - fmt::format("[StorageV2] translator {} cid {} is out of range. " - "Total row groups across all files: {}", - key_, - cid, - file_row_group_prefix_sum_.back())); + AssertInfo( + false, + fmt::format("[StorageV2] translator {} global_row_group_idx {} is out " + "of range. Total row groups across all files: {}", + key_, + global_row_group_idx, + file_row_group_prefix_sum_.back())); } milvus::cachinglayer::cid_t -GroupChunkTranslator::get_cid_from_file_and_row_group_index( - size_t file_idx, size_t row_group_idx) const { +GroupChunkTranslator::get_global_row_group_idx(size_t file_idx, + size_t row_group_idx) const { AssertInfo(file_idx < file_row_group_prefix_sum_.size() - 1, fmt::format("[StorageV2] translator {} file_idx {} is out of " "range. Total files: {}", @@ -246,7 +276,6 @@ GroupChunkTranslator::get_cid_from_file_and_row_group_index( return file_start + row_group_idx; } -// the returned cids are sorted. It may not follow the order of cids. std::vector>> GroupChunkTranslator::get_cells(const std::vector& cids) { std::vector& cids) { cells; cells.reserve(cids.size()); - // Create row group lists for requested cids - std::vector> row_group_lists(insert_files_.size()); + auto max_cid = *std::max_element(cids.begin(), cids.end()); + if (max_cid >= meta_.chunk_memory_size_.size()) { + ThrowInfo( + ErrorCode::UnexpectedError, + "[StorageV2] translator {} cid {} is out of range. Total cells: {}", + key_, + max_cid, + meta_.chunk_memory_size_.size()); + } + // Collect all row group indices needed for the requested cells + std::vector needed_row_group_indices; + needed_row_group_indices.reserve(kRowGroupsPerCell * cids.size()); for (auto cid : cids) { - auto [file_idx, row_group_idx] = get_file_and_row_group_index(cid); - row_group_lists[file_idx].push_back(row_group_idx); + auto [start, end] = meta_.get_row_group_range(cid); + for (size_t i = start; i < end; ++i) { + needed_row_group_indices.push_back(i); + } + } + + // Create row group lists for file loading + std::vector> row_group_lists(insert_files_.size()); + for (auto rg_idx : needed_row_group_indices) { + auto [file_idx, row_group_off] = get_file_and_row_group_offset(rg_idx); + row_group_lists[file_idx].push_back(row_group_off); } auto parallel_degree = @@ -288,57 +336,72 @@ GroupChunkTranslator::get_cells(const std::vector& cids) { key_, column_group_info_.field_id); + // Collect loaded tables by row group index + std::unordered_map> row_group_tables; + row_group_tables.reserve(needed_row_group_indices.size()); + std::shared_ptr r; - std::unordered_set filled_cids; - filled_cids.reserve(cids.size()); + // !!! NOTE: the popped row group tables are sorted by the global row group index + // !!! Never rely on the order of the popped row group tables. while (channel->pop(r)) { for (const auto& table_info : r->arrow_tables) { - // Convert file_index and row_group_index to global cid - auto cid = get_cid_from_file_and_row_group_index( - table_info.file_index, table_info.row_group_index); - cells.emplace_back(cid, load_group_chunk(table_info.table, cid)); - filled_cids.insert(cid); + // Convert file_index and row_group_index (file inner index, not global index) to global row group index + auto rg_idx = get_global_row_group_idx(table_info.file_index, + table_info.row_group_index); + row_group_tables[rg_idx] = table_info.table; } } // access underlying feature to get exception if any load_future.get(); - // Verify all requested cids have been filled + // Build cells from collected tables for (auto cid : cids) { - AssertInfo(filled_cids.find(cid) != filled_cids.end(), - "[StorageV2] translator {} cid {} was not filled, missing " - "row group id {}", - key_, - cid, - cid); + auto [start, end] = meta_.get_row_group_range(cid); + std::vector> tables; + tables.reserve(end - start); + + for (size_t i = start; i < end; ++i) { + auto it = row_group_tables.find(i); + AssertInfo(it != row_group_tables.end(), + fmt::format("[StorageV2] translator {} row group {} " + "for cell {} was not loaded", + key_, + i, + cid)); + tables.push_back(it->second); + } + + cells.emplace_back(cid, load_group_chunk(tables, cid)); } + return cells; } std::unique_ptr GroupChunkTranslator::load_group_chunk( - const std::shared_ptr& table, + const std::vector>& tables, const milvus::cachinglayer::cid_t cid) { - AssertInfo(table != nullptr, "arrow table is nullptr"); - // Create chunks for each field in this batch - std::unordered_map> chunks; - // Iterate through field_id_list to get field_id and create chunk - std::vector field_ids; - std::vector field_metas; - std::vector array_vecs; - field_metas.reserve(table->schema()->num_fields()); - array_vecs.reserve(table->schema()->num_fields()); + assert(!tables.empty()); + // Use the first table's schema as reference for field iteration + const auto& schema = tables[0]->schema(); - for (int i = 0; i < table->schema()->num_fields(); ++i) { - AssertInfo(table->schema()->field(i)->metadata()->Contains( + // Collect field info and merge array vectors from all tables + std::vector field_ids; + field_ids.reserve(schema->num_fields()); + std::vector field_metas; + field_metas.reserve(schema->num_fields()); + std::vector array_vecs; + array_vecs.reserve(schema->num_fields()); + + for (int i = 0; i < schema->num_fields(); ++i) { + AssertInfo(schema->field(i)->metadata()->Contains( milvus_storage::ARROW_FIELD_ID_KEY), "[StorageV2] translator {} field id not found in metadata " "for field {}", key_, - table->schema()->field(i)->name()); - auto field_id = std::stoll(table->schema() - ->field(i) + schema->field(i)->name()); + auto field_id = std::stoll(schema->field(i) ->metadata() ->Get(milvus_storage::ARROW_FIELD_ID_KEY) ->data()); @@ -355,12 +418,22 @@ GroupChunkTranslator::load_group_chunk( key_, fid.get()); const auto& field_meta = it->second; - const arrow::ArrayVector& array_vec = table->column(i)->chunks(); + + // Merge array vectors from all tables for this field + // All tables in a cell come from the same column group with consistent schema + arrow::ArrayVector merged_array_vec; + for (const auto& table : tables) { + const arrow::ArrayVector& array_vec = table->column(i)->chunks(); + merged_array_vec.insert( + merged_array_vec.end(), array_vec.begin(), array_vec.end()); + } + field_ids.push_back(fid); field_metas.push_back(field_meta); - array_vecs.push_back(array_vec); + array_vecs.push_back(std::move(merged_array_vec)); } + std::unordered_map> chunks; if (!use_mmap_) { chunks = create_group_chunk(field_ids, field_metas, array_vecs); } else { diff --git a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h index 9b6eee0416..20575fc761 100644 --- a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h +++ b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h @@ -66,11 +66,10 @@ class GroupChunkTranslator get_cells(const std::vector& cids) override; std::pair - get_file_and_row_group_index(milvus::cachinglayer::cid_t cid) const; + get_file_and_row_group_offset(size_t global_row_group_idx) const; milvus::cachinglayer::cid_t - get_cid_from_file_and_row_group_index(size_t file_idx, - size_t row_group_idx) const; + get_global_row_group_idx(size_t file_idx, size_t row_group_idx) const; milvus::cachinglayer::Meta* meta() override { @@ -83,12 +82,8 @@ class GroupChunkTranslator constexpr int64_t MIN_STORAGE_BYTES = 1 * 1024 * 1024; int64_t total_size = 0; for (auto cid : cids) { - auto [file_idx, row_group_idx] = get_file_and_row_group_index(cid); - auto& row_group_meta = - row_group_meta_list_[file_idx].Get(row_group_idx); total_size += - std::max(static_cast(row_group_meta.memory_size()), - MIN_STORAGE_BYTES); + std::max(meta_.chunk_memory_size_[cid], MIN_STORAGE_BYTES); } return total_size; } @@ -104,8 +99,9 @@ class GroupChunkTranslator } private: + // Load a single cell which may contain multiple row groups std::unique_ptr - load_group_chunk(const std::shared_ptr& table, + load_group_chunk(const std::vector>& tables, const milvus::cachinglayer::cid_t cid); int64_t segment_id_; diff --git a/internal/core/src/segcore/storagev2translator/GroupChunkTranslatorTest.cpp b/internal/core/src/segcore/storagev2translator/GroupChunkTranslatorTest.cpp index a40ba5dd36..4ea7ba241d 100644 --- a/internal/core/src/segcore/storagev2translator/GroupChunkTranslatorTest.cpp +++ b/internal/core/src/segcore/storagev2translator/GroupChunkTranslatorTest.cpp @@ -127,7 +127,9 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) { reader_result.status().ToString()); auto fr = reader_result.ValueOrDie(); auto expected_num_cells = - fr->file_metadata()->GetRowGroupMetadataVector().size(); + (fr->file_metadata()->GetRowGroupMetadataVector().size() + + kRowGroupsPerCell - 1) / + kRowGroupsPerCell; auto row_group_metadata_vector = fr->file_metadata()->GetRowGroupMetadataVector(); auto status = fr->Close(); @@ -144,11 +146,16 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) { // estimated byte size for (size_t i = 0; i < translator->num_cells(); ++i) { - auto [file_idx, row_group_idx] = - translator->get_file_and_row_group_index(i); - // Get the expected size from the file directly - auto expected_size = static_cast( - row_group_metadata_vector.Get(row_group_idx).memory_size()); + auto [start, end] = static_cast(translator->meta()) + ->get_row_group_range(i); + auto expected_size = 0; + for (size_t j = start; j < end; ++j) { + auto [file_idx, row_group_idx] = + translator->get_file_and_row_group_offset(j); + // Get the expected size from the file directly + expected_size += static_cast( + row_group_metadata_vector.Get(row_group_idx).memory_size()); + } auto usage = translator->estimated_byte_size_of_cell(i).first; if (use_mmap) { EXPECT_EQ(usage.file_bytes, expected_size); @@ -157,8 +164,11 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) { } } - // getting cells - std::vector cids = {0, 1}; + // getting cells - use all valid cell IDs + std::vector cids; + for (size_t i = 0; i < translator->num_cells(); ++i) { + cids.push_back(i); + } auto cells = translator->get_cells(cids); EXPECT_EQ(cells.size(), cids.size()); @@ -177,10 +187,12 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) { EXPECT_EQ(meta->chunk_memory_size_.size(), num_cells); EXPECT_EQ(expected_total_size, chunked_column_group->memory_size()); - // Verify the mmap files for cell 0 and 1 are created - std::vector mmap_paths = { - (temp_dir / "seg_0_cg_0_0").string(), - (temp_dir / "seg_0_cg_0_1").string()}; + // Verify the mmap files for all cells are created + std::vector mmap_paths; + for (size_t i = 0; i < num_cells; ++i) { + mmap_paths.push_back( + (temp_dir / ("seg_0_cg_0_" + std::to_string(i))).string()); + } // Verify mmap directory and files if in mmap mode if (use_mmap) { for (const auto& mmap_path : mmap_paths) { @@ -275,62 +287,91 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) { for (auto row_groups : expected_row_groups_per_file) { expected_total_cells += row_groups; } + expected_total_cells = + (expected_total_cells + kRowGroupsPerCell - 1) / kRowGroupsPerCell; EXPECT_EQ(translator->num_cells(), expected_total_cells); - // Test get_file_and_row_group_index for cids across different files - int64_t cid_offset = 0; + // Test get_file_and_row_group_offset for global row group indices across + // different files + size_t global_rg_idx = 0; for (size_t file_idx = 0; file_idx < expected_row_groups_per_file.size(); ++file_idx) { for (int64_t row_group_idx = 0; row_group_idx < expected_row_groups_per_file[file_idx]; ++row_group_idx) { - auto cid = cid_offset + row_group_idx; auto [actual_file_idx, actual_row_group_idx] = - translator->get_file_and_row_group_index(cid); + translator->get_file_and_row_group_offset(global_rg_idx); EXPECT_EQ(actual_file_idx, file_idx); EXPECT_EQ(actual_row_group_idx, row_group_idx); + global_rg_idx++; } - cid_offset += expected_row_groups_per_file[file_idx]; } - // Test get_cells with cids from the same file - std::vector same_file_cids = {0, - 1}; // Both from file 0 - auto same_file_cells = translator->get_cells(same_file_cids); - EXPECT_EQ(same_file_cells.size(), same_file_cids.size()); + // Test get_cells with first two cells (if available) + auto num_cells = translator->num_cells(); + std::vector first_cids; + for (size_t i = 0; i < std::min(num_cells, static_cast(2)); ++i) { + first_cids.push_back(i); + } + auto first_cells = translator->get_cells(first_cids); + EXPECT_EQ(first_cells.size(), first_cids.size()); int i = 0; - for (const auto& [cid, chunk] : same_file_cells) { - EXPECT_EQ(cid, same_file_cids[i++]); + for (const auto& [cid, chunk] : first_cells) { + EXPECT_EQ(cid, first_cids[i++]); } - // Test get_cells with cids in reverse order to test sorting - std::vector cross_file_cids = {4, 7, 0}; - auto cells = translator->get_cells(cross_file_cids); - std::vector returned_cids = {0, 4, 7}; + // Test get_cells with cids in reverse order to verify order preservation + // Use all valid cells in reverse order + std::vector reverse_cids; + for (size_t j = num_cells; j > 0; --j) { + reverse_cids.push_back(j - 1); + } + auto cells = translator->get_cells(reverse_cids); + // Returned cids should be in the same order as input (reverse order) i = 0; for (const auto& [cid, chunk] : cells) { - EXPECT_EQ(cid, returned_cids[i++]); + EXPECT_EQ(cid, reverse_cids[i]); // Verify order preservation + i++; } + EXPECT_EQ(cells.size(), num_cells); // Test estimated byte size for cids across different files - for (size_t i = 0; i < translator->num_cells(); ++i) { - auto [file_idx, row_group_idx] = - translator->get_file_and_row_group_index(i); - auto usage = translator->estimated_byte_size_of_cell(i).first; - - // Get the expected memory size from the corresponding file + // First, build a vector of all row group metadata for easy lookup + std::vector> + all_rg_metas; + for (size_t file_idx = 0; file_idx < multi_file_paths.size(); ++file_idx) { auto reader_result = milvus_storage::FileRowGroupReader::Make( fs_, multi_file_paths[file_idx]); AssertInfo(reader_result.ok(), "[StorageV2] Failed to create file row group reader: " + reader_result.status().ToString()); auto fr = reader_result.ValueOrDie(); - auto row_group_metadata_vector = - fr->file_metadata()->GetRowGroupMetadataVector(); - auto expected_size = static_cast( - row_group_metadata_vector.Get(row_group_idx).memory_size()); + all_rg_metas.emplace_back( + file_idx, fr->file_metadata()->GetRowGroupMetadataVector()); auto status = fr->Close(); AssertInfo(status.ok(), "failed to close file reader"); + } + + // For each cell, sum the byte sizes of all row groups it contains + size_t total_row_groups = 0; + for (auto rg_count : expected_row_groups_per_file) { + total_row_groups += rg_count; + } + + for (size_t cid = 0; cid < translator->num_cells(); ++cid) { + auto usage = translator->estimated_byte_size_of_cell(cid).first; + + // Calculate expected size by summing all row groups in this cell + size_t rg_start = cid * kRowGroupsPerCell; + size_t rg_end = + std::min(rg_start + kRowGroupsPerCell, total_row_groups); + int64_t expected_size = 0; + for (size_t rg_idx = rg_start; rg_idx < rg_end; ++rg_idx) { + auto [file_idx, local_rg_idx] = + translator->get_file_and_row_group_offset(rg_idx); + expected_size += static_cast( + all_rg_metas[file_idx].second.Get(local_rg_idx).memory_size()); + } if (use_mmap) { EXPECT_EQ(usage.file_bytes, expected_size); diff --git a/internal/core/src/segcore/storagev2translator/ManifestGroupTranslator.cpp b/internal/core/src/segcore/storagev2translator/ManifestGroupTranslator.cpp index 82ae603b9b..6022ef22ce 100644 --- a/internal/core/src/segcore/storagev2translator/ManifestGroupTranslator.cpp +++ b/internal/core/src/segcore/storagev2translator/ManifestGroupTranslator.cpp @@ -49,6 +49,7 @@ namespace milvus::segcore::storagev2translator { ManifestGroupTranslator::ManifestGroupTranslator( int64_t segment_id, + GroupChunkType group_chunk_type, int64_t column_group_index, std::unique_ptr chunk_reader, const std::unordered_map& field_metas, @@ -57,6 +58,7 @@ ManifestGroupTranslator::ManifestGroupTranslator( int64_t num_fields, milvus::proto::common::LoadPriority load_priority) : segment_id_(segment_id), + group_chunk_type_(group_chunk_type), column_group_index_(column_group_index), chunk_reader_(std::move(chunk_reader)), key_(fmt::format("seg_{}_cg_{}", segment_id, column_group_index)), @@ -88,33 +90,56 @@ ManifestGroupTranslator::ManifestGroupTranslator( return false; }(), /* is_index */ false), - /* support_eviction */ true) { + /* support_eviction */ true), + use_mmap_(use_mmap), + load_priority_(load_priority) { auto chunk_size_result = chunk_reader_->get_chunk_size(); if (!chunk_size_result.ok()) { - throw std::runtime_error("get chunk size failed"); + throw std::runtime_error("get row group size failed"); } - chunk_size_ = chunk_size_result.ValueOrDie(); + const auto& row_group_sizes = chunk_size_result.ValueOrDie(); auto rows_result = chunk_reader_->get_chunk_rows(); if (!rows_result.ok()) { - throw std::runtime_error("get chunk rows failed"); + throw std::runtime_error("get row group rows failed"); } + const auto& row_group_rows = rows_result.ValueOrDie(); - auto chunk_rows = rows_result.ValueOrDie(); + // Merge row groups into group chunks(cache cells) + size_t total_row_groups = row_group_sizes.size(); + meta_.total_row_groups_ = total_row_groups; + size_t num_cells = + (total_row_groups + kRowGroupsPerCell - 1) / kRowGroupsPerCell; + // Build num_rows_until_chunk_ and chunk_memory_size_ + meta_.num_rows_until_chunk_.reserve(num_cells + 1); meta_.num_rows_until_chunk_.push_back(0); - for (int i = 0; i < chunk_reader_->total_number_of_chunks(); ++i) { - meta_.num_rows_until_chunk_.push_back( - meta_.num_rows_until_chunk_.back() + - static_cast(chunk_rows[i])); - meta_.chunk_memory_size_.push_back( - static_cast(chunk_size_[i])); + meta_.chunk_memory_size_.reserve(num_cells); + + int64_t cumulative_rows = 0; + for (size_t cell_id = 0; cell_id < num_cells; ++cell_id) { + auto [start, end] = meta_.get_row_group_range(cell_id); + int64_t cell_size = 0; + for (size_t i = start; i < end; ++i) { + cumulative_rows += static_cast(row_group_rows[i]); + cell_size += static_cast(row_group_sizes[i]); + } + meta_.num_rows_until_chunk_.push_back(cumulative_rows); + meta_.chunk_memory_size_.push_back(cell_size); } + + LOG_INFO( + "[StorageV2] translator {} merged {} row groups into {} cells ({} " + "row groups per cell)", + key_, + total_row_groups, + num_cells, + kRowGroupsPerCell); } size_t ManifestGroupTranslator::num_cells() const { - return chunk_reader_->total_number_of_chunks(); + return meta_.chunk_memory_size_.size(); } milvus::cachinglayer::cid_t @@ -126,9 +151,8 @@ std::pair ManifestGroupTranslator::estimated_byte_size_of_cell( milvus::cachinglayer::cid_t cid) const { - // return chunk_reader_->get_chunk_size()[cid]; - AssertInfo(cid < chunk_size_.size(), "invalid cid"); - auto cell_sz = static_cast(chunk_size_[cid]); + assert(cid < meta_.chunk_memory_size_.size()); + auto cell_sz = meta_.chunk_memory_size_[cid]; if (use_mmap_) { // why double the disk size for loading? @@ -154,29 +178,63 @@ ManifestGroupTranslator::get_cells( cells; cells.reserve(cids.size()); + auto max_cid = *std::max_element(cids.begin(), cids.end()); + if (max_cid >= meta_.chunk_memory_size_.size()) { + ThrowInfo( + ErrorCode::UnexpectedError, + "[StorageV2] translator {} cid {} is out of range. Total cells: {}", + key_, + max_cid, + meta_.chunk_memory_size_.size()); + } + + // Collect all row group indices needed for the requested cells + std::vector needed_row_group_indices; + needed_row_group_indices.reserve(kRowGroupsPerCell * cids.size()); + for (auto cid : cids) { + auto [start, end] = meta_.get_row_group_range(cid); + for (size_t i = start; i < end; ++i) { + needed_row_group_indices.push_back(static_cast(i)); + } + } + auto parallel_degree = static_cast(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); - auto read_result = - chunk_reader_->get_chunks(cids, static_cast(parallel_degree)); + auto read_result = chunk_reader_->get_chunks( + needed_row_group_indices, static_cast(parallel_degree)); if (!read_result.ok()) { throw std::runtime_error("get chunk failed"); } - auto chunks = read_result.ValueOrDie(); - for (size_t i = 0; i < chunks.size(); ++i) { - auto& chunk = chunks[i]; - AssertInfo(chunk != nullptr, - "chunk is null, idx = {}, group index = {}, segment id = " - "{}, parallel degree = {}", - i, - column_group_index_, - segment_id_, - parallel_degree); - auto cid = cids[i]; - auto group_chunk = load_group_chunk(chunk, cid); - cells.emplace_back(cid, std::move(group_chunk)); + auto loaded_row_groups = read_result.ValueOrDie(); + + // Build a map from row group index to loaded record batch + std::unordered_map> + row_group_map; + row_group_map.reserve(needed_row_group_indices.size()); + for (size_t i = 0; i < needed_row_group_indices.size(); ++i) { + row_group_map[needed_row_group_indices[i]] = loaded_row_groups[i]; + } + + for (const auto& cid : cids) { + auto [start, end] = meta_.get_row_group_range(cid); + std::vector> record_batches; + record_batches.reserve(end - start); + + for (size_t i = start; i < end; ++i) { + auto it = row_group_map.find(static_cast(i)); + AssertInfo(it != row_group_map.end(), + fmt::format("[StorageV2] translator {} row group {} for " + "cell {} was not loaded", + key_, + i, + cid)); + record_batches.push_back(it->second); + } + + cells.emplace_back(cid, load_group_chunk(record_batches, cid)); } return cells; @@ -184,13 +242,23 @@ ManifestGroupTranslator::get_cells( std::unique_ptr ManifestGroupTranslator::load_group_chunk( - const std::shared_ptr& record_batch, + const std::vector>& record_batches, const milvus::cachinglayer::cid_t cid) { - std::unordered_map> chunks; + assert(!record_batches.empty()); + // Use the first record batch as the reference for field iteration + const auto& first_batch = record_batches[0]; + + std::vector field_ids; + field_ids.reserve(first_batch->num_columns()); + std::vector field_metas; + field_metas.reserve(first_batch->num_columns()); + std::vector array_vecs; + array_vecs.reserve(first_batch->num_columns()); + // Iterate through field_id_list to get field_id and create chunk - for (int i = 0; i < record_batch->num_columns(); ++i) { + for (int i = 0; i < first_batch->num_columns(); ++i) { // column name here is field id - auto column_name = record_batch->column_name(i); + auto column_name = first_batch->column_name(i); auto field_id = std::stoll(column_name); auto fid = milvus::FieldId(field_id); @@ -206,42 +274,54 @@ ManifestGroupTranslator::load_group_chunk( fid.get()); const auto& field_meta = it->second; - const arrow::ArrayVector array_vec = {record_batch->column(i)}; - std::unique_ptr chunk; - if (!use_mmap_) { - // Memory mode - chunk = create_chunk(field_meta, array_vec); - } else { - // Mmap mode - std::filesystem::path filepath; - if (field_meta.get_main_field_id() != INVALID_FIELD_ID) { - // json shredding mode - filepath = std::filesystem::path(mmap_dir_path_) / - std::to_string(segment_id_) / - std::to_string(field_meta.get_main_field_id()) / - std::to_string(field_id) / std::to_string(cid); - } else { - filepath = std::filesystem::path(mmap_dir_path_) / - std::to_string(segment_id_) / - std::to_string(field_id) / std::to_string(cid); - } - - LOG_INFO( - "[StorageV2] translator {} mmaping field {} chunk {} to path " - "{}", - key_, - field_id, - cid, - filepath.string()); - - std::filesystem::create_directories(filepath.parent_path()); - - chunk = create_chunk( - field_meta, array_vec, filepath.string(), load_priority_); + // Merge arrays from all record batches for this field + // All record batches in a cell come from the same column group with consistent schema + arrow::ArrayVector merged_array_vec; + for (const auto& batch : record_batches) { + merged_array_vec.push_back(batch->column(i)); } - chunks[fid] = std::move(chunk); + field_ids.push_back(fid); + field_metas.push_back(field_meta); + array_vecs.push_back(std::move(merged_array_vec)); } + + std::unordered_map> chunks; + if (!use_mmap_) { + // Memory mode + chunks = create_group_chunk(field_ids, field_metas, array_vecs); + } else { + // Mmap mode + std::filesystem::path filepath; + switch (group_chunk_type_) { + case GroupChunkType::DEFAULT: + filepath = std::filesystem::path(mmap_dir_path_) / + fmt::format("seg_{}_cg_{}_{}", + segment_id_, + column_group_index_, + cid); + break; + case GroupChunkType::JSON_KEY_STATS: + filepath = + std::filesystem::path(mmap_dir_path_) / + fmt::format( + "seg_{}_jks_{}_cg_{}_{}", + segment_id_, + // NOTE: here we assume the first field is the main field for json key stats group chunk + std::to_string(field_metas[0].get_main_field_id()), + column_group_index_, + cid); + break; + default: + ThrowInfo(ErrorCode::UnexpectedError, + "unknown group chunk type: {}", + static_cast(group_chunk_type_)); + } + std::filesystem::create_directories(filepath.parent_path()); + chunks = create_group_chunk( + field_ids, field_metas, array_vecs, filepath.string()); + } + return std::make_unique(chunks); } diff --git a/internal/core/src/segcore/storagev2translator/ManifestGroupTranslator.h b/internal/core/src/segcore/storagev2translator/ManifestGroupTranslator.h index 6a9300bd67..4d06dfe397 100644 --- a/internal/core/src/segcore/storagev2translator/ManifestGroupTranslator.h +++ b/internal/core/src/segcore/storagev2translator/ManifestGroupTranslator.h @@ -47,6 +47,7 @@ class ManifestGroupTranslator * @brief Construct a translator for a column group * * @param segment_id ID of the segment being loaded + * @param group_chunk_type Type of the group chunk * @param column_group_index Index of the column group within the segment * @param chunk_reader Reader for accessing chunks from storage * @param field_metas Metadata for all fields in this column group @@ -57,6 +58,7 @@ class ManifestGroupTranslator */ ManifestGroupTranslator( int64_t segment_id, + GroupChunkType group_chunk_type, int64_t column_group_index, std::unique_ptr chunk_reader, const std::unordered_map& field_metas, @@ -145,36 +147,36 @@ class ManifestGroupTranslator int64_t total_size = 0; for (auto cid : cids) { - total_size += std::max(static_cast(chunk_size_[cid]), - MIN_STORAGE_BYTES); + assert(cid < meta_.chunk_memory_size_.size()); + total_size += + std::max(meta_.chunk_memory_size_[cid], MIN_STORAGE_BYTES); } return total_size; } private: /** - * @brief Load a single chunk from Arrow RecordBatch + * @brief Load a cell from multiple Arrow RecordBatches * - * Converts an Arrow RecordBatch into a GroupChunk containing - * field data for all columns in the chunk. + * Converts multiple Arrow RecordBatches (from row groups) into a single + * GroupChunk containing merged field data for all columns. * - * @param record_batch Arrow RecordBatch containing the chunk data + * @param record_batches Arrow RecordBatches from row groups * @param cid Cell ID of the chunk being loaded * @return GroupChunk containing the loaded field data */ std::unique_ptr - load_group_chunk(const std::shared_ptr& record_batch, - const milvus::cachinglayer::cid_t cid); + load_group_chunk( + const std::vector>& record_batches, + milvus::cachinglayer::cid_t cid); int64_t segment_id_; + GroupChunkType group_chunk_type_; int64_t column_group_index_; std::string key_; std::unordered_map field_metas_; std::unique_ptr chunk_reader_; - // chunk stats from reader - std::vector chunk_size_; - GroupCTMeta meta_; bool use_mmap_; std::string mmap_dir_path_;