fix: [StorageV2] fill the correct group chunk into cell (#43486)

The root cause is that when a sealed segment contains multiple row
groups, the get_cells function may receive unordered cids. As a result,
row groups could be written into the wrong cells during data retrieval.

Previously, this issue was hard to reproduce because the old Storage V2
writer had a bug that caused it to write row groups larger than 1MB.
These large row groups could lead to uncontrolled memory usage and
eventually an OOM (Out of Memory) error. Additionally, compaction
typically produced a single large row group, which avoided the incorrect
cell-filling issue during query execution.

related: https://github.com/milvus-io/milvus/issues/43388,
https://github.com/milvus-io/milvus/issues/43372,
https://github.com/milvus-io/milvus/issues/43464, #43446, #43453

---------

Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
This commit is contained in:
sthuang 2025-07-22 22:22:53 +08:00 committed by GitHub
parent 92f4fc0e8b
commit 59bbdd93f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 33 additions and 29 deletions

View File

@ -20,6 +20,7 @@
#include <utility>
#include <vector>
#include <flat_hash_map/flat_hash_map.hpp>
#include <folly/futures/Future.h>
#include <folly/futures/SharedPromise.h>
#include <folly/Synchronized.h>
@ -110,7 +111,7 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
[this, uids = std::vector<uid_t>(uids), timeout](
auto&&) -> std::shared_ptr<CellAccessor<CellT>> {
auto count = std::min(uids.size(), cells_.size());
std::unordered_set<cid_t> involved_cids;
ska::flat_hash_set<cid_t> involved_cids;
involved_cids.reserve(count);
switch (cell_id_mapping_mode_) {
case CellIdMappingMode::IDENTICAL: {

View File

@ -12,6 +12,7 @@
#pragma once
#include <memory>
#include <utility>
#include "common/Channel.h"
#include "parquet/arrow/reader.h"
@ -32,7 +33,7 @@ struct ArrowDataWrapper {
std::shared_ptr<parquet::arrow::FileReader> arrow_reader;
// underlying file data memory, must outlive the arrow reader
std::shared_ptr<uint8_t[]> file_data;
std::vector<std::shared_ptr<arrow::Table>> arrow_tables;
std::vector<std::pair<int64_t, std::shared_ptr<arrow::Table>>> arrow_tables;
};
using ArrowReaderChannel = Channel<std::shared_ptr<milvus::ArrowDataWrapper>>;

View File

@ -497,7 +497,7 @@ SegmentGrowingImpl::load_column_group_data_internal(
std::unordered_map<FieldId, std::vector<FieldDataPtr>> field_data_map;
while (column_group_info.arrow_reader_channel->pop(r)) {
for (const auto& table : r->arrow_tables) {
for (const auto& [row_group_id, table] : r->arrow_tables) {
size_t batch_num_rows = table->num_rows();
for (int i = 0; i < table->schema()->num_fields(); ++i) {
AssertInfo(table->schema()->field(i)->metadata()->Contains(

View File

@ -207,7 +207,7 @@ LoadWithStrategy(const std::vector<std::string>& remote_files,
std::to_string(block.offset + i) +
" from file " + file + " with error " +
status.ToString());
ret->arrow_tables.push_back(table);
ret->arrow_tables.push_back(std::make_pair(block.offset + i, table));
}
auto close_status = row_group_reader->Close();
AssertInfo(close_status.ok(),

View File

@ -26,8 +26,11 @@
#include "segcore/memory_planner.h"
#include <string>
#include <unordered_set>
#include <vector>
#include <unordered_map>
#include <set>
#include <algorithm>
#include "arrow/type.h"
#include "arrow/type_fwd.h"
@ -187,22 +190,22 @@ GroupChunkTranslator::get_cells(const std::vector<cachinglayer::cid_t>& cids) {
column_group_info_.field_id);
std::shared_ptr<milvus::ArrowDataWrapper> r;
int64_t cid_idx = 0;
int64_t total_tables = 0;
std::unordered_set<cachinglayer::cid_t> filled_cids;
filled_cids.reserve(cids.size());
while (column_group_info_.arrow_reader_channel->pop(r)) {
for (const auto& table : r->arrow_tables) {
AssertInfo(cid_idx < cids.size(),
"Number of tables exceed number of cids ({})",
cids.size());
auto cid = cids[cid_idx++];
for (const auto& [row_group_id, table] : r->arrow_tables) {
auto cid = static_cast<cachinglayer::cid_t>(row_group_id);
cells.emplace_back(cid, load_group_chunk(table, cid));
total_tables++;
filled_cids.insert(cid);
}
}
AssertInfo(total_tables == cids.size(),
"Number of tables ({}) does not match number of cids ({})",
total_tables,
cids.size());
// Verify all requested cids have been filled
for (auto cid : cids) {
AssertInfo(filled_cids.find(cid) != filled_cids.end(),
"Cid {} was not filled, missing row group id {}",
cid, cid);
}
return cells;
}
@ -270,3 +273,4 @@ GroupChunkTranslator::load_group_chunk(
}
} // namespace milvus::segcore::storagev2translator

View File

@ -1159,7 +1159,7 @@ GetFieldDatasFromStorageV2(std::vector<std::vector<std::string>>& remote_files,
while (field_data_info.arrow_reader_channel->pop(r)) {
size_t num_rows = 0;
std::vector<std::shared_ptr<arrow::ChunkedArray>> chunked_arrays;
for (const auto& table : r->arrow_tables) {
for (const auto& [row_group_id, table] : r->arrow_tables) {
num_rows += table->num_rows();
chunked_arrays.push_back(table->column(col_offset));
}

View File

@ -227,7 +227,7 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) {
int64_t current_row_group = 0;
while (channel->pop(wrapper)) {
for (const auto& table : wrapper->arrow_tables) {
for (const auto& [row_group_id, table] : wrapper->arrow_tables) {
// Verify batch size matches row group metadata
EXPECT_EQ(table->num_rows(),
row_group_metadata.Get(current_row_group).row_num());
@ -257,15 +257,13 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) {
std::shared_ptr<milvus::ArrowDataWrapper> wrapper;
int64_t total_rows = 0;
int64_t current_row_group = 0;
while (channel->pop(wrapper)) {
for (const auto& table : wrapper->arrow_tables) {
for (const auto& [row_group_id, table] : wrapper->arrow_tables) {
// Verify batch size matches row group metadata
EXPECT_EQ(table->num_rows(),
row_group_metadata.Get(current_row_group).row_num());
row_group_metadata.Get(row_group_id).row_num());
total_rows += table->num_rows();
current_row_group++;
}
}
@ -288,17 +286,17 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) {
row_group_lists);
total_rows = 0;
current_row_group = 0;
std::vector<int64_t> selected_row_groups = {0, 2};
while (channel->pop(wrapper)) {
for (const auto& table : wrapper->arrow_tables) {
for (const auto& [row_group_id, table] : wrapper->arrow_tables) {
// row_group_id is the actual row group ID (0 or 2), not an index
// into selected_row_groups; verify it is one of the selected row groups
auto it = std::find(selected_row_groups.begin(), selected_row_groups.end(), row_group_id);
ASSERT_NE(it, selected_row_groups.end()) << "Row group " << row_group_id << " not found in selected_row_groups";
EXPECT_EQ(table->num_rows(),
row_group_metadata
.Get(selected_row_groups[current_row_group])
.row_num());
row_group_metadata.Get(row_group_id).row_num());
total_rows += table->num_rows();
current_row_group++;
}
}

View File

@ -20,7 +20,7 @@ const (
// DefaultWriteBufferSize is the default buffer size for writing data to storage.
DefaultWriteBufferSize = 32 * 1024 * 1024 // 32MB
// DefaultReadBufferSize is the default buffer size for reading data from storage.
DefaultReadBufferSize = -1 // use -1 for unlimited
DefaultReadBufferSize = 32 * 1024 * 1024 // 32MB
// DefaultMultiPartUploadSize is the default size of each part of a multipart upload.
DefaultMultiPartUploadSize = 10 * 1024 * 1024 // 10MB
// Arrow will convert these field IDs to a metadata key named PARQUET:field_id on the appropriate field.