diff --git a/internal/core/src/cachinglayer/CacheSlot.h b/internal/core/src/cachinglayer/CacheSlot.h index 70f709e554..90b36f2496 100644 --- a/internal/core/src/cachinglayer/CacheSlot.h +++ b/internal/core/src/cachinglayer/CacheSlot.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -110,7 +111,7 @@ class CacheSlot final : public std::enable_shared_from_this> { [this, uids = std::vector(uids), timeout]( auto&&) -> std::shared_ptr> { auto count = std::min(uids.size(), cells_.size()); - std::unordered_set involved_cids; + ska::flat_hash_set involved_cids; involved_cids.reserve(count); switch (cell_id_mapping_mode_) { case CellIdMappingMode::IDENTICAL: { diff --git a/internal/core/src/common/ArrowDataWrapper.h b/internal/core/src/common/ArrowDataWrapper.h index 4fdc0a779c..65eabad7ff 100644 --- a/internal/core/src/common/ArrowDataWrapper.h +++ b/internal/core/src/common/ArrowDataWrapper.h @@ -12,6 +12,7 @@ #pragma once #include +#include #include "common/Channel.h" #include "parquet/arrow/reader.h" @@ -32,7 +33,7 @@ struct ArrowDataWrapper { std::shared_ptr arrow_reader; // underlying file data memory, must outlive the arrow reader std::shared_ptr file_data; - std::vector> arrow_tables; + std::vector>> arrow_tables; }; using ArrowReaderChannel = Channel>; diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index f286a0812e..f2023642e7 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -497,7 +497,7 @@ SegmentGrowingImpl::load_column_group_data_internal( std::unordered_map> field_data_map; while (column_group_info.arrow_reader_channel->pop(r)) { - for (const auto& table : r->arrow_tables) { + for (const auto& [row_group_id, table] : r->arrow_tables) { size_t batch_num_rows = table->num_rows(); for (int i = 0; i < table->schema()->num_fields(); ++i) { AssertInfo(table->schema()->field(i)->metadata()->Contains( diff --git a/internal/core/src/segcore/memory_planner.cpp b/internal/core/src/segcore/memory_planner.cpp index 93ae341a13..9723992355 100644 --- a/internal/core/src/segcore/memory_planner.cpp +++ b/internal/core/src/segcore/memory_planner.cpp @@ -207,7 +207,7 @@ LoadWithStrategy(const std::vector& remote_files, std::to_string(block.offset + i) + " from file " + file + " with error " + status.ToString()); - ret->arrow_tables.push_back(table); + ret->arrow_tables.push_back(std::make_pair(block.offset + i, table)); } auto close_status = row_group_reader->Close(); AssertInfo(close_status.ok(), diff --git a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp index 146d8d63a5..a0e7426f4c 100644 --- a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp +++ b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp @@ -26,8 +26,11 @@ #include "segcore/memory_planner.h" #include +#include #include #include +#include +#include #include "arrow/type.h" #include "arrow/type_fwd.h" @@ -187,22 +190,22 @@ GroupChunkTranslator::get_cells(const std::vector& cids) { column_group_info_.field_id); std::shared_ptr r; - int64_t cid_idx = 0; - int64_t total_tables = 0; + std::unordered_set filled_cids; + filled_cids.reserve(cids.size()); while (column_group_info_.arrow_reader_channel->pop(r)) { - for (const auto& table : r->arrow_tables) { - AssertInfo(cid_idx < cids.size(), - "Number of tables exceed number of cids ({})", - cids.size()); - auto cid = cids[cid_idx++]; + for (const auto& [row_group_id, table] : r->arrow_tables) { + auto cid = static_cast(row_group_id); cells.emplace_back(cid, load_group_chunk(table, cid)); - total_tables++; + filled_cids.insert(cid); } } - AssertInfo(total_tables == cids.size(), - "Number of tables ({}) does not match number of cids ({})", - total_tables, - cids.size()); + + // Verify all requested cids have been filled + for (auto cid : cids) { + AssertInfo(filled_cids.find(cid) != filled_cids.end(), + "Cid {} was not filled, missing row group id {}", + cid, cid); + } return cells; } @@ -270,3 +273,4 @@ GroupChunkTranslator::load_group_chunk( } } // namespace milvus::segcore::storagev2translator + diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index fe61c22a11..2e2f6dace2 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -1159,7 +1159,7 @@ GetFieldDatasFromStorageV2(std::vector>& remote_files, while (field_data_info.arrow_reader_channel->pop(r)) { size_t num_rows = 0; std::vector> chunked_arrays; - for (const auto& table : r->arrow_tables) { + for (const auto& [row_group_id, table] : r->arrow_tables) { num_rows += table->num_rows(); chunked_arrays.push_back(table->column(col_offset)); } diff --git a/internal/core/unittest/test_growing_storage_v2.cpp b/internal/core/unittest/test_growing_storage_v2.cpp index 06a84facd1..5c1080f318 100644 --- a/internal/core/unittest/test_growing_storage_v2.cpp +++ b/internal/core/unittest/test_growing_storage_v2.cpp @@ -227,7 +227,7 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) { int64_t current_row_group = 0; while (channel->pop(wrapper)) { - for (const auto& table : wrapper->arrow_tables) { + for (const auto& [row_group_id, table] : wrapper->arrow_tables) { // Verify batch size matches row group metadata EXPECT_EQ(table->num_rows(), row_group_metadata.Get(current_row_group).row_num()); @@ -257,15 +257,13 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) { std::shared_ptr wrapper; int64_t total_rows = 0; - int64_t current_row_group = 0; while (channel->pop(wrapper)) { - for (const auto& table : wrapper->arrow_tables) { + for (const auto& [row_group_id, table] : wrapper->arrow_tables) { // Verify batch size matches row group metadata EXPECT_EQ(table->num_rows(), - row_group_metadata.Get(current_row_group).row_num()); + row_group_metadata.Get(row_group_id).row_num()); total_rows += table->num_rows(); - current_row_group++; } } @@ -288,17 +286,17 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) { row_group_lists); total_rows = 0; - current_row_group = 0; std::vector selected_row_groups = {0, 2}; while (channel->pop(wrapper)) { - for (const auto& table : wrapper->arrow_tables) { + for (const auto& [row_group_id, table] : wrapper->arrow_tables) { + // row_group_id is the actual row group ID (0 or 2), not an index + // We need to find its position in selected_row_groups + auto it = std::find(selected_row_groups.begin(), selected_row_groups.end(), row_group_id); + ASSERT_NE(it, selected_row_groups.end()) << "Row group " << row_group_id << " not found in selected_row_groups"; EXPECT_EQ(table->num_rows(), - row_group_metadata - .Get(selected_row_groups[current_row_group]) - .row_num()); + row_group_metadata.Get(row_group_id).row_num()); total_rows += table->num_rows(); - current_row_group++; } } diff --git a/internal/storagev2/packed/constant.go b/internal/storagev2/packed/constant.go index a30e8f8e59..fb30281dbd 100644 --- a/internal/storagev2/packed/constant.go +++ b/internal/storagev2/packed/constant.go @@ -20,7 +20,7 @@ const ( // DefaultBufferSize is the default buffer size for writing data to storage. DefaultWriteBufferSize = 32 * 1024 * 1024 // 32MB // DefaultBufferSize is the default buffer size for reading data from storage. - DefaultReadBufferSize = -1 // use -1 for unlimited + DefaultReadBufferSize = 32 * 1024 * 1024 // 32MB // DefaultMultiPartUploadSize is the default size of each part of a multipart upload. DefaultMultiPartUploadSize = 10 * 1024 * 1024 // 10MB // Arrow will convert these field IDs to a metadata key named PARQUET:field_id on the appropriate field.