enhance: map multiple row groups into one cache cell (#46249)

issue: #45486

Introduce row group batching to reduce cache cell granularity and improve
memory and disk efficiency. Previously, each parquet row group mapped 1:1
to a cache cell. Now, up to `kRowGroupsPerCell` (4) row groups are merged
into one cell. This reduces the number of cache cells (and their
associated overhead) by ~4x while maintaining the same data granularity
for loading.
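
For reference, the arithmetic behind the batching reduces to a couple of one-liners. The sketch below is a minimal standalone illustration, not the committed code; the helper names (`NumCells`, `RowGroupRange`) are made up for this example, though `kRowGroupsPerCell` matches the constant introduced in the diff:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <utility>

// One cache cell covers up to kRowGroupsPerCell consecutive row groups.
constexpr size_t kRowGroupsPerCell = 4;

// Number of cells needed for a given row-group count (ceiling division).
size_t
NumCells(size_t total_row_groups) {
    return (total_row_groups + kRowGroupsPerCell - 1) / kRowGroupsPerCell;
}

// Half-open range [start, end) of row groups covered by cell `cid`.
std::pair<size_t, size_t>
RowGroupRange(size_t cid, size_t total_row_groups) {
    size_t start = cid * kRowGroupsPerCell;
    size_t end = std::min(start + kRowGroupsPerCell, total_row_groups);
    return {start, end};
}

int
main() {
    // 10 row groups -> 3 cells: [0,4), [4,8), [8,10).
    const size_t total = 10;
    for (size_t cid = 0; cid < NumCells(total); ++cid) {
        auto [s, e] = RowGroupRange(cid, total);
        std::printf("cell %zu -> row groups [%zu, %zu)\n", cid, s, e);
    }
    return 0;
}
```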

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

* **Refactor**
  * Switched to cell-based grouping that merges multiple row groups for more efficient multi-file aggregation and reads.
  * Chunk loading now combines multiple source batches/tables per cell and better supports mmap-backed storage.

* **New Features**
  * Exposed helpers to query row-group ranges and global row-group offsets for diagnostics and testing.
  * Translators now accept chunk-type and mmap/load hints to control on-disk vs in-memory behavior.

* **Bug Fixes**
  * Improved bounds checks and clearer error messages for out-of-range cell requests.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Signed-off-by: Shawn Wang <shawn.wang@zilliz.com>
sparknack 2025-12-23 14:57:18 +08:00, committed by GitHub
parent d3b15ac136
commit 0a2f8d4f63
7 changed files with 403 additions and 188 deletions

View File

@@ -2862,6 +2862,7 @@ ChunkedSegmentSealedImpl::LoadColumnGroup(
auto translator =
std::make_unique<storagev2translator::ManifestGroupTranslator>(
get_segment_id(),
GroupChunkType::DEFAULT,
index,
std::move(chunk_reader),
field_metas,

View File

@@ -19,10 +19,23 @@
namespace milvus::segcore::storagev2translator {
// Number of row groups (parquet row groups) merged into one cache cell,
// for now it is a constant.
// hierarchy: 1 group chunk <-> 1 cache cell <-> kRowGroupsPerCell row groups
constexpr size_t kRowGroupsPerCell = 4;
static_assert(kRowGroupsPerCell > 0,
"kRowGroupsPerCell must be greater than 0");
struct GroupCTMeta : public milvus::cachinglayer::Meta {
// num_rows_until_chunk_[i] = total rows (prefix sum) in cells [0, i-1]
// the size of num_rows_until_chunk_ is num_cells + 1
std::vector<int64_t> num_rows_until_chunk_;
// memory size for each group chunk (cache cell)
std::vector<int64_t> chunk_memory_size_;
size_t num_fields_;
// total number of row groups
size_t total_row_groups_;
GroupCTMeta(size_t num_fields,
milvus::cachinglayer::StorageType storage_type,
milvus::cachinglayer::CellIdMappingMode cell_id_mapping_mode,
@@ -34,7 +47,16 @@ struct GroupCTMeta : public milvus::cachinglayer::Meta {
cell_data_type,
cache_warmup_policy,
support_eviction),
num_fields_(num_fields) {
num_fields_(num_fields),
total_row_groups_(0) {
}
// Get the range of row groups for a cell [start, end)
std::pair<size_t, size_t>
get_row_group_range(size_t cid) const {
size_t start = cid * kRowGroupsPerCell;
size_t end = std::min(start + kRowGroupsPerCell, total_row_groups_);
return {start, end};
}
};
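
A note on the prefix-sum layout above: because `num_rows_until_chunk_` carries `num_cells + 1` entries with a leading zero, per-cell row counts and global row offsets fall out by simple subtraction. The accessors below are an illustrative usage sketch, not part of the commit:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// num_rows_until_chunk[i] = total rows in cells [0, i),
// so the vector has num_cells + 1 entries and starts at 0.
int64_t
RowsInCell(const std::vector<int64_t>& num_rows_until_chunk, size_t cell_id) {
    return num_rows_until_chunk[cell_id + 1] - num_rows_until_chunk[cell_id];
}

int64_t
CellRowOffset(const std::vector<int64_t>& num_rows_until_chunk,
              size_t cell_id) {
    return num_rows_until_chunk[cell_id];
}
```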

View File

@@ -144,19 +144,40 @@ GroupChunkTranslator::GroupChunkTranslator(
file_metas.size());
}
meta_.num_rows_until_chunk_.reserve(total_row_groups + 1);
meta_.chunk_memory_size_.reserve(total_row_groups);
meta_.num_rows_until_chunk_.push_back(0);
// Collect row group sizes and row counts
std::vector<int64_t> row_group_row_counts;
std::vector<int64_t> row_group_sizes;
row_group_sizes.reserve(total_row_groups);
row_group_row_counts.reserve(total_row_groups);
for (const auto& row_group_meta : row_group_meta_list_) {
for (int i = 0; i < row_group_meta.size(); ++i) {
meta_.num_rows_until_chunk_.push_back(
meta_.num_rows_until_chunk_.back() +
row_group_meta.Get(i).row_num());
meta_.chunk_memory_size_.push_back(
row_group_meta.Get(i).memory_size());
row_group_sizes.push_back(row_group_meta.Get(i).memory_size());
row_group_row_counts.push_back(row_group_meta.Get(i).row_num());
}
}
// Build cell mapping: each cell contains up to kRowGroupsPerCell row groups
meta_.total_row_groups_ = total_row_groups;
size_t num_cells =
(total_row_groups + kRowGroupsPerCell - 1) / kRowGroupsPerCell;
// Merge row groups into group chunks (cache cells)
meta_.num_rows_until_chunk_.reserve(num_cells + 1);
meta_.num_rows_until_chunk_.push_back(0);
meta_.chunk_memory_size_.reserve(num_cells);
int64_t cumulative_rows = 0;
for (size_t cell_id = 0; cell_id < num_cells; ++cell_id) {
auto [start, end] = meta_.get_row_group_range(cell_id);
int64_t cell_size = 0;
for (size_t i = start; i < end; ++i) {
cumulative_rows += row_group_row_counts[i];
cell_size += row_group_sizes[i];
}
meta_.num_rows_until_chunk_.push_back(cumulative_rows);
meta_.chunk_memory_size_.push_back(cell_size);
}
AssertInfo(
meta_.num_rows_until_chunk_.back() == column_group_info_.row_count,
fmt::format(
@@ -165,6 +186,14 @@ GroupChunkTranslator::GroupChunkTranslator(
column_group_info_.field_id,
meta_.num_rows_until_chunk_.back(),
column_group_info_.row_count));
LOG_INFO(
"[StorageV2] translator {} merged {} row groups into {} cells ({} "
"row groups per cell)",
key_,
total_row_groups,
num_cells,
kRowGroupsPerCell);
}
GroupChunkTranslator::~GroupChunkTranslator() {
@@ -184,10 +213,8 @@ std::pair<milvus::cachinglayer::ResourceUsage,
milvus::cachinglayer::ResourceUsage>
GroupChunkTranslator::estimated_byte_size_of_cell(
milvus::cachinglayer::cid_t cid) const {
auto [file_idx, row_group_idx] = get_file_and_row_group_index(cid);
auto& row_group_meta = row_group_meta_list_[file_idx].Get(row_group_idx);
auto cell_sz = static_cast<int64_t>(row_group_meta.memory_size());
assert(cid < meta_.chunk_memory_size_.size());
auto cell_sz = meta_.chunk_memory_size_[cid];
if (use_mmap_) {
// why double the disk size for loading?
@@ -205,26 +232,29 @@ GroupChunkTranslator::key() const {
}
std::pair<size_t, size_t>
GroupChunkTranslator::get_file_and_row_group_index(
milvus::cachinglayer::cid_t cid) const {
GroupChunkTranslator::get_file_and_row_group_offset(
size_t global_row_group_idx) const {
for (size_t file_idx = 0; file_idx < file_row_group_prefix_sum_.size() - 1;
++file_idx) {
if (cid < file_row_group_prefix_sum_[file_idx + 1]) {
return {file_idx, cid - file_row_group_prefix_sum_[file_idx]};
if (global_row_group_idx < file_row_group_prefix_sum_[file_idx + 1]) {
return {
file_idx,
global_row_group_idx - file_row_group_prefix_sum_[file_idx]};
}
}
AssertInfo(false,
fmt::format("[StorageV2] translator {} cid {} is out of range. "
"Total row groups across all files: {}",
key_,
cid,
file_row_group_prefix_sum_.back()));
AssertInfo(
false,
fmt::format("[StorageV2] translator {} global_row_group_idx {} is out "
"of range. Total row groups across all files: {}",
key_,
global_row_group_idx,
file_row_group_prefix_sum_.back()));
}
milvus::cachinglayer::cid_t
GroupChunkTranslator::get_cid_from_file_and_row_group_index(
size_t file_idx, size_t row_group_idx) const {
GroupChunkTranslator::get_global_row_group_idx(size_t file_idx,
size_t row_group_idx) const {
AssertInfo(file_idx < file_row_group_prefix_sum_.size() - 1,
fmt::format("[StorageV2] translator {} file_idx {} is out of "
"range. Total files: {}",
@@ -246,7 +276,6 @@ GroupChunkTranslator::get_cid_from_file_and_row_group_index(
return file_start + row_group_idx;
}
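The committed `get_file_and_row_group_offset` walks `file_row_group_prefix_sum_` linearly, which is perfectly adequate for the small file counts involved. For reference, an equivalent lookup with `std::upper_bound` would look like the sketch below; this is an alternative formulation, not the committed code, and it assumes the caller has already bounds-checked the index:

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

// prefix_sum[i] = number of row groups in files [0, i); size = num_files + 1.
// Assumes global_idx < prefix_sum.back().
std::pair<size_t, size_t>
FileAndRowGroupOffset(const std::vector<size_t>& prefix_sum,
                      size_t global_idx) {
    // The first entry strictly greater than global_idx marks the next file.
    auto it =
        std::upper_bound(prefix_sum.begin(), prefix_sum.end(), global_idx);
    size_t file_idx = static_cast<size_t>(it - prefix_sum.begin()) - 1;
    return {file_idx, global_idx - prefix_sum[file_idx]};
}
```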
// The returned cells are sorted by cid; they may not follow the input order of cids.
std::vector<std::pair<cachinglayer::cid_t, std::unique_ptr<milvus::GroupChunk>>>
GroupChunkTranslator::get_cells(const std::vector<cachinglayer::cid_t>& cids) {
std::vector<std::pair<milvus::cachinglayer::cid_t,
@@ -254,12 +283,31 @@ GroupChunkTranslator::get_cells(const std::vector<cachinglayer::cid_t>& cids) {
cells;
cells.reserve(cids.size());
// Create row group lists for requested cids
std::vector<std::vector<int64_t>> row_group_lists(insert_files_.size());
auto max_cid = *std::max_element(cids.begin(), cids.end());
if (max_cid >= meta_.chunk_memory_size_.size()) {
ThrowInfo(
ErrorCode::UnexpectedError,
"[StorageV2] translator {} cid {} is out of range. Total cells: {}",
key_,
max_cid,
meta_.chunk_memory_size_.size());
}
// Collect all row group indices needed for the requested cells
std::vector<size_t> needed_row_group_indices;
needed_row_group_indices.reserve(kRowGroupsPerCell * cids.size());
for (auto cid : cids) {
auto [file_idx, row_group_idx] = get_file_and_row_group_index(cid);
row_group_lists[file_idx].push_back(row_group_idx);
auto [start, end] = meta_.get_row_group_range(cid);
for (size_t i = start; i < end; ++i) {
needed_row_group_indices.push_back(i);
}
}
// Create row group lists for file loading
std::vector<std::vector<int64_t>> row_group_lists(insert_files_.size());
for (auto rg_idx : needed_row_group_indices) {
auto [file_idx, row_group_off] = get_file_and_row_group_offset(rg_idx);
row_group_lists[file_idx].push_back(row_group_off);
}
auto parallel_degree =
@@ -288,57 +336,72 @@ GroupChunkTranslator::get_cells(const std::vector<cachinglayer::cid_t>& cids) {
key_,
column_group_info_.field_id);
// Collect loaded tables by row group index
std::unordered_map<size_t, std::shared_ptr<arrow::Table>> row_group_tables;
row_group_tables.reserve(needed_row_group_indices.size());
std::shared_ptr<milvus::ArrowDataWrapper> r;
std::unordered_set<cachinglayer::cid_t> filled_cids;
filled_cids.reserve(cids.size());
// !!! NOTE: the popped row group tables are NOT sorted by the global row group index.
// !!! Never rely on the order of the popped row group tables.
while (channel->pop(r)) {
for (const auto& table_info : r->arrow_tables) {
// Convert file_index and row_group_index to global cid
auto cid = get_cid_from_file_and_row_group_index(
table_info.file_index, table_info.row_group_index);
cells.emplace_back(cid, load_group_chunk(table_info.table, cid));
filled_cids.insert(cid);
// Convert file_index and row_group_index (an index within the file, not a global one) to a global row group index
auto rg_idx = get_global_row_group_idx(table_info.file_index,
table_info.row_group_index);
row_group_tables[rg_idx] = table_info.table;
}
}
// access the underlying future to surface any exception
load_future.get();
// Verify all requested cids have been filled
// Build cells from collected tables
for (auto cid : cids) {
AssertInfo(filled_cids.find(cid) != filled_cids.end(),
"[StorageV2] translator {} cid {} was not filled, missing "
"row group id {}",
key_,
cid,
cid);
auto [start, end] = meta_.get_row_group_range(cid);
std::vector<std::shared_ptr<arrow::Table>> tables;
tables.reserve(end - start);
for (size_t i = start; i < end; ++i) {
auto it = row_group_tables.find(i);
AssertInfo(it != row_group_tables.end(),
fmt::format("[StorageV2] translator {} row group {} "
"for cell {} was not loaded",
key_,
i,
cid));
tables.push_back(it->second);
}
cells.emplace_back(cid, load_group_chunk(tables, cid));
}
return cells;
}
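The shape of the new `get_cells` is gather-then-assemble: enqueue every needed row group once, index the loaded tables by global row-group id, then build each cell in the caller's order. The condensed sketch below captures that pattern under simplified types; `Payload` is a stand-in for `arrow::Table`, and none of these names appear in the commit:

```cpp
#include <algorithm>
#include <cstddef>
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>

// Stand-in for arrow::Table in this sketch.
struct Payload {};
using Cell = std::vector<std::shared_ptr<Payload>>;

std::vector<std::pair<size_t, Cell>>
AssembleCells(const std::vector<size_t>& cids,
              const std::unordered_map<size_t, std::shared_ptr<Payload>>&
                  loaded_by_row_group,
              size_t row_groups_per_cell,
              size_t total_row_groups) {
    std::vector<std::pair<size_t, Cell>> cells;
    cells.reserve(cids.size());
    for (size_t cid : cids) {  // output order follows the input order
        size_t start = cid * row_groups_per_cell;
        size_t end = std::min(start + row_groups_per_cell, total_row_groups);
        Cell cell;
        cell.reserve(end - start);
        for (size_t i = start; i < end; ++i) {
            // at() throws if a needed row group was never loaded,
            // mirroring the AssertInfo in the committed code.
            cell.push_back(loaded_by_row_group.at(i));
        }
        cells.emplace_back(cid, std::move(cell));
    }
    return cells;
}
```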
std::unique_ptr<milvus::GroupChunk>
GroupChunkTranslator::load_group_chunk(
const std::shared_ptr<arrow::Table>& table,
const std::vector<std::shared_ptr<arrow::Table>>& tables,
const milvus::cachinglayer::cid_t cid) {
AssertInfo(table != nullptr, "arrow table is nullptr");
// Create chunks for each field in this batch
std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks;
// Iterate through field_id_list to get field_id and create chunk
std::vector<FieldId> field_ids;
std::vector<FieldMeta> field_metas;
std::vector<arrow::ArrayVector> array_vecs;
field_metas.reserve(table->schema()->num_fields());
array_vecs.reserve(table->schema()->num_fields());
assert(!tables.empty());
// Use the first table's schema as reference for field iteration
const auto& schema = tables[0]->schema();
for (int i = 0; i < table->schema()->num_fields(); ++i) {
AssertInfo(table->schema()->field(i)->metadata()->Contains(
// Collect field info and merge array vectors from all tables
std::vector<FieldId> field_ids;
field_ids.reserve(schema->num_fields());
std::vector<FieldMeta> field_metas;
field_metas.reserve(schema->num_fields());
std::vector<arrow::ArrayVector> array_vecs;
array_vecs.reserve(schema->num_fields());
for (int i = 0; i < schema->num_fields(); ++i) {
AssertInfo(schema->field(i)->metadata()->Contains(
milvus_storage::ARROW_FIELD_ID_KEY),
"[StorageV2] translator {} field id not found in metadata "
"for field {}",
key_,
table->schema()->field(i)->name());
auto field_id = std::stoll(table->schema()
->field(i)
schema->field(i)->name());
auto field_id = std::stoll(schema->field(i)
->metadata()
->Get(milvus_storage::ARROW_FIELD_ID_KEY)
->data());
@@ -355,12 +418,22 @@ GroupChunkTranslator::load_group_chunk(
key_,
fid.get());
const auto& field_meta = it->second;
const arrow::ArrayVector& array_vec = table->column(i)->chunks();
// Merge array vectors from all tables for this field
// All tables in a cell come from the same column group with consistent schema
arrow::ArrayVector merged_array_vec;
for (const auto& table : tables) {
const arrow::ArrayVector& array_vec = table->column(i)->chunks();
merged_array_vec.insert(
merged_array_vec.end(), array_vec.begin(), array_vec.end());
}
field_ids.push_back(fid);
field_metas.push_back(field_meta);
array_vecs.push_back(array_vec);
array_vecs.push_back(std::move(merged_array_vec));
}
std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks;
if (!use_mmap_) {
chunks = create_group_chunk(field_ids, field_metas, array_vecs);
} else {

View File

@@ -66,11 +66,10 @@ class GroupChunkTranslator
get_cells(const std::vector<milvus::cachinglayer::cid_t>& cids) override;
std::pair<size_t, size_t>
get_file_and_row_group_index(milvus::cachinglayer::cid_t cid) const;
get_file_and_row_group_offset(size_t global_row_group_idx) const;
milvus::cachinglayer::cid_t
get_cid_from_file_and_row_group_index(size_t file_idx,
size_t row_group_idx) const;
get_global_row_group_idx(size_t file_idx, size_t row_group_idx) const;
milvus::cachinglayer::Meta*
meta() override {
@@ -83,12 +82,8 @@ class GroupChunkTranslator
constexpr int64_t MIN_STORAGE_BYTES = 1 * 1024 * 1024;
int64_t total_size = 0;
for (auto cid : cids) {
auto [file_idx, row_group_idx] = get_file_and_row_group_index(cid);
auto& row_group_meta =
row_group_meta_list_[file_idx].Get(row_group_idx);
total_size +=
std::max(static_cast<int64_t>(row_group_meta.memory_size()),
MIN_STORAGE_BYTES);
std::max(meta_.chunk_memory_size_[cid], MIN_STORAGE_BYTES);
}
return total_size;
}
@@ -104,8 +99,9 @@ class GroupChunkTranslator
}
private:
// Load a single cell which may contain multiple row groups
std::unique_ptr<milvus::GroupChunk>
load_group_chunk(const std::shared_ptr<arrow::Table>& table,
load_group_chunk(const std::vector<std::shared_ptr<arrow::Table>>& tables,
const milvus::cachinglayer::cid_t cid);
int64_t segment_id_;

View File

@@ -127,7 +127,9 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
reader_result.status().ToString());
auto fr = reader_result.ValueOrDie();
auto expected_num_cells =
fr->file_metadata()->GetRowGroupMetadataVector().size();
(fr->file_metadata()->GetRowGroupMetadataVector().size() +
kRowGroupsPerCell - 1) /
kRowGroupsPerCell;
auto row_group_metadata_vector =
fr->file_metadata()->GetRowGroupMetadataVector();
auto status = fr->Close();
@@ -144,11 +146,16 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
// estimated byte size
for (size_t i = 0; i < translator->num_cells(); ++i) {
auto [file_idx, row_group_idx] =
translator->get_file_and_row_group_index(i);
// Get the expected size from the file directly
auto expected_size = static_cast<int64_t>(
row_group_metadata_vector.Get(row_group_idx).memory_size());
auto [start, end] = static_cast<GroupCTMeta*>(translator->meta())
->get_row_group_range(i);
int64_t expected_size = 0;
for (size_t j = start; j < end; ++j) {
auto [file_idx, row_group_idx] =
translator->get_file_and_row_group_offset(j);
// Get the expected size from the file directly
expected_size += static_cast<int64_t>(
row_group_metadata_vector.Get(row_group_idx).memory_size());
}
auto usage = translator->estimated_byte_size_of_cell(i).first;
if (use_mmap) {
EXPECT_EQ(usage.file_bytes, expected_size);
@@ -157,8 +164,11 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
}
}
// getting cells
std::vector<cachinglayer::cid_t> cids = {0, 1};
// getting cells - use all valid cell IDs
std::vector<cachinglayer::cid_t> cids;
for (size_t i = 0; i < translator->num_cells(); ++i) {
cids.push_back(i);
}
auto cells = translator->get_cells(cids);
EXPECT_EQ(cells.size(), cids.size());
@@ -177,10 +187,12 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
EXPECT_EQ(meta->chunk_memory_size_.size(), num_cells);
EXPECT_EQ(expected_total_size, chunked_column_group->memory_size());
// Verify the mmap files for cell 0 and 1 are created
std::vector<std::string> mmap_paths = {
(temp_dir / "seg_0_cg_0_0").string(),
(temp_dir / "seg_0_cg_0_1").string()};
// Verify the mmap files for all cells are created
std::vector<std::string> mmap_paths;
for (size_t i = 0; i < num_cells; ++i) {
mmap_paths.push_back(
(temp_dir / ("seg_0_cg_0_" + std::to_string(i))).string());
}
// Verify mmap directory and files if in mmap mode
if (use_mmap) {
for (const auto& mmap_path : mmap_paths) {
@@ -275,62 +287,91 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
for (auto row_groups : expected_row_groups_per_file) {
expected_total_cells += row_groups;
}
expected_total_cells =
(expected_total_cells + kRowGroupsPerCell - 1) / kRowGroupsPerCell;
EXPECT_EQ(translator->num_cells(), expected_total_cells);
// Test get_file_and_row_group_index for cids across different files
int64_t cid_offset = 0;
// Test get_file_and_row_group_offset for global row group indices across
// different files
size_t global_rg_idx = 0;
for (size_t file_idx = 0; file_idx < expected_row_groups_per_file.size();
++file_idx) {
for (int64_t row_group_idx = 0;
row_group_idx < expected_row_groups_per_file[file_idx];
++row_group_idx) {
auto cid = cid_offset + row_group_idx;
auto [actual_file_idx, actual_row_group_idx] =
translator->get_file_and_row_group_index(cid);
translator->get_file_and_row_group_offset(global_rg_idx);
EXPECT_EQ(actual_file_idx, file_idx);
EXPECT_EQ(actual_row_group_idx, row_group_idx);
global_rg_idx++;
}
cid_offset += expected_row_groups_per_file[file_idx];
}
// Test get_cells with cids from the same file
std::vector<cachinglayer::cid_t> same_file_cids = {0,
1}; // Both from file 0
auto same_file_cells = translator->get_cells(same_file_cids);
EXPECT_EQ(same_file_cells.size(), same_file_cids.size());
// Test get_cells with first two cells (if available)
auto num_cells = translator->num_cells();
std::vector<cachinglayer::cid_t> first_cids;
for (size_t i = 0; i < std::min(num_cells, static_cast<size_t>(2)); ++i) {
first_cids.push_back(i);
}
auto first_cells = translator->get_cells(first_cids);
EXPECT_EQ(first_cells.size(), first_cids.size());
int i = 0;
for (const auto& [cid, chunk] : same_file_cells) {
EXPECT_EQ(cid, same_file_cids[i++]);
for (const auto& [cid, chunk] : first_cells) {
EXPECT_EQ(cid, first_cids[i++]);
}
// Test get_cells with cids in reverse order to test sorting
std::vector<cachinglayer::cid_t> cross_file_cids = {4, 7, 0};
auto cells = translator->get_cells(cross_file_cids);
std::vector<cachinglayer::cid_t> returned_cids = {0, 4, 7};
// Test get_cells with cids in reverse order to verify order preservation
// Use all valid cells in reverse order
std::vector<cachinglayer::cid_t> reverse_cids;
for (size_t j = num_cells; j > 0; --j) {
reverse_cids.push_back(j - 1);
}
auto cells = translator->get_cells(reverse_cids);
// Returned cids should be in the same order as input (reverse order)
i = 0;
for (const auto& [cid, chunk] : cells) {
EXPECT_EQ(cid, returned_cids[i++]);
EXPECT_EQ(cid, reverse_cids[i]); // Verify order preservation
i++;
}
EXPECT_EQ(cells.size(), num_cells);
// Test estimated byte size for cids across different files
for (size_t i = 0; i < translator->num_cells(); ++i) {
auto [file_idx, row_group_idx] =
translator->get_file_and_row_group_index(i);
auto usage = translator->estimated_byte_size_of_cell(i).first;
// Get the expected memory size from the corresponding file
// First, build a vector of all row group metadata for easy lookup
std::vector<std::pair<size_t, milvus_storage::RowGroupMetadataVector>>
all_rg_metas;
for (size_t file_idx = 0; file_idx < multi_file_paths.size(); ++file_idx) {
auto reader_result = milvus_storage::FileRowGroupReader::Make(
fs_, multi_file_paths[file_idx]);
AssertInfo(reader_result.ok(),
"[StorageV2] Failed to create file row group reader: " +
reader_result.status().ToString());
auto fr = reader_result.ValueOrDie();
auto row_group_metadata_vector =
fr->file_metadata()->GetRowGroupMetadataVector();
auto expected_size = static_cast<int64_t>(
row_group_metadata_vector.Get(row_group_idx).memory_size());
all_rg_metas.emplace_back(
file_idx, fr->file_metadata()->GetRowGroupMetadataVector());
auto status = fr->Close();
AssertInfo(status.ok(), "failed to close file reader");
}
// For each cell, sum the byte sizes of all row groups it contains
size_t total_row_groups = 0;
for (auto rg_count : expected_row_groups_per_file) {
total_row_groups += rg_count;
}
for (size_t cid = 0; cid < translator->num_cells(); ++cid) {
auto usage = translator->estimated_byte_size_of_cell(cid).first;
// Calculate expected size by summing all row groups in this cell
size_t rg_start = cid * kRowGroupsPerCell;
size_t rg_end =
std::min(rg_start + kRowGroupsPerCell, total_row_groups);
int64_t expected_size = 0;
for (size_t rg_idx = rg_start; rg_idx < rg_end; ++rg_idx) {
auto [file_idx, local_rg_idx] =
translator->get_file_and_row_group_offset(rg_idx);
expected_size += static_cast<int64_t>(
all_rg_metas[file_idx].second.Get(local_rg_idx).memory_size());
}
if (use_mmap) {
EXPECT_EQ(usage.file_bytes, expected_size);

View File

@@ -49,6 +49,7 @@ namespace milvus::segcore::storagev2translator {
ManifestGroupTranslator::ManifestGroupTranslator(
int64_t segment_id,
GroupChunkType group_chunk_type,
int64_t column_group_index,
std::unique_ptr<milvus_storage::api::ChunkReader> chunk_reader,
const std::unordered_map<FieldId, FieldMeta>& field_metas,
@@ -57,6 +58,7 @@ ManifestGroupTranslator::ManifestGroupTranslator(
int64_t num_fields,
milvus::proto::common::LoadPriority load_priority)
: segment_id_(segment_id),
group_chunk_type_(group_chunk_type),
column_group_index_(column_group_index),
chunk_reader_(std::move(chunk_reader)),
key_(fmt::format("seg_{}_cg_{}", segment_id, column_group_index)),
@@ -88,33 +90,56 @@ ManifestGroupTranslator::ManifestGroupTranslator(
return false;
}(),
/* is_index */ false),
/* support_eviction */ true) {
/* support_eviction */ true),
use_mmap_(use_mmap),
load_priority_(load_priority) {
auto chunk_size_result = chunk_reader_->get_chunk_size();
if (!chunk_size_result.ok()) {
throw std::runtime_error("get chunk size failed");
throw std::runtime_error("get row group size failed");
}
chunk_size_ = chunk_size_result.ValueOrDie();
const auto& row_group_sizes = chunk_size_result.ValueOrDie();
auto rows_result = chunk_reader_->get_chunk_rows();
if (!rows_result.ok()) {
throw std::runtime_error("get chunk rows failed");
throw std::runtime_error("get row group rows failed");
}
const auto& row_group_rows = rows_result.ValueOrDie();
auto chunk_rows = rows_result.ValueOrDie();
// Merge row groups into group chunks (cache cells)
size_t total_row_groups = row_group_sizes.size();
meta_.total_row_groups_ = total_row_groups;
size_t num_cells =
(total_row_groups + kRowGroupsPerCell - 1) / kRowGroupsPerCell;
// Build num_rows_until_chunk_ and chunk_memory_size_
meta_.num_rows_until_chunk_.reserve(num_cells + 1);
meta_.num_rows_until_chunk_.push_back(0);
for (int i = 0; i < chunk_reader_->total_number_of_chunks(); ++i) {
meta_.num_rows_until_chunk_.push_back(
meta_.num_rows_until_chunk_.back() +
static_cast<int64_t>(chunk_rows[i]));
meta_.chunk_memory_size_.push_back(
static_cast<int64_t>(chunk_size_[i]));
meta_.chunk_memory_size_.reserve(num_cells);
int64_t cumulative_rows = 0;
for (size_t cell_id = 0; cell_id < num_cells; ++cell_id) {
auto [start, end] = meta_.get_row_group_range(cell_id);
int64_t cell_size = 0;
for (size_t i = start; i < end; ++i) {
cumulative_rows += static_cast<int64_t>(row_group_rows[i]);
cell_size += static_cast<int64_t>(row_group_sizes[i]);
}
meta_.num_rows_until_chunk_.push_back(cumulative_rows);
meta_.chunk_memory_size_.push_back(cell_size);
}
LOG_INFO(
"[StorageV2] translator {} merged {} row groups into {} cells ({} "
"row groups per cell)",
key_,
total_row_groups,
num_cells,
kRowGroupsPerCell);
}
size_t
ManifestGroupTranslator::num_cells() const {
return chunk_reader_->total_number_of_chunks();
return meta_.chunk_memory_size_.size();
}
milvus::cachinglayer::cid_t
@@ -126,9 +151,8 @@ std::pair<milvus::cachinglayer::ResourceUsage,
milvus::cachinglayer::ResourceUsage>
ManifestGroupTranslator::estimated_byte_size_of_cell(
milvus::cachinglayer::cid_t cid) const {
// return chunk_reader_->get_chunk_size()[cid];
AssertInfo(cid < chunk_size_.size(), "invalid cid");
auto cell_sz = static_cast<int64_t>(chunk_size_[cid]);
assert(cid < meta_.chunk_memory_size_.size());
auto cell_sz = meta_.chunk_memory_size_[cid];
if (use_mmap_) {
// why double the disk size for loading?
@@ -154,29 +178,63 @@ ManifestGroupTranslator::get_cells(
cells;
cells.reserve(cids.size());
auto max_cid = *std::max_element(cids.begin(), cids.end());
if (max_cid >= meta_.chunk_memory_size_.size()) {
ThrowInfo(
ErrorCode::UnexpectedError,
"[StorageV2] translator {} cid {} is out of range. Total cells: {}",
key_,
max_cid,
meta_.chunk_memory_size_.size());
}
// Collect all row group indices needed for the requested cells
std::vector<int64_t> needed_row_group_indices;
needed_row_group_indices.reserve(kRowGroupsPerCell * cids.size());
for (auto cid : cids) {
auto [start, end] = meta_.get_row_group_range(cid);
for (size_t i = start; i < end; ++i) {
needed_row_group_indices.push_back(static_cast<int64_t>(i));
}
}
auto parallel_degree =
static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
auto read_result =
chunk_reader_->get_chunks(cids, static_cast<int64_t>(parallel_degree));
auto read_result = chunk_reader_->get_chunks(
needed_row_group_indices, static_cast<int64_t>(parallel_degree));
if (!read_result.ok()) {
throw std::runtime_error("get chunk failed");
}
auto chunks = read_result.ValueOrDie();
for (size_t i = 0; i < chunks.size(); ++i) {
auto& chunk = chunks[i];
AssertInfo(chunk != nullptr,
"chunk is null, idx = {}, group index = {}, segment id = "
"{}, parallel degree = {}",
i,
column_group_index_,
segment_id_,
parallel_degree);
auto cid = cids[i];
auto group_chunk = load_group_chunk(chunk, cid);
cells.emplace_back(cid, std::move(group_chunk));
auto loaded_row_groups = read_result.ValueOrDie();
// Build a map from row group index to loaded record batch
std::unordered_map<int64_t, std::shared_ptr<arrow::RecordBatch>>
row_group_map;
row_group_map.reserve(needed_row_group_indices.size());
for (size_t i = 0; i < needed_row_group_indices.size(); ++i) {
row_group_map[needed_row_group_indices[i]] = loaded_row_groups[i];
}
for (const auto& cid : cids) {
auto [start, end] = meta_.get_row_group_range(cid);
std::vector<std::shared_ptr<arrow::RecordBatch>> record_batches;
record_batches.reserve(end - start);
for (size_t i = start; i < end; ++i) {
auto it = row_group_map.find(static_cast<int64_t>(i));
AssertInfo(it != row_group_map.end(),
fmt::format("[StorageV2] translator {} row group {} for "
"cell {} was not loaded",
key_,
i,
cid));
record_batches.push_back(it->second);
}
cells.emplace_back(cid, load_group_chunk(record_batches, cid));
}
return cells;
@@ -184,13 +242,23 @@ ManifestGroupTranslator::get_cells(
std::unique_ptr<milvus::GroupChunk>
ManifestGroupTranslator::load_group_chunk(
const std::shared_ptr<arrow::RecordBatch>& record_batch,
const std::vector<std::shared_ptr<arrow::RecordBatch>>& record_batches,
const milvus::cachinglayer::cid_t cid) {
std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks;
assert(!record_batches.empty());
// Use the first record batch as the reference for field iteration
const auto& first_batch = record_batches[0];
std::vector<FieldId> field_ids;
field_ids.reserve(first_batch->num_columns());
std::vector<FieldMeta> field_metas;
field_metas.reserve(first_batch->num_columns());
std::vector<arrow::ArrayVector> array_vecs;
array_vecs.reserve(first_batch->num_columns());
// Iterate through field_id_list to get field_id and create chunk
for (int i = 0; i < record_batch->num_columns(); ++i) {
for (int i = 0; i < first_batch->num_columns(); ++i) {
// column name here is field id
auto column_name = record_batch->column_name(i);
auto column_name = first_batch->column_name(i);
auto field_id = std::stoll(column_name);
auto fid = milvus::FieldId(field_id);
@@ -206,42 +274,54 @@ ManifestGroupTranslator::load_group_chunk(
fid.get());
const auto& field_meta = it->second;
const arrow::ArrayVector array_vec = {record_batch->column(i)};
std::unique_ptr<Chunk> chunk;
if (!use_mmap_) {
// Memory mode
chunk = create_chunk(field_meta, array_vec);
} else {
// Mmap mode
std::filesystem::path filepath;
if (field_meta.get_main_field_id() != INVALID_FIELD_ID) {
// json shredding mode
filepath = std::filesystem::path(mmap_dir_path_) /
std::to_string(segment_id_) /
std::to_string(field_meta.get_main_field_id()) /
std::to_string(field_id) / std::to_string(cid);
} else {
filepath = std::filesystem::path(mmap_dir_path_) /
std::to_string(segment_id_) /
std::to_string(field_id) / std::to_string(cid);
}
LOG_INFO(
"[StorageV2] translator {} mmaping field {} chunk {} to path "
"{}",
key_,
field_id,
cid,
filepath.string());
std::filesystem::create_directories(filepath.parent_path());
chunk = create_chunk(
field_meta, array_vec, filepath.string(), load_priority_);
// Merge arrays from all record batches for this field
// All record batches in a cell come from the same column group with consistent schema
arrow::ArrayVector merged_array_vec;
for (const auto& batch : record_batches) {
merged_array_vec.push_back(batch->column(i));
}
chunks[fid] = std::move(chunk);
field_ids.push_back(fid);
field_metas.push_back(field_meta);
array_vecs.push_back(std::move(merged_array_vec));
}
std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks;
if (!use_mmap_) {
// Memory mode
chunks = create_group_chunk(field_ids, field_metas, array_vecs);
} else {
// Mmap mode
std::filesystem::path filepath;
switch (group_chunk_type_) {
case GroupChunkType::DEFAULT:
filepath = std::filesystem::path(mmap_dir_path_) /
fmt::format("seg_{}_cg_{}_{}",
segment_id_,
column_group_index_,
cid);
break;
case GroupChunkType::JSON_KEY_STATS:
filepath =
std::filesystem::path(mmap_dir_path_) /
fmt::format(
"seg_{}_jks_{}_cg_{}_{}",
segment_id_,
// NOTE: here we assume the first field is the main field for json key stats group chunk
std::to_string(field_metas[0].get_main_field_id()),
column_group_index_,
cid);
break;
default:
ThrowInfo(ErrorCode::UnexpectedError,
"unknown group chunk type: {}",
static_cast<uint8_t>(group_chunk_type_));
}
std::filesystem::create_directories(filepath.parent_path());
chunks = create_group_chunk(
field_ids, field_metas, array_vecs, filepath.string());
}
return std::make_unique<milvus::GroupChunk>(chunks);
}
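
With cells replacing individual row groups as the mmap unit, a single backing file now covers up to `kRowGroupsPerCell` row groups. The helper below only illustrates the file naming in the `DEFAULT` branch above (real paths are rooted at `mmap_dir_path_`; the function name is made up for this sketch):

```cpp
#include <cstdint>
#include <string>

#include <fmt/format.h>

// Illustrative only: file name for a DEFAULT group chunk cell.
std::string
DefaultCellFileName(int64_t segment_id,
                    int64_t column_group_index,
                    int64_t cid) {
    return fmt::format("seg_{}_cg_{}_{}", segment_id, column_group_index, cid);
}

// e.g. DefaultCellFileName(0, 0, 2) == "seg_0_cg_0_2", matching the
// paths the updated test expects under its temp directory.
```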

View File

@@ -47,6 +47,7 @@ class ManifestGroupTranslator
* @brief Construct a translator for a column group
*
* @param segment_id ID of the segment being loaded
* @param group_chunk_type Type of the group chunk
* @param column_group_index Index of the column group within the segment
* @param chunk_reader Reader for accessing chunks from storage
* @param field_metas Metadata for all fields in this column group
@@ -57,6 +58,7 @@ class ManifestGroupTranslator
*/
ManifestGroupTranslator(
int64_t segment_id,
GroupChunkType group_chunk_type,
int64_t column_group_index,
std::unique_ptr<milvus_storage::api::ChunkReader> chunk_reader,
const std::unordered_map<FieldId, FieldMeta>& field_metas,
@@ -145,36 +147,36 @@ class ManifestGroupTranslator
int64_t total_size = 0;
for (auto cid : cids) {
total_size += std::max(static_cast<int64_t>(chunk_size_[cid]),
MIN_STORAGE_BYTES);
assert(cid < meta_.chunk_memory_size_.size());
total_size +=
std::max(meta_.chunk_memory_size_[cid], MIN_STORAGE_BYTES);
}
return total_size;
}
private:
/**
* @brief Load a single chunk from Arrow RecordBatch
* @brief Load a cell from multiple Arrow RecordBatches
*
* Converts an Arrow RecordBatch into a GroupChunk containing
* field data for all columns in the chunk.
* Converts multiple Arrow RecordBatches (from row groups) into a single
* GroupChunk containing merged field data for all columns.
*
* @param record_batch Arrow RecordBatch containing the chunk data
* @param record_batches Arrow RecordBatches from row groups
* @param cid Cell ID of the chunk being loaded
* @return GroupChunk containing the loaded field data
*/
std::unique_ptr<milvus::GroupChunk>
load_group_chunk(const std::shared_ptr<arrow::RecordBatch>& record_batch,
const milvus::cachinglayer::cid_t cid);
load_group_chunk(
const std::vector<std::shared_ptr<arrow::RecordBatch>>& record_batches,
milvus::cachinglayer::cid_t cid);
int64_t segment_id_;
GroupChunkType group_chunk_type_;
int64_t column_group_index_;
std::string key_;
std::unordered_map<FieldId, FieldMeta> field_metas_;
std::unique_ptr<milvus_storage::api::ChunkReader> chunk_reader_;
// chunk stats from reader
std::vector<uint64_t> chunk_size_;
GroupCTMeta meta_;
bool use_mmap_;
std::string mmap_dir_path_;