From 59bbdd93f5ac3b59869cd8c23f05a6f0379a244d Mon Sep 17 00:00:00 2001 From: sthuang <167743503+shaoting-huang@users.noreply.github.com> Date: Tue, 22 Jul 2025 22:22:53 +0800 Subject: [PATCH] fix: [StorageV2] fill the correct group chunk into cell (#43486) The root cause of the issue lies in the fact that when a sealed segment contains multiple row groups, the get_cells function may receive unordered cids. This can result in row groups being written into incorrect cells during data retrieval. Previously, this issue was hard to reproduce because the old Storage V2 writer had a bug that caused it to write row groups larger than 1MB. These large row groups could lead to uncontrolled memory usage and eventually an OOM (Out of Memory) error. Additionally, compaction typically produced a single large row group, which avoided the incorrect cell-filling issue during query execution. related: https://github.com/milvus-io/milvus/issues/43388, https://github.com/milvus-io/milvus/issues/43372, https://github.com/milvus-io/milvus/issues/43464, #43446, #43453 --------- Signed-off-by: shaoting-huang --- internal/core/src/cachinglayer/CacheSlot.h | 3 +- internal/core/src/common/ArrowDataWrapper.h | 3 +- .../core/src/segcore/SegmentGrowingImpl.cpp | 2 +- internal/core/src/segcore/memory_planner.cpp | 2 +- .../GroupChunkTranslator.cpp | 28 +++++++++++-------- internal/core/src/storage/Util.cpp | 2 +- .../core/unittest/test_growing_storage_v2.cpp | 20 ++++++------- internal/storagev2/packed/constant.go | 2 +- 8 files changed, 33 insertions(+), 29 deletions(-) diff --git a/internal/core/src/cachinglayer/CacheSlot.h b/internal/core/src/cachinglayer/CacheSlot.h index 70f709e554..90b36f2496 100644 --- a/internal/core/src/cachinglayer/CacheSlot.h +++ b/internal/core/src/cachinglayer/CacheSlot.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -110,7 +111,7 @@ class CacheSlot final : public std::enable_shared_from_this> { [this, uids = std::vector(uids), timeout]( auto&&) -> std::shared_ptr> { auto count = std::min(uids.size(), cells_.size()); - std::unordered_set involved_cids; + ska::flat_hash_set involved_cids; involved_cids.reserve(count); switch (cell_id_mapping_mode_) { case CellIdMappingMode::IDENTICAL: { diff --git a/internal/core/src/common/ArrowDataWrapper.h b/internal/core/src/common/ArrowDataWrapper.h index 4fdc0a779c..65eabad7ff 100644 --- a/internal/core/src/common/ArrowDataWrapper.h +++ b/internal/core/src/common/ArrowDataWrapper.h @@ -12,6 +12,7 @@ #pragma once #include +#include #include "common/Channel.h" #include "parquet/arrow/reader.h" @@ -32,7 +33,7 @@ struct ArrowDataWrapper { std::shared_ptr arrow_reader; // underlying file data memory, must outlive the arrow reader std::shared_ptr file_data; - std::vector> arrow_tables; + std::vector>> arrow_tables; }; using ArrowReaderChannel = Channel>; diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index f286a0812e..f2023642e7 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -497,7 +497,7 @@ SegmentGrowingImpl::load_column_group_data_internal( std::unordered_map> field_data_map; while (column_group_info.arrow_reader_channel->pop(r)) { - for (const auto& table : r->arrow_tables) { + for (const auto& [row_group_id, table] : r->arrow_tables) { size_t batch_num_rows = table->num_rows(); for (int i = 0; i < table->schema()->num_fields(); ++i) { AssertInfo(table->schema()->field(i)->metadata()->Contains( diff --git a/internal/core/src/segcore/memory_planner.cpp b/internal/core/src/segcore/memory_planner.cpp index 93ae341a13..9723992355 100644 --- a/internal/core/src/segcore/memory_planner.cpp +++ b/internal/core/src/segcore/memory_planner.cpp @@ -207,7 +207,7 @@ LoadWithStrategy(const std::vector& remote_files, std::to_string(block.offset + i) + " from file " + file + " with error " + status.ToString()); - ret->arrow_tables.push_back(table); + ret->arrow_tables.push_back(std::make_pair(block.offset + i, table)); } auto close_status = row_group_reader->Close(); AssertInfo(close_status.ok(), diff --git a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp index 146d8d63a5..a0e7426f4c 100644 --- a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp +++ b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp @@ -26,8 +26,11 @@ #include "segcore/memory_planner.h" #include +#include #include #include +#include +#include #include "arrow/type.h" #include "arrow/type_fwd.h" @@ -187,22 +190,22 @@ GroupChunkTranslator::get_cells(const std::vector& cids) { column_group_info_.field_id); std::shared_ptr r; - int64_t cid_idx = 0; - int64_t total_tables = 0; + std::unordered_set filled_cids; + filled_cids.reserve(cids.size()); while (column_group_info_.arrow_reader_channel->pop(r)) { - for (const auto& table : r->arrow_tables) { - AssertInfo(cid_idx < cids.size(), - "Number of tables exceed number of cids ({})", - cids.size()); - auto cid = cids[cid_idx++]; + for (const auto& [row_group_id, table] : r->arrow_tables) { + auto cid = static_cast(row_group_id); cells.emplace_back(cid, load_group_chunk(table, cid)); - total_tables++; + filled_cids.insert(cid); } } - AssertInfo(total_tables == cids.size(), - "Number of tables ({}) does not match number of cids ({})", - total_tables, - cids.size()); + + // Verify all requested cids have been filled + for (auto cid : cids) { + AssertInfo(filled_cids.find(cid) != filled_cids.end(), + "Cid {} was not filled, missing row group id {}", + cid, cid); + } return cells; } @@ -270,3 +273,4 @@ GroupChunkTranslator::load_group_chunk( } } // namespace milvus::segcore::storagev2translator + diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index fe61c22a11..2e2f6dace2 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -1159,7 +1159,7 @@ GetFieldDatasFromStorageV2(std::vector>& remote_files, while (field_data_info.arrow_reader_channel->pop(r)) { size_t num_rows = 0; std::vector> chunked_arrays; - for (const auto& table : r->arrow_tables) { + for (const auto& [row_group_id, table] : r->arrow_tables) { num_rows += table->num_rows(); chunked_arrays.push_back(table->column(col_offset)); } diff --git a/internal/core/unittest/test_growing_storage_v2.cpp b/internal/core/unittest/test_growing_storage_v2.cpp index 06a84facd1..5c1080f318 100644 --- a/internal/core/unittest/test_growing_storage_v2.cpp +++ b/internal/core/unittest/test_growing_storage_v2.cpp @@ -227,7 +227,7 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) { int64_t current_row_group = 0; while (channel->pop(wrapper)) { - for (const auto& table : wrapper->arrow_tables) { + for (const auto& [row_group_id, table] : wrapper->arrow_tables) { // Verify batch size matches row group metadata EXPECT_EQ(table->num_rows(), row_group_metadata.Get(current_row_group).row_num()); @@ -257,15 +257,13 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) { std::shared_ptr wrapper; int64_t total_rows = 0; - int64_t current_row_group = 0; while (channel->pop(wrapper)) { - for (const auto& table : wrapper->arrow_tables) { + for (const auto& [row_group_id, table] : wrapper->arrow_tables) { // Verify batch size matches row group metadata EXPECT_EQ(table->num_rows(), - row_group_metadata.Get(current_row_group).row_num()); + row_group_metadata.Get(row_group_id).row_num()); total_rows += table->num_rows(); - current_row_group++; } } @@ -288,17 +286,17 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) { row_group_lists); total_rows = 0; - current_row_group = 0; std::vector selected_row_groups = {0, 2}; while (channel->pop(wrapper)) { - for (const auto& table : wrapper->arrow_tables) { + for (const auto& [row_group_id, table] : wrapper->arrow_tables) { + // row_group_id is the actual row group ID (0 or 2), not an index + // We need to find its position in selected_row_groups + auto it = std::find(selected_row_groups.begin(), selected_row_groups.end(), row_group_id); + ASSERT_NE(it, selected_row_groups.end()) << "Row group " << row_group_id << " not found in selected_row_groups"; EXPECT_EQ(table->num_rows(), - row_group_metadata - .Get(selected_row_groups[current_row_group]) - .row_num()); + row_group_metadata.Get(row_group_id).row_num()); total_rows += table->num_rows(); - current_row_group++; } } diff --git a/internal/storagev2/packed/constant.go b/internal/storagev2/packed/constant.go index a30e8f8e59..fb30281dbd 100644 --- a/internal/storagev2/packed/constant.go +++ b/internal/storagev2/packed/constant.go @@ -20,7 +20,7 @@ const ( // DefaultBufferSize is the default buffer size for writing data to storage. DefaultWriteBufferSize = 32 * 1024 * 1024 // 32MB // DefaultBufferSize is the default buffer size for reading data from storage. - DefaultReadBufferSize = -1 // use -1 for unlimited + DefaultReadBufferSize = 32 * 1024 * 1024 // 32MB // DefaultMultiPartUploadSize is the default size of each part of a multipart upload. DefaultMultiPartUploadSize = 10 * 1024 * 1024 // 10MB // Arrow will convert these field IDs to a metadata key named PARQUET:field_id on the appropriate field.