enhance: [StorageV2] refactor group chunk translator (#43406)

related: #43372

Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
Author: sthuang (committed by GitHub)
Date: 2025-07-21 19:46:53 +08:00
Commit: 6c5f5f1e32 (parent 81694739ef)
5 changed files with 64 additions and 78 deletions


@@ -270,19 +270,6 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal(
milvus_storage::FieldIDList field_id_list = storage::GetFieldIDList(
column_group_id, insert_files[0], arrow_schema, fs);
- std::vector<milvus_storage::RowGroupMetadataVector> row_group_meta_list;
- for (const auto& file : insert_files) {
- auto reader =
- std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
- row_group_meta_list.push_back(
- reader->file_metadata()->GetRowGroupMetadataVector());
- auto status = reader->Close();
- AssertInfo(status.ok(),
- "failed to close file reader when get row group "
- "metadata from file: " +
- file + " with error: " + status.ToString());
- }
// if multiple fields share same column group
// hint for not loading certain field shall not be working for now
// warmup will be disabled only when all columns are not in load list
@@ -316,7 +303,6 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal(
column_group_info,
insert_files,
info.enable_mmap,
- row_group_meta_list,
milvus_field_ids.size(),
load_info.load_priority);
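Taken together, these two hunks remove the caller-side metadata pass: load_column_group_data_internal no longer opens every insert file with FileRowGroupReader just to collect row-group metadata, and the translator is constructed without that argument. A rough sketch of the resulting call site, assuming the variable names visible in the context lines above (the enclosing statement itself is not part of this diff):

// Sketch only: names outside the shown context lines are assumptions.
auto translator = std::make_unique<GroupChunkTranslator>(
    segment_id,                // id of the segment being loaded (assumed name)
    field_metas,               // FieldId -> FieldMeta map for this column group (assumed name)
    column_group_info,         // FieldDataInfo built earlier in this function
    insert_files,              // parquet files backing the column group
    info.enable_mmap,
    milvus_field_ids.size(),   // num_fields
    load_info.load_priority);  // row-group metadata is now gathered inside the constructor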


@@ -104,54 +104,42 @@ ParallelDegreeSplitStrategy::split(
return blocks;
}
- // If row group size is less than parallel degree, split non-continuous groups
- if (sorted_row_groups.size() <= actual_parallel_degree) {
+ // Helper function to create continuous blocks
+ auto create_continuous_blocks = [&](size_t max_block_size = SIZE_MAX) {
+ std::vector<RowGroupBlock> continuous_blocks;
int64_t current_start = sorted_row_groups[0];
int64_t current_count = 1;
for (size_t i = 1; i < sorted_row_groups.size(); ++i) {
int64_t next_row_group = sorted_row_groups[i];
- if (next_row_group == current_start + current_count) {
+ if (next_row_group == current_start + current_count &&
+ current_count < max_block_size) {
current_count++;
continue;
}
- blocks.push_back({current_start, current_count});
+ continuous_blocks.push_back({current_start, current_count});
current_start = next_row_group;
current_count = 1;
}
if (current_count > 0) {
- blocks.push_back({current_start, current_count});
+ continuous_blocks.push_back({current_start, current_count});
}
- return blocks;
+ return continuous_blocks;
+ };
+ // If row group size is less than parallel degree, split non-continuous groups
+ if (sorted_row_groups.size() <= actual_parallel_degree) {
+ return create_continuous_blocks();
}
// Otherwise, split based on parallel degree
size_t avg_block_size =
(sorted_row_groups.size() + actual_parallel_degree - 1) /
actual_parallel_degree;
- int64_t current_start = sorted_row_groups[0];
- int64_t current_count = 1;
- for (size_t i = 1; i < sorted_row_groups.size(); ++i) {
- int64_t next_row_group = sorted_row_groups[i];
- if (next_row_group == current_start + current_count &&
- current_count < avg_block_size) {
- current_count++;
- } else {
- blocks.push_back({current_start, current_count});
- current_start = next_row_group;
- current_count = 1;
- }
- }
- if (current_count > 0) {
- blocks.push_back({current_start, current_count});
- }
- return blocks;
+ return create_continuous_blocks(avg_block_size);
}
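Both branches of the splitter now share the create_continuous_blocks helper: consecutive row-group ids are merged into one block, and a block is cut when continuity breaks or, when splitting by parallel degree, when it reaches max_block_size. A standalone sketch of that grouping rule with a made-up input (RowGroupBlock's offset/count members mirror how the blocks are consumed in LoadWithStrategy below):

#include <cstdint>
#include <cstdio>
#include <vector>

// Assumed shape of the block descriptor: first row-group id and run length.
struct RowGroupBlock {
    int64_t offset;
    int64_t count;
};

// Mirrors the create_continuous_blocks lambda: merge consecutive ids,
// cutting a block when continuity breaks or max_block_size is reached.
static std::vector<RowGroupBlock>
group_continuous(const std::vector<int64_t>& sorted_row_groups,
                 size_t max_block_size) {
    std::vector<RowGroupBlock> blocks;
    if (sorted_row_groups.empty()) {
        return blocks;
    }
    int64_t start = sorted_row_groups[0];
    int64_t count = 1;
    for (size_t i = 1; i < sorted_row_groups.size(); ++i) {
        int64_t next = sorted_row_groups[i];
        if (next == start + count && static_cast<size_t>(count) < max_block_size) {
            ++count;
            continue;
        }
        blocks.push_back({start, count});
        start = next;
        count = 1;
    }
    blocks.push_back({start, count});
    return blocks;
}

int main() {
    // {0,1,2,3} and {7,8} are contiguous runs; with a cap of 3 the first run
    // is split, giving blocks {0,3} {3,1} {7,2} {12,1}.
    for (const auto& b : group_continuous({0, 1, 2, 3, 7, 8, 12}, 3)) {
        std::printf("offset=%lld count=%lld\n",
                    static_cast<long long>(b.offset),
                    static_cast<long long>(b.count));
    }
    return 0;
}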
void
@@ -182,31 +170,33 @@ LoadWithStrategy(const std::vector<std::string>& remote_files,
// Use provided strategy to split row groups
auto blocks = strategy->split(row_groups);
LOG_INFO("split row groups into blocks: {} for file {}",
blocks.size(), file);
// Create and submit tasks for each block
std::vector<std::future<std::shared_ptr<milvus::ArrowDataWrapper>>>
futures;
futures.reserve(blocks.size());
// split memory limit for each block, check if it's greater than 0
- auto reader_memory_limit = memory_limit / blocks.size();
- if (reader_memory_limit < FILE_SLICE_SIZE) {
- reader_memory_limit = FILE_SLICE_SIZE;
- }
+ auto reader_memory_limit = std::max<int64_t>(
+ memory_limit / blocks.size(), FILE_SLICE_SIZE);
for (const auto& block : blocks) {
futures.emplace_back(pool.Submit([block,
fs,
file,
schema,
- memory_limit]() {
+ reader_memory_limit]() {
AssertInfo(fs != nullptr, "file system is nullptr");
auto row_group_reader =
std::make_shared<milvus_storage::FileRowGroupReader>(
- fs, file, schema, memory_limit);
+ fs, file, schema, reader_memory_limit);
AssertInfo(row_group_reader != nullptr,
"row group reader is nullptr");
row_group_reader->SetRowGroupOffsetAndCount(block.offset,
block.count);
LOG_INFO("read row groups from file {} with offset {} and count {}",
file, block.offset, block.count);
auto ret = std::make_shared<ArrowDataWrapper>();
for (int64_t i = 0; i < block.count; ++i) {
std::shared_ptr<arrow::Table> table;

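Two behavioral changes sit in this hunk: the per-block budget is computed with a single std::max clamp, and it is that clamped value, not the whole memory_limit, that the lambda now captures and passes to FileRowGroupReader. A minimal illustration of the clamp with assumed numbers (the 16 MiB slice size here is an example value, not the project's FILE_SLICE_SIZE constant):

#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
    // Assumed example values.
    const int64_t kFileSliceSize = 16LL << 20;  // 16 MiB floor per reader
    const int64_t memory_limit = 64LL << 20;    // total budget for this file
    const int64_t num_blocks = 8;               // blocks produced by split()

    // Same clamp as in the diff: divide evenly, but never below the floor.
    const int64_t reader_memory_limit =
        std::max<int64_t>(memory_limit / num_blocks, kFileSliceSize);

    // 64 MiB / 8 = 8 MiB, which is below the 16 MiB floor, so each reader gets 16 MiB.
    std::printf("per-reader budget: %lld MiB\n",
                static_cast<long long>(reader_memory_limit >> 20));
    return 0;
}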

@@ -21,6 +21,7 @@
#include "milvus-storage/common/metadata.h"
#include "milvus-storage/filesystem/fs.h"
#include "milvus-storage/common/constants.h"
#include "milvus-storage/format/parquet/file_reader.h"
#include "storage/ThreadPools.h"
#include "segcore/memory_planner.h"
@@ -42,8 +43,6 @@ GroupChunkTranslator::GroupChunkTranslator(
FieldDataInfo column_group_info,
std::vector<std::string> insert_files,
bool use_mmap,
- const std::vector<milvus_storage::RowGroupMetadataVector>&
- row_group_meta_list,
int64_t num_fields,
milvus::proto::common::LoadPriority load_priority)
: segment_id_(segment_id),
@@ -52,7 +51,6 @@ GroupChunkTranslator::GroupChunkTranslator(
column_group_info_(column_group_info),
insert_files_(insert_files),
use_mmap_(use_mmap),
- row_group_meta_list_(row_group_meta_list),
load_priority_(load_priority),
meta_(
num_fields,
@@ -64,8 +62,22 @@ GroupChunkTranslator::GroupChunkTranslator(
milvus::segcore::getCacheWarmupPolicy(/* is_vector */ false,
/* is_index */ false),
/* support_eviction */ true) {
- AssertInfo(insert_files_.size() == row_group_meta_list_.size(),
- "Number of insert files must match number of row group metas");
+ auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
+ .GetArrowFileSystem();
+ // Get row group metadata from files
+ for (const auto& file : insert_files_) {
+ auto reader =
+ std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
+ row_group_meta_list_.push_back(
+ reader->file_metadata()->GetRowGroupMetadataVector());
+ auto status = reader->Close();
+ AssertInfo(status.ok(),
+ "failed to close file reader when get row group "
+ "metadata from file: " +
+ file + " with error: " + status.ToString());
+ }
meta_.num_rows_until_chunk_.push_back(0);
for (const auto& row_group_meta : row_group_meta_list_) {
for (int i = 0; i < row_group_meta.size(); ++i) {
@@ -126,7 +138,17 @@ GroupChunkTranslator::get_file_and_row_group_index(
remaining_cid -= file_metas.size();
}
- return {0, 0}; // Default to first file and first row group if not found
+ // cid is out of range, this should not happen
+ AssertInfo(false,
+ fmt::format("cid {} is out of range. Total row groups across all files: {}",
+ cid,
+ [this]() {
+ size_t total = 0;
+ for (const auto& file_metas : row_group_meta_list_) {
+ total += file_metas.size();
+ }
+ return total;
+ }()));
}
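get_file_and_row_group_index resolves a global cell id by walking row_group_meta_list_ and subtracting each file's row-group count until the id falls inside one file; with this change an out-of-range cid trips an assertion that reports the total row-group count instead of silently mapping to {0, 0}. A small self-contained sketch of the lookup, with made-up per-file counts standing in for row_group_meta_list_[i].size():

#include <cstdio>
#include <utility>
#include <vector>

// Resolve a global cell id into (file index, row-group index within that file).
static std::pair<size_t, size_t>
locate(size_t cid, const std::vector<size_t>& row_groups_per_file) {
    size_t remaining = cid;
    for (size_t file_idx = 0; file_idx < row_groups_per_file.size(); ++file_idx) {
        if (remaining < row_groups_per_file[file_idx]) {
            return {file_idx, remaining};
        }
        remaining -= row_groups_per_file[file_idx];
    }
    // Out of range: the real code asserts here instead of returning {0, 0}.
    std::fprintf(stderr, "cid %zu is out of range\n", cid);
    return {0, 0};
}

int main() {
    // Files holding 4, 3, and 5 row groups: cid 6 lands in file 1, row group 2.
    const std::vector<size_t> counts = {4, 3, 5};
    const auto [file_idx, rg_idx] = locate(6, counts);
    std::printf("cid 6 -> file %zu, row group %zu\n", file_idx, rg_idx);
    return 0;
}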
std::vector<std::pair<cachinglayer::cid_t, std::unique_ptr<milvus::GroupChunk>>>
@@ -137,11 +159,7 @@ GroupChunkTranslator::get_cells(const std::vector<cachinglayer::cid_t>& cids) {
cells.reserve(cids.size());
// Create row group lists for requested cids
std::vector<std::vector<int64_t>> row_group_lists;
row_group_lists.reserve(insert_files_.size());
for (size_t i = 0; i < insert_files_.size(); ++i) {
row_group_lists.emplace_back();
}
std::vector<std::vector<int64_t>> row_group_lists(insert_files_.size());
for (auto cid : cids) {
auto [file_idx, row_group_idx] = get_file_and_row_group_index(cid);


@@ -41,8 +41,6 @@ class GroupChunkTranslator
FieldDataInfo column_group_info,
std::vector<std::string> insert_files,
bool use_mmap,
- const std::vector<milvus_storage::RowGroupMetadataVector>&
- row_group_meta_list,
int64_t num_fields,
milvus::proto::common::LoadPriority load_priority);


@@ -100,30 +100,22 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
auto use_mmap = GetParam();
std::unordered_map<FieldId, FieldMeta> field_metas = schema_->get_fields();
auto column_group_info = FieldDataInfo(0, 3000, "");
- // Get row group metadata
- std::vector<milvus_storage::RowGroupMetadataVector> row_group_meta_list;
- auto fr =
- std::make_shared<milvus_storage::FileRowGroupReader>(fs_, paths_[0]);
- row_group_meta_list.push_back(
- fr->file_metadata()->GetRowGroupMetadataVector());
- auto status = fr->Close();
- AssertInfo(
- status.ok(),
- "failed to close file reader when get row group metadata from file: " +
- paths_[0] + " with error: " + status.ToString());
auto translator = std::make_unique<GroupChunkTranslator>(segment_id_,
field_metas,
column_group_info,
paths_,
use_mmap,
- row_group_meta_list,
schema_->get_field_ids().size(),
milvus::proto::common::LoadPriority::LOW);
- // num cells
- EXPECT_EQ(translator->num_cells(), row_group_meta_list[0].size());
+ // num cells - get the expected number from the file directly
+ auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(fs_, paths_[0]);
+ auto expected_num_cells = fr->file_metadata()->GetRowGroupMetadataVector().size();
+ auto row_group_metadata_vector = fr->file_metadata()->GetRowGroupMetadataVector();
+ auto status = fr->Close();
+ AssertInfo(status.ok(), "failed to close file reader");
+ EXPECT_EQ(translator->num_cells(), expected_num_cells);
// cell id of
for (size_t i = 0; i < translator->num_cells(); ++i) {
@@ -133,14 +125,16 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
// key
EXPECT_EQ(translator->key(), "seg_0_cg_0");
// estimated byte size
for (size_t i = 0; i < translator->num_cells(); ++i) {
auto [file_idx, row_group_idx] =
translator->get_file_and_row_group_index(i);
auto& row_group_meta = row_group_meta_list[file_idx].Get(row_group_idx);
// Get the expected memory size from the file directly
auto expected_memory_size = static_cast<int64_t>(row_group_metadata_vector.Get(row_group_idx).memory_size());
auto usage = translator->estimated_byte_size_of_cell(i);
- EXPECT_EQ(usage.memory_bytes,
- static_cast<int64_t>(row_group_meta.memory_size()));
+ EXPECT_EQ(usage.memory_bytes, expected_memory_size);
}
// getting cells