diff --git a/internal/core/conanfile.py b/internal/core/conanfile.py index 3617cd8eb6..275b987470 100644 --- a/internal/core/conanfile.py +++ b/internal/core/conanfile.py @@ -46,6 +46,7 @@ class MilvusConan(ConanFile): "mongo-cxx-driver/3.11.0#ae206de0e90fb8cb2fb95465fb8b2f01", "geos/3.12.0#0b177c90c25a8ca210578fb9e2899c37", "icu/74.2#cd1937b9561b8950a2ae6311284c5813", + "libavrocpp/1.12.1@milvus/dev", ) generators = ("cmake", "cmake_find_package") diff --git a/internal/core/src/common/VectorArrayStorageV2Test.cpp b/internal/core/src/common/VectorArrayStorageV2Test.cpp index 899bc3b7f6..e62c1a4a65 100644 --- a/internal/core/src/common/VectorArrayStorageV2Test.cpp +++ b/internal/core/src/common/VectorArrayStorageV2Test.cpp @@ -106,13 +106,16 @@ class TestVectorArrayStorageV2 : public testing::Test { auto storage_config = milvus_storage::StorageConfig(); // Create writer - milvus_storage::PackedRecordBatchWriter writer( + auto result = milvus_storage::PackedRecordBatchWriter::Make( fs, paths, schema_->ConvertToArrowSchema(), storage_config, column_groups, - writer_memory); + writer_memory, + ::parquet::default_writer_properties()); + EXPECT_TRUE(result.ok()); + auto writer = result.ValueOrDie(); // Generate and write data int64_t row_count = 0; @@ -201,9 +204,9 @@ class TestVectorArrayStorageV2 : public testing::Test { auto record_batch = arrow::RecordBatch::Make( schema_->ConvertToArrowSchema(), test_data_count_, arrays); row_count += test_data_count_; - EXPECT_TRUE(writer.Write(record_batch).ok()); + EXPECT_TRUE(writer->Write(record_batch).ok()); } - EXPECT_TRUE(writer.Close().ok()); + EXPECT_TRUE(writer->Close().ok()); LoadFieldDataInfo load_info; load_info.field_infos.emplace( diff --git a/internal/core/src/index/json_stats/JsonKeyStats.cpp b/internal/core/src/index/json_stats/JsonKeyStats.cpp index 57d57370ad..392b6d2be6 100644 --- a/internal/core/src/index/json_stats/JsonKeyStats.cpp +++ b/internal/core/src/index/json_stats/JsonKeyStats.cpp @@ -716,8 +716,11 @@ JsonKeyStats::GetColumnSchemaFromParquet(int64_t column_group_id, const std::string& file) { auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() .GetArrowFileSystem(); - auto file_reader = - std::make_shared(fs, file); + auto result = milvus_storage::FileRowGroupReader::Make(fs, file); + AssertInfo(result.ok(), + "[StorageV2] Failed to create file row group reader: " + + result.status().ToString()); + auto file_reader = result.ValueOrDie(); std::shared_ptr file_schema = file_reader->schema(); LOG_DEBUG("get column schema: [{}] for segment {}", file_schema->ToString(true), @@ -778,9 +781,11 @@ JsonKeyStats::GetCommonMetaFromParquet(const std::string& file) { auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() .GetArrowFileSystem(); - auto file_reader = - std::make_shared(fs, file); - + auto result = milvus_storage::FileRowGroupReader::Make(fs, file); + AssertInfo(result.ok(), + "[StorageV2] Failed to create file row group reader: " + + result.status().ToString()); + auto file_reader = result.ValueOrDie(); // get key value metadata from parquet file std::shared_ptr metadata = file_reader->file_metadata(); @@ -874,8 +879,11 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id, auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() .GetArrowFileSystem(); - auto file_reader = - std::make_shared(fs, files[0]); + auto result = milvus_storage::FileRowGroupReader::Make(fs, files[0]); + AssertInfo(result.ok(), + "[StorageV2] Failed to create file row group reader: " + + result.status().ToString()); + auto file_reader = result.ValueOrDie(); std::shared_ptr metadata = file_reader->file_metadata(); milvus_storage::FieldIDList field_id_list = @@ -886,8 +894,11 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id, } for (const auto& file : files) { - auto reader = - std::make_shared(fs, file); + auto result = milvus_storage::FileRowGroupReader::Make(fs, file); + AssertInfo(result.ok(), + "[StorageV2] Failed to create file row group reader: " + + result.status().ToString()); + auto reader = result.ValueOrDie(); auto row_group_meta_vector = reader->file_metadata()->GetRowGroupMetadataVector(); num_rows += row_group_meta_vector.row_num(); diff --git a/internal/core/src/index/json_stats/parquet_writer.cpp b/internal/core/src/index/json_stats/parquet_writer.cpp index da033d6f58..e2b2ad4bcd 100644 --- a/internal/core/src/index/json_stats/parquet_writer.cpp +++ b/internal/core/src/index/json_stats/parquet_writer.cpp @@ -108,16 +108,19 @@ JsonStatsParquetWriter::Init(const ParquetWriteContext& context) { schema_ = context.schema; builders_ = context.builders; builders_map_ = context.builders_map; - kv_metadata_ = std::move(context.kv_metadata); + kv_metadata_ = context.kv_metadata; column_groups_ = context.column_groups; file_paths_ = context.file_paths; - packed_writer_ = std::make_unique( - fs_, - file_paths_, - schema_, - storage_config_, - column_groups_, - buffer_size_); + auto result = milvus_storage::PackedRecordBatchWriter::Make(fs_, + file_paths_, + schema_, + storage_config_, + column_groups_, + buffer_size_); + AssertInfo(result.ok(), + "[StorageV2] Failed to create packed writer: " + + result.status().ToString()); + packed_writer_ = result.ValueOrDie(); for (const auto& [key, value] : kv_metadata_) { packed_writer_->AddUserMetadata(key, value); } diff --git a/internal/core/src/index/json_stats/parquet_writer.h b/internal/core/src/index/json_stats/parquet_writer.h index d45bde7381..62ec6d2c84 100644 --- a/internal/core/src/index/json_stats/parquet_writer.h +++ b/internal/core/src/index/json_stats/parquet_writer.h @@ -232,7 +232,7 @@ class JsonStatsParquetWriter { size_t batch_size_; std::shared_ptr fs_; milvus_storage::StorageConfig storage_config_; - std::unique_ptr packed_writer_; + std::shared_ptr packed_writer_; std::vector> kv_metadata_; // cache for builders diff --git a/internal/core/src/segcore/ChunkedSegmentSealedStorageV2Test.cpp b/internal/core/src/segcore/ChunkedSegmentSealedStorageV2Test.cpp index 9e21785bf6..5f39f76d04 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedStorageV2Test.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedStorageV2Test.cpp @@ -97,13 +97,16 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam { auto storage_config = milvus_storage::StorageConfig(); // Create writer - milvus_storage::PackedRecordBatchWriter writer( + auto result = milvus_storage::PackedRecordBatchWriter::Make( fs, paths, schema->ConvertToArrowSchema(), storage_config, column_groups, - writer_memory); + writer_memory, + ::parquet::default_writer_properties()); + EXPECT_TRUE(result.ok()); + auto writer = result.ValueOrDie(); // Generate and write data int64_t row_count = 0; @@ -159,9 +162,9 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam { auto record_batch = arrow::RecordBatch::Make( schema->ConvertToArrowSchema(), test_data_count, arrays); row_count += test_data_count; - EXPECT_TRUE(writer.Write(record_batch).ok()); + EXPECT_TRUE(writer->Write(record_batch).ok()); } - EXPECT_TRUE(writer.Close().ok()); + EXPECT_TRUE(writer->Close().ok()); LoadFieldDataInfo load_info; load_info.field_infos.emplace( diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 4160bd8934..ce2f285eaf 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -440,11 +440,15 @@ SegmentGrowingImpl::load_column_group_data_internal( std::vector> row_group_lists; row_group_lists.reserve(insert_files.size()); for (const auto& file : insert_files) { - auto reader = std::make_shared( + auto result = milvus_storage::FileRowGroupReader::Make( fs, file, milvus_storage::DEFAULT_READ_BUFFER_SIZE, storage::GetReaderProperties()); + AssertInfo(result.ok(), + "[StorageV2] Failed to create file row group reader: " + + result.status().ToString()); + auto reader = result.ValueOrDie(); auto row_group_num = reader->file_metadata()->GetRowGroupMetadataVector().size(); std::vector all_row_groups(row_group_num); diff --git a/internal/core/src/segcore/SegmentGrowingStorageV2Test.cpp b/internal/core/src/segcore/SegmentGrowingStorageV2Test.cpp index a0983c7d17..06b629e490 100644 --- a/internal/core/src/segcore/SegmentGrowingStorageV2Test.cpp +++ b/internal/core/src/segcore/SegmentGrowingStorageV2Test.cpp @@ -143,12 +143,20 @@ TEST_F(TestGrowingStorageV2, LoadFieldData) { auto column_groups = std::vector>{{2}, {0, 1}}; auto writer_memory = 16 * 1024 * 1024; auto storage_config = milvus_storage::StorageConfig(); - milvus_storage::PackedRecordBatchWriter writer( - fs_, paths, schema_, storage_config, column_groups, writer_memory); + auto result = milvus_storage::PackedRecordBatchWriter::Make( + fs_, + paths, + schema_, + storage_config, + column_groups, + writer_memory, + ::parquet::default_writer_properties()); + EXPECT_TRUE(result.ok()); + auto writer = result.ValueOrDie(); for (int i = 0; i < batch_size; ++i) { - EXPECT_TRUE(writer.Write(record_batch_).ok()); + EXPECT_TRUE(writer->Write(record_batch_).ok()); } - EXPECT_TRUE(writer.Close().ok()); + EXPECT_TRUE(writer->Close().ok()); auto schema = std::make_shared(); auto ts_fid = schema->AddDebugField("ts", milvus::DataType::INT64, true); @@ -187,20 +195,32 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) { auto column_groups = std::vector>{{2}, {0, 1}}; auto writer_memory = 16 * 1024 * 1024; auto storage_config = milvus_storage::StorageConfig(); - milvus_storage::PackedRecordBatchWriter writer( - fs_, paths, schema_, storage_config, column_groups, writer_memory); + auto result = milvus_storage::PackedRecordBatchWriter::Make( + fs_, + paths, + schema_, + storage_config, + column_groups, + writer_memory, + ::parquet::default_writer_properties()); + EXPECT_TRUE(result.ok()); + auto writer = result.ValueOrDie(); for (int i = 0; i < batch_size; ++i) { - EXPECT_TRUE(writer.Write(record_batch_).ok()); + EXPECT_TRUE(writer->Write(record_batch_).ok()); } - EXPECT_TRUE(writer.Close().ok()); + EXPECT_TRUE(writer->Close().ok()); auto channel = std::make_shared(); int64_t memory_limit = 1024 * 1024 * 1024; // 1GB uint64_t parallel_degree = 2; // read all row groups - auto fr = std::make_shared( - fs_, paths[0], schema_); + auto reader_result = + milvus_storage::FileRowGroupReader::Make(fs_, paths[0]); + AssertInfo(reader_result.ok(), + "[StorageV2] Failed to create file row group reader: " + + reader_result.status().ToString()); + auto fr = reader_result.ValueOrDie(); auto row_group_metadata = fr->file_metadata()->GetRowGroupMetadataVector(); auto status = fr->Close(); AssertInfo( @@ -349,8 +369,16 @@ TEST_F(TestGrowingStorageV2, TestAllDataTypes) { auto writer_memory = 16 * 1024 * 1024; auto storage_config = milvus_storage::StorageConfig(); auto arrow_schema = schema->ConvertToArrowSchema(); - milvus_storage::PackedRecordBatchWriter writer( - fs_, paths, arrow_schema, storage_config, column_groups, writer_memory); + auto result = milvus_storage::PackedRecordBatchWriter::Make( + fs_, + paths, + arrow_schema, + storage_config, + column_groups, + writer_memory, + ::parquet::default_writer_properties()); + EXPECT_TRUE(result.ok()); + auto writer = result.ValueOrDie(); int64_t total_rows = 0; for (int64_t i = 0; i < n_batch; i++) { auto dataset = DataGen(schema, per_batch); @@ -358,9 +386,9 @@ TEST_F(TestGrowingStorageV2, TestAllDataTypes) { ConvertToArrowRecordBatch(dataset, dim, arrow_schema); total_rows += record_batch->num_rows(); - EXPECT_TRUE(writer.Write(record_batch).ok()); + EXPECT_TRUE(writer->Write(record_batch).ok()); } - EXPECT_TRUE(writer.Close().ok()); + EXPECT_TRUE(writer->Close().ok()); // Load data back from storage v2 LoadFieldDataInfo load_info; diff --git a/internal/core/src/segcore/memory_planner.cpp b/internal/core/src/segcore/memory_planner.cpp index 717fff6edf..e1fedd7c5d 100644 --- a/internal/core/src/segcore/memory_planner.cpp +++ b/internal/core/src/segcore/memory_planner.cpp @@ -183,55 +183,54 @@ LoadWithStrategy(const std::vector& remote_files, memory_limit / blocks.size(), FILE_SLICE_SIZE.load()); for (const auto& block : blocks) { - futures.emplace_back(pool.Submit([block, - fs, - file, - file_idx, - schema, - reader_memory_limit]() { - AssertInfo(fs != nullptr, - "[StorageV2] file system is nullptr"); - auto row_group_reader = - std::make_shared( + futures.emplace_back(pool.Submit( + [block, fs, file, file_idx, schema, reader_memory_limit]() { + AssertInfo(fs != nullptr, + "[StorageV2] file system is nullptr"); + auto result = milvus_storage::FileRowGroupReader::Make( fs, file, schema, reader_memory_limit, milvus::storage::GetReaderProperties()); - AssertInfo(row_group_reader != nullptr, - "[StorageV2] row group reader is nullptr"); - row_group_reader->SetRowGroupOffsetAndCount(block.offset, - block.count); - LOG_INFO( - "[StorageV2] read row groups from file {} with offset " - "{} and count " - "{}", - file, - block.offset, - block.count); - auto ret = std::make_shared(); - for (int64_t i = 0; i < block.count; ++i) { - std::shared_ptr table; + AssertInfo( + result.ok(), + "[StorageV2] Failed to create row group reader: " + + result.status().ToString()); + auto row_group_reader = result.ValueOrDie(); auto status = - row_group_reader->ReadNextRowGroup(&table); + row_group_reader->SetRowGroupOffsetAndCount( + block.offset, block.count); AssertInfo(status.ok(), - "[StorageV2] Failed to read row group " + - std::to_string(block.offset + i) + - " from file " + file + " with error " + - status.ToString()); - ret->arrow_tables.push_back( - {file_idx, - static_cast(block.offset + i), - table}); - } - auto close_status = row_group_reader->Close(); - AssertInfo(close_status.ok(), - "[StorageV2] Failed to close row group reader " - "for file " + - file + " with error " + - close_status.ToString()); - return ret; - })); + "[StorageV2] Failed to set row group offset " + "and count " + + std::to_string(block.offset) + " and " + + std::to_string(block.count) + + " with error " + status.ToString()); + auto ret = std::make_shared(); + for (int64_t i = 0; i < block.count; ++i) { + std::shared_ptr table; + auto status = + row_group_reader->ReadNextRowGroup(&table); + AssertInfo(status.ok(), + "[StorageV2] Failed to read row group " + + std::to_string(block.offset + i) + + " from file " + file + + " with error " + status.ToString()); + ret->arrow_tables.push_back( + {file_idx, + static_cast(block.offset + i), + table}); + } + auto close_status = row_group_reader->Close(); + AssertInfo( + close_status.ok(), + "[StorageV2] Failed to close row group reader " + "for file " + + file + " with error " + + close_status.ToString()); + return ret; + })); } for (auto& future : futures) { diff --git a/internal/core/src/segcore/packed_writer_c.cpp b/internal/core/src/segcore/packed_writer_c.cpp index 5f557c0321..f6d135849f 100644 --- a/internal/core/src/segcore/packed_writer_c.cpp +++ b/internal/core/src/segcore/packed_writer_c.cpp @@ -108,16 +108,21 @@ NewPackedWriterWithStorageConfig(struct ArrowSchema* schema, } auto writer_properties = builder.build(); - auto writer = std::make_unique( - trueFs, - truePaths, - trueSchema, - storage_config, - columnGroups, - buffer_size, - writer_properties); - AssertInfo(writer, "[StorageV2] writer pointer is null"); - *c_packed_writer = writer.release(); + auto result = + milvus_storage::PackedRecordBatchWriter::Make(trueFs, + truePaths, + trueSchema, + storage_config, + columnGroups, + buffer_size, + writer_properties); + AssertInfo(result.ok(), + "[StorageV2] Failed to create packed writer: " + + result.status().ToString()); + auto writer = result.ValueOrDie(); + *c_packed_writer = + new std::shared_ptr( + std::move(writer)); return milvus::SuccessCStatus(); } catch (std::exception& e) { return milvus::FailureCStatus(&e); @@ -177,16 +182,21 @@ NewPackedWriter(struct ArrowSchema* schema, } auto writer_properties = builder.build(); - auto writer = std::make_unique( - trueFs, - truePaths, - trueSchema, - conf, - columnGroups, - buffer_size, - writer_properties); - AssertInfo(writer, "[StorageV2] writer pointer is null"); - *c_packed_writer = writer.release(); + auto result = + milvus_storage::PackedRecordBatchWriter::Make(trueFs, + truePaths, + trueSchema, + conf, + columnGroups, + buffer_size, + writer_properties); + AssertInfo(result.ok(), + "[StorageV2] Failed to create packed writer: " + + result.status().ToString()); + auto writer = result.ValueOrDie(); + *c_packed_writer = + new std::shared_ptr( + std::move(writer)); return milvus::SuccessCStatus(); } catch (std::exception& e) { return milvus::FailureCStatus(&e); @@ -201,9 +211,9 @@ WriteRecordBatch(CPackedWriter c_packed_writer, SCOPE_CGO_CALL_METRIC(); try { - auto packed_writer = - static_cast( - c_packed_writer); + auto packed_writer = *static_cast< + std::shared_ptr*>( + c_packed_writer); auto import_schema = arrow::ImportSchema(schema); if (!import_schema.ok()) { @@ -248,10 +258,10 @@ CloseWriter(CPackedWriter c_packed_writer) { SCOPE_CGO_CALL_METRIC(); try { - auto packed_writer = - static_cast( - c_packed_writer); - auto status = packed_writer->Close(); + auto packed_writer = static_cast< + std::shared_ptr*>( + c_packed_writer); + auto status = (*packed_writer)->Close(); delete packed_writer; if (!status.ok()) { return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, diff --git a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp index b254edffca..d2844a6479 100644 --- a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp +++ b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp @@ -109,11 +109,15 @@ GroupChunkTranslator::GroupChunkTranslator( parquet_file_metadata_.reserve(insert_files_.size()); row_group_meta_list_.reserve(insert_files_.size()); for (const auto& file : insert_files_) { - auto reader = std::make_shared( + auto result = milvus_storage::FileRowGroupReader::Make( fs, file, milvus_storage::DEFAULT_READ_BUFFER_SIZE, storage::GetReaderProperties()); + AssertInfo(result.ok(), + "[StorageV2] Failed to create file row group reader: " + + result.status().ToString()); + auto reader = result.ValueOrDie(); parquet_file_metadata_.push_back( reader->file_metadata()->GetParquetMetadata()); diff --git a/internal/core/src/segcore/storagev2translator/GroupChunkTranslatorTest.cpp b/internal/core/src/segcore/storagev2translator/GroupChunkTranslatorTest.cpp index 50d592864e..a40ba5dd36 100644 --- a/internal/core/src/segcore/storagev2translator/GroupChunkTranslatorTest.cpp +++ b/internal/core/src/segcore/storagev2translator/GroupChunkTranslatorTest.cpp @@ -59,12 +59,16 @@ class GroupChunkTranslatorTest : public ::testing::TestWithParam { {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}}; auto writer_memory = 16 * 1024 * 1024; auto storage_config = milvus_storage::StorageConfig(); - milvus_storage::PackedRecordBatchWriter writer(fs_, - paths_, - arrow_schema_, - storage_config, - column_groups, - writer_memory); + auto result = milvus_storage::PackedRecordBatchWriter::Make( + fs_, + paths_, + arrow_schema_, + storage_config, + column_groups, + writer_memory, + ::parquet::default_writer_properties()); + EXPECT_TRUE(result.ok()); + auto writer = result.ValueOrDie(); int64_t total_rows = 0; for (int64_t i = 0; i < n_batch; i++) { auto dataset = DataGen(schema_, per_batch); @@ -72,9 +76,9 @@ class GroupChunkTranslatorTest : public ::testing::TestWithParam { ConvertToArrowRecordBatch(dataset, dim, arrow_schema_); total_rows += record_batch->num_rows(); - EXPECT_TRUE(writer.Write(record_batch).ok()); + EXPECT_TRUE(writer->Write(record_batch).ok()); } - EXPECT_TRUE(writer.Close().ok()); + EXPECT_TRUE(writer->Close().ok()); } protected: @@ -116,8 +120,12 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) { milvus::proto::common::LoadPriority::LOW); // num cells - get the expected number from the file directly - auto fr = - std::make_shared(fs_, paths_[0]); + auto reader_result = + milvus_storage::FileRowGroupReader::Make(fs_, paths_[0]); + AssertInfo(reader_result.ok(), + "[StorageV2] Failed to create file row group reader: " + + reader_result.status().ToString()); + auto fr = reader_result.ValueOrDie(); auto expected_num_cells = fr->file_metadata()->GetRowGroupMetadataVector().size(); auto row_group_metadata_vector = @@ -214,25 +222,33 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) { auto writer_memory = 16 * 1024 * 1024; auto storage_config = milvus_storage::StorageConfig(); std::vector single_file_paths{file_path}; - milvus_storage::PackedRecordBatchWriter writer(fs_, - single_file_paths, - arrow_schema_, - storage_config, - column_groups, - writer_memory); + auto result = milvus_storage::PackedRecordBatchWriter::Make( + fs_, + single_file_paths, + arrow_schema_, + storage_config, + column_groups, + writer_memory, + ::parquet::default_writer_properties()); + EXPECT_TRUE(result.ok()); + auto writer = result.ValueOrDie(); for (int64_t i = 0; i < n_batch; i++) { auto dataset = DataGen(schema_, per_batch); auto record_batch = ConvertToArrowRecordBatch(dataset, dim, arrow_schema_); total_rows += record_batch->num_rows(); - EXPECT_TRUE(writer.Write(record_batch).ok()); + EXPECT_TRUE(writer->Write(record_batch).ok()); } - EXPECT_TRUE(writer.Close().ok()); + EXPECT_TRUE(writer->Close().ok()); // Get the number of row groups in this file - auto fr = std::make_shared( - fs_, file_path); + auto reader_result = + milvus_storage::FileRowGroupReader::Make(fs_, file_path); + AssertInfo(reader_result.ok(), + "[StorageV2] Failed to create file row group reader: " + + reader_result.status().ToString()); + auto fr = reader_result.ValueOrDie(); expected_row_groups_per_file.push_back( fr->file_metadata()->GetRowGroupMetadataVector().size()); auto status = fr->Close(); @@ -303,8 +319,12 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) { auto usage = translator->estimated_byte_size_of_cell(i).first; // Get the expected memory size from the corresponding file - auto fr = std::make_shared( + auto reader_result = milvus_storage::FileRowGroupReader::Make( fs_, multi_file_paths[file_idx]); + AssertInfo(reader_result.ok(), + "[StorageV2] Failed to create file row group reader: " + + reader_result.status().ToString()); + auto fr = reader_result.ValueOrDie(); auto row_group_metadata_vector = fr->file_metadata()->GetRowGroupMetadataVector(); auto expected_size = static_cast( diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index ac71be37ba..cc7066d343 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -1307,11 +1307,15 @@ GetFieldDatasFromStorageV2(std::vector>& remote_files, for (auto& column_group_file : remote_chunk_files) { // get all row groups for each file std::vector> row_group_lists; - auto reader = std::make_shared( + auto result = milvus_storage::FileRowGroupReader::Make( fs, column_group_file, milvus_storage::DEFAULT_READ_BUFFER_SIZE, GetReaderProperties()); + AssertInfo(result.ok(), + "[StorageV2] Failed to create file row group reader: " + + result.status().ToString()); + auto reader = result.ValueOrDie(); auto row_group_num = reader->file_metadata()->GetRowGroupMetadataVector().size(); @@ -1515,12 +1519,16 @@ GetFieldIDList(FieldId column_group_id, field_id_list.Add(column_group_id.get()); return field_id_list; } - auto file_reader = std::make_shared( + auto result = milvus_storage::FileRowGroupReader::Make( fs, filepath, arrow_schema, milvus_storage::DEFAULT_READ_BUFFER_SIZE, GetReaderProperties()); + AssertInfo(result.ok(), + "[StorageV2] Failed to create file row group reader: " + + result.status().ToString()); + auto file_reader = result.ValueOrDie(); field_id_list = file_reader->file_metadata()->GetGroupFieldIDList().GetFieldIDList( column_group_id.get()); diff --git a/internal/core/src/storage/loon_ffi/ffi_reader_c.cpp b/internal/core/src/storage/loon_ffi/ffi_reader_c.cpp index 2490d137cd..b7a09f9918 100644 --- a/internal/core/src/storage/loon_ffi/ffi_reader_c.cpp +++ b/internal/core/src/storage/loon_ffi/ffi_reader_c.cpp @@ -22,14 +22,14 @@ #include "monitor/scope_metric.h" ReaderHandle -createFFIReader(char* manifest, +createFFIReader(ColumnGroupsHandle column_groups_handle, struct ArrowSchema* schema, char** needed_columns, int64_t needed_columns_size, const std::shared_ptr& properties) { ReaderHandle reader_handler = 0; - FFIResult result = reader_new(manifest, + FFIResult result = reader_new(column_groups_handle, schema, needed_columns, needed_columns_size, @@ -97,7 +97,7 @@ NewPackedFFIReader(const char* manifest_path, } CStatus -NewPackedFFIReaderWithManifest(const char* manifest_content, +NewPackedFFIReaderWithManifest(const ColumnGroupsHandle column_groups_handle, struct ArrowSchema* schema, char** needed_columns, int64_t needed_columns_size, @@ -109,12 +109,10 @@ NewPackedFFIReaderWithManifest(const char* manifest_content, try { auto properties = MakeInternalPropertiesFromStorageConfig(c_storage_config); - // Parse the column groups, the column groups is a JSON string - auto cpp_column_groups = - std::make_shared(); - auto des_result = - cpp_column_groups->deserialize(std::string_view(manifest_content)); - AssertInfo(des_result.ok(), "failed to deserialize column groups"); + auto* cg_ptr = reinterpret_cast< + std::shared_ptr*>( + column_groups_handle); + auto cpp_column_groups = *cg_ptr; auto reader = GetLoonReader(cpp_column_groups, schema, diff --git a/internal/core/src/storage/loon_ffi/ffi_reader_c.h b/internal/core/src/storage/loon_ffi/ffi_reader_c.h index b2985e096a..91b01be826 100644 --- a/internal/core/src/storage/loon_ffi/ffi_reader_c.h +++ b/internal/core/src/storage/loon_ffi/ffi_reader_c.h @@ -105,7 +105,7 @@ NewPackedFFIReader(const char* manifest_path, * be freed after this call returns. */ CStatus -NewPackedFFIReaderWithManifest(const char* manifest_content, +NewPackedFFIReaderWithManifest(const ColumnGroupsHandle column_groups_handle, struct ArrowSchema* schema, char** needed_columns, int64_t needed_columns_size, diff --git a/internal/core/src/storage/loon_ffi/util.cpp b/internal/core/src/storage/loon_ffi/util.cpp index 351c2d7889..c570b48c8d 100644 --- a/internal/core/src/storage/loon_ffi/util.cpp +++ b/internal/core/src/storage/loon_ffi/util.cpp @@ -257,12 +257,10 @@ GetManifest(const std::string& path, // Parse the JSON string json j = json::parse(path); - // Extract base_path and ver fields + // Extract base_path std::string base_path = j.at("base_path").get(); - int64_t ver = j.at("ver").get(); - // return std::make_pair(base_path, ver); - char* out_column_groups = nullptr; + ColumnGroupsHandle out_column_groups = 0; int64_t out_read_version = 0; FFIResult result = get_latest_column_groups(base_path.c_str(), properties.get(), @@ -298,9 +296,8 @@ GetColumnGroups( // Parse the JSON string json j = json::parse(path); - // Extract base_path and ver fields + // Extract base_path std::string base_path = j.at("base_path").get(); - int64_t ver = j.at("ver").get(); // TODO fetch manifest based on version after api supported auto transaction = diff --git a/internal/core/thirdparty/milvus-storage/CMakeLists.txt b/internal/core/thirdparty/milvus-storage/CMakeLists.txt index 800c1fd5d9..30cfd6fb8a 100644 --- a/internal/core/thirdparty/milvus-storage/CMakeLists.txt +++ b/internal/core/thirdparty/milvus-storage/CMakeLists.txt @@ -14,7 +14,7 @@ # Update milvus-storage_VERSION for the first occurrence milvus_add_pkg_config("milvus-storage") set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "") -set( milvus-storage_VERSION ba7df7b) +set( milvus-storage_VERSION 5fff4f5) set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git") message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}") message(STATUS "milvus-storage version: ${milvus-storage_VERSION}") diff --git a/internal/core/unittest/test_storage_v2_index_raw_data.cpp b/internal/core/unittest/test_storage_v2_index_raw_data.cpp index 99958d0a3f..b928dcc30b 100644 --- a/internal/core/unittest/test_storage_v2_index_raw_data.cpp +++ b/internal/core/unittest/test_storage_v2_index_raw_data.cpp @@ -75,8 +75,16 @@ TEST_F(StorageV2IndexRawDataTest, TestGetRawData) { auto writer_memory = 16 * 1024 * 1024; auto storage_config = milvus_storage::StorageConfig(); auto arrow_schema = schema->ConvertToArrowSchema(); - milvus_storage::PackedRecordBatchWriter writer( - fs_, paths, arrow_schema, storage_config, column_groups, writer_memory); + auto result = milvus_storage::PackedRecordBatchWriter::Make( + fs_, + paths, + arrow_schema, + storage_config, + column_groups, + writer_memory, + ::parquet::default_writer_properties()); + EXPECT_TRUE(result.ok()); + auto writer = result.ValueOrDie(); int64_t total_rows = 0; for (int64_t i = 0; i < n_batch; i++) { auto dataset = DataGen(schema, per_batch); @@ -84,9 +92,9 @@ TEST_F(StorageV2IndexRawDataTest, TestGetRawData) { ConvertToArrowRecordBatch(dataset, dim, arrow_schema); total_rows += record_batch->num_rows(); - EXPECT_TRUE(writer.Write(record_batch).ok()); + EXPECT_TRUE(writer->Write(record_batch).ok()); } - EXPECT_TRUE(writer.Close().ok()); + EXPECT_TRUE(writer->Close().ok()); { // test memory file manager diff --git a/internal/storage/record_reader.go b/internal/storage/record_reader.go index a242773fe8..7e0267743c 100644 --- a/internal/storage/record_reader.go +++ b/internal/storage/record_reader.go @@ -244,13 +244,7 @@ func NewManifestReader(manifest string, } func (mr *ManifestReader) init() error { - // TODO add needed column option - manifest, err := packed.GetManifest(mr.manifest, mr.storageConfig) - if err != nil { - return err - } - - reader, err := packed.NewFFIPackedReader(manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext) + reader, err := packed.NewFFIPackedReader(mr.manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext) if err != nil { return err } diff --git a/internal/storagev2/packed/packed_reader_ffi.go b/internal/storagev2/packed/packed_reader_ffi.go index 87863cc3ea..866884ccc4 100644 --- a/internal/storagev2/packed/packed_reader_ffi.go +++ b/internal/storagev2/packed/packed_reader_ffi.go @@ -31,6 +31,7 @@ import ( "github.com/apache/arrow/go/v17/arrow" "github.com/apache/arrow/go/v17/arrow/cdata" + "github.com/cockroachdb/errors" "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/v2/log" @@ -38,9 +39,11 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" ) -func NewFFIPackedReader(manifest string, schema *arrow.Schema, neededColumns []string, bufferSize int64, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedReader, error) { - cManifest := C.CString(manifest) - defer C.free(unsafe.Pointer(cManifest)) +func NewFFIPackedReader(manifestPath string, schema *arrow.Schema, neededColumns []string, bufferSize int64, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedReader, error) { + cColumnGroups, err := GetColumnGroups(manifestPath, storageConfig) + if err != nil { + return nil, errors.Wrap(err, "failed to get manifest") + } var cas cdata.CArrowSchema cdata.ExportArrowSchema(schema, &cas) @@ -103,7 +106,7 @@ func NewFFIPackedReader(manifest string, schema *arrow.Schema, neededColumns []s cNeededColumnArray := (**C.char)(unsafe.Pointer(&cNeededColumn[0])) cNumColumns := C.int64_t(len(neededColumns)) - status = C.NewPackedFFIReaderWithManifest(cManifest, cSchema, cNeededColumnArray, cNumColumns, &cPackedReader, cStorageConfig, pluginContextPtr) + status = C.NewPackedFFIReaderWithManifest(cColumnGroups, cSchema, cNeededColumnArray, cNumColumns, &cPackedReader, cStorageConfig, pluginContextPtr) } else { return nil, fmt.Errorf("storageConfig is required") } @@ -184,30 +187,29 @@ func (r *FFIPackedReader) Release() { r.Close() } -func GetManifest(manifestPath string, storageConfig *indexpb.StorageConfig) (manifest string, err error) { +func GetColumnGroups(manifestPath string, storageConfig *indexpb.StorageConfig) (columnGroups C.ColumnGroupsHandle, err error) { + var cColumnGroups C.ColumnGroupsHandle basePath, version, err := UnmarshalManfestPath(manifestPath) if err != nil { - return "", err + return cColumnGroups, err } log.Info("GetManifest", zap.String("manifestPath", manifestPath), zap.String("basePath", basePath), zap.Int64("version", version)) cProperties, err := MakePropertiesFromStorageConfig(storageConfig, nil) if err != nil { - return "", err + return cColumnGroups, err } cBasePath := C.CString(basePath) defer C.free(unsafe.Pointer(cBasePath)) - var cManifest *C.char var cVersion C.int64_t - result := C.get_latest_column_groups(cBasePath, cProperties, &cManifest, &cVersion) + result := C.get_latest_column_groups(cBasePath, cProperties, &cColumnGroups, &cVersion) err = HandleFFIResult(result) if err != nil { - return "", err + return cColumnGroups, err } - manifest = C.GoString(cManifest) - return manifest, nil + return cColumnGroups, nil } // Ensure FFIPackedReader implements array.RecordReader interface diff --git a/internal/storagev2/packed/packed_writer_ffi.go b/internal/storagev2/packed/packed_writer_ffi.go index 2da5c56a77..835cb88a6d 100644 --- a/internal/storagev2/packed/packed_writer_ffi.go +++ b/internal/storagev2/packed/packed_writer_ffi.go @@ -133,9 +133,9 @@ func (pw *FFIPackedWriter) WriteRecordBatch(recordBatch arrow.Record) error { } func (pw *FFIPackedWriter) Close() (string, error) { - var manifest *C.char + var cColumnGroups C.ColumnGroupsHandle - result := C.writer_close(pw.cWriterHandle, nil, nil, 0, &manifest) + result := C.writer_close(pw.cWriterHandle, nil, nil, 0, &cColumnGroups) if err := HandleFFIResult(result); err != nil { return "", err } @@ -143,7 +143,10 @@ func (pw *FFIPackedWriter) Close() (string, error) { cBasePath := C.CString(pw.basePath) defer C.free(unsafe.Pointer(cBasePath)) var transationHandle C.TransactionHandle - result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle) + + // TODO pass version + // use -1 as latest + result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle, C.int64_t(-1)) if err := HandleFFIResult(result); err != nil { return "", err } @@ -157,23 +160,14 @@ func (pw *FFIPackedWriter) Close() (string, error) { // #define LOON_TRANSACTION_RESOLVE_MERGE 1 // #define LOON_TRANSACTION_RESOLVE_MAX 2 - var commitResult C.bool - result = C.transaction_commit(transationHandle, C.int16_t(0), C.int16_t(0), manifest, &commitResult) + var commitResult C.TransactionCommitResult + result = C.transaction_commit(transationHandle, C.int16_t(0), C.int16_t(0), cColumnGroups, &commitResult) if err := HandleFFIResult(result); err != nil { return "", err } - var readVersion C.int64_t - - // TODO: not atomic, need to get version from transaction - var cOutManifest *C.char - result = C.get_latest_column_groups(cBasePath, pw.cProperties, &cOutManifest, &readVersion) - if err := HandleFFIResult(result); err != nil { - return "", err - } - outManifest := C.GoString(cOutManifest) - log.Info("FFI writer closed with output manifest", zap.String("manifest", outManifest), zap.Int64("version", int64(readVersion))) + log.Info("FFI writer closed", zap.Int64("version", int64(commitResult.committed_version))) defer C.properties_free(pw.cProperties) - return MarshalManifestPath(pw.basePath, int64(readVersion)), nil + return MarshalManifestPath(pw.basePath, int64(commitResult.committed_version)), nil } diff --git a/scripts/core_build.sh b/scripts/core_build.sh index f7b38793e3..ecb5317ad8 100755 --- a/scripts/core_build.sh +++ b/scripts/core_build.sh @@ -212,6 +212,9 @@ make rebuild_cache >/dev/null 2>&1 CPU_ARCH=$(get_cpu_arch $CPU_TARGET) +# In case any 3rdparty (e.g. libavrocpp) requires a minimum version of CMake lower than 3.5 +export CMAKE_POLICY_VERSION_MINIMUM=3.5 + arch=$(uname -m) CMAKE_CMD="cmake \ ${CMAKE_EXTRA_ARGS} \