fix: storage v2 write mmap file per field per cell (#42180)

Each cell of a field should be written to its own mmap file, rather than
writing all cells of the field into a single mmap file.
related: #39173
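
For illustration, a minimal sketch (not the actual Milvus code; the helper name and flat parameter list are invented for this example) of the directory layout the change introduces, matching the filepath construction in the diff below:

#include <cstdint>
#include <filesystem>
#include <string>

// Sketch only: mirrors how the per-cell mmap path is built after this change.
std::filesystem::path
PerCellMmapPath(const std::string& mmap_dir_path,  // column_group_info_.mmap_dir_path
                int64_t segment_id,                // segment_id_
                int64_t field_id,                  // field id from field_id_list_
                int64_t cid) {                     // cell id (chunk index)
    // Before: <mmap_dir_path>/<segment_id>/<field_id> held every cell of the field.
    // After:  <mmap_dir_path>/<segment_id>/<field_id>/<cid> holds exactly one cell.
    return std::filesystem::path(mmap_dir_path) / std::to_string(segment_id) /
           std::to_string(field_id) / std::to_string(cid);
}

As the diff shows, each per-cell file is opened with O_CREAT | O_TRUNC | O_RDWR, handed to create_chunk, and then unlinked right away (the usual POSIX pattern: the live mapping keeps the data accessible, and the file is reclaimed automatically once the chunk is released).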

---------

Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
sthuang authored 2025-06-09 11:48:33 +08:00, committed by GitHub
parent 6e16653597
commit b136f85ca0
2 changed files with 48 additions and 99 deletions

View File

@@ -82,12 +82,6 @@ GroupChunkTranslator::GroupChunkTranslator(
 }
 
 GroupChunkTranslator::~GroupChunkTranslator() {
-    for (auto chunk : group_chunks_) {
-        if (chunk != nullptr) {
-            // let the GroupChunk to be deleted by the unique_ptr
-            auto chunk_ptr = std::unique_ptr<GroupChunk>(chunk);
-        }
-    }
 }
 
 size_t
@@ -166,77 +160,31 @@ GroupChunkTranslator::get_cells(const std::vector<cachinglayer::cid_t>& cids) {
     LOG_INFO("segment {} submits load fields {} task to thread pool",
              segment_id_,
              field_id_list_.ToString());
-    if (!use_mmap_) {
-        load_column_group_in_memory();
-    } else {
-        load_column_group_in_mmap();
-    }
-    for (auto cid : cids) {
-        AssertInfo(group_chunks_[cid] != nullptr,
-                   "GroupChunkTranslator::get_cells failed to load cell {} of "
-                   "CacheSlot {}.",
-                   cid,
-                   key_);
-        cells.emplace_back(
-            cid, std::unique_ptr<milvus::GroupChunk>(group_chunks_[cid]));
-        group_chunks_[cid] = nullptr;
-    }
+    std::shared_ptr<milvus::ArrowDataWrapper> r;
+    int64_t cid_idx = 0;
+    int64_t total_tables = 0;
+    while (column_group_info_.arrow_reader_channel->pop(r)) {
+        for (const auto& table : r->arrow_tables) {
+            AssertInfo(cid_idx < cids.size(),
+                       "Number of tables exceed number of cids ({})",
+                       cids.size());
+            auto cid = cids[cid_idx++];
+            cells.emplace_back(cid, load_group_chunk(table, cid));
+            total_tables++;
+        }
+    }
+    AssertInfo(total_tables == cids.size(),
+               "Number of tables ({}) does not match number of cids ({})",
+               total_tables,
+               cids.size());
     return cells;
 }
 
-void
-GroupChunkTranslator::load_column_group_in_memory() {
-    std::vector<size_t> row_counts(field_id_list_.size(), 0);
-    std::shared_ptr<milvus::ArrowDataWrapper> r;
-    std::vector<std::string> files;
-    std::vector<size_t> file_offsets;
-    while (column_group_info_.arrow_reader_channel->pop(r)) {
-        for (const auto& table : r->arrow_tables) {
-            process_batch(table, files, file_offsets, row_counts);
-        }
-    }
-}
-
-void
-GroupChunkTranslator::load_column_group_in_mmap() {
-    std::vector<std::string> files;
-    std::vector<size_t> file_offsets;
-    std::vector<size_t> row_counts;
-
-    // Initialize files and offsets
-    for (size_t i = 0; i < field_id_list_.size(); ++i) {
-        auto field_id = field_id_list_.Get(i);
-        auto filepath =
-            std::filesystem::path(column_group_info_.mmap_dir_path) /
-            std::to_string(segment_id_) / std::to_string(field_id);
-        auto dir = filepath.parent_path();
-        std::filesystem::create_directories(dir);
-        files.push_back(filepath.string());
-        file_offsets.push_back(0);
-        row_counts.push_back(0);
-    }
-
-    std::shared_ptr<milvus::ArrowDataWrapper> r;
-    while (column_group_info_.arrow_reader_channel->pop(r)) {
-        for (const auto& table : r->arrow_tables) {
-            process_batch(table, files, file_offsets, row_counts);
-        }
-    }
-
-    for (size_t i = 0; i < files.size(); ++i) {
-        auto ok = unlink(files[i].c_str());
-        AssertInfo(ok == 0,
-                   fmt::format("failed to unlink mmap data file {}, err: {}",
-                               files[i].c_str(),
-                               strerror(errno)));
-    }
-}
-
-void
-GroupChunkTranslator::process_batch(const std::shared_ptr<arrow::Table>& table,
-                                    const std::vector<std::string>& files,
-                                    std::vector<size_t>& file_offsets,
-                                    std::vector<size_t>& row_counts) {
+std::unique_ptr<milvus::GroupChunk>
+GroupChunkTranslator::load_group_chunk(
+    const std::shared_ptr<arrow::Table>& table,
+    const milvus::cachinglayer::cid_t cid) {
     // Create chunks for each field in this batch
     std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks;
     // Iterate through field_id_list to get field_id and create chunk
@@ -263,25 +211,35 @@ GroupChunkTranslator::process_batch(const std::shared_ptr<arrow::Table>& table,
             chunk = create_chunk(field_meta, dim, array_vec);
         } else {
             // Mmap mode
-            int flags = O_RDWR;
-            if (file_offsets[i] == 0) {
-                // First write to this file, create and truncate
-                flags |= O_CREAT | O_TRUNC;
-            }
-            auto file = File::Open(files[i], flags);
-            // should seek to the file offset before writing
-            file.Seek(file_offsets[i], SEEK_SET);
-            chunk =
-                create_chunk(field_meta, dim, file, file_offsets[i], array_vec);
-            file_offsets[i] += chunk->Size();
+            auto filepath =
+                std::filesystem::path(column_group_info_.mmap_dir_path) /
+                std::to_string(segment_id_) / std::to_string(field_id) /
+                std::to_string(cid);
+            LOG_INFO(
+                "storage v2 segment {} mmaping field {} chunk {} to path {}",
+                segment_id_,
+                field_id,
+                cid,
+                filepath.string());
+            std::filesystem::create_directories(filepath.parent_path());
+            auto file =
+                File::Open(filepath.string(), O_CREAT | O_TRUNC | O_RDWR);
+            chunk = create_chunk(field_meta, dim, file, 0, array_vec);
+            auto ok = unlink(filepath.c_str());
+            AssertInfo(
+                ok == 0,
+                fmt::format(
+                    "storage v2 failed to unlink mmap data file {}, err: {}",
+                    filepath.c_str(),
+                    strerror(errno)));
         }
-        row_counts[i] += chunk->RowNums();
         chunks[fid] = std::move(chunk);
     }
-    // Create GroupChunk from chunks and store in results
-    auto group_chunk = std::make_unique<milvus::GroupChunk>(chunks);
-    group_chunks_.emplace_back(group_chunk.release());
+    return std::make_unique<milvus::GroupChunk>(chunks);
 }
 }  // namespace milvus::segcore::storagev2translator

View File

@@ -72,17 +72,9 @@ class GroupChunkTranslator
     }
 
  private:
-    void
-    load_column_group_in_memory();
-
-    void
-    load_column_group_in_mmap();
-
-    void
-    process_batch(const std::shared_ptr<arrow::Table>& table,
-                  const std::vector<std::string>& files,
-                  std::vector<size_t>& file_offsets,
-                  std::vector<size_t>& row_counts);
+    std::unique_ptr<milvus::GroupChunk>
+    load_group_chunk(const std::shared_ptr<arrow::Table>& table,
+                     const milvus::cachinglayer::cid_t cid);
 
     int64_t segment_id_;
     std::string key_;
@@ -96,7 +88,6 @@ class GroupChunkTranslator
     ChunkedSegmentSealedImpl* chunked_segment_;
     std::unique_ptr<milvus::segcore::InsertRecord<true>> ir_;
     GroupCTMeta meta_;
-    std::vector<milvus::GroupChunk*> group_chunks_;
     int64_t timestamp_offet_;
     bool use_mmap_;
 };