From d2e4278b18adb8cc775afd6e553a983774399f73 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 28 Nov 2025 18:55:07 +0800 Subject: [PATCH] enhance: use milvus-storage internal C++ Reader API for Loon FFI (#45897) This PR refactors the Loon FFI reader implementation to use milvus-storage's internal C++ Reader API directly instead of the external FFI interface. Key changes: - Replace external FFI calls (get_record_batch_reader, reader_destroy) with direct C++ Reader API calls - Add GetLoonReader() helper function to create Reader instances using milvus-storage::api::Reader::create() - Use MakeInternalPropertiesFromStorageConfig() instead of MakePropertiesFromStorageConfig() to get internal properties - Update NewPackedFFIReaderWithManifest() to deserialize column groups from JSON manifest content directly - Simplify GetFFIReaderStream() to use Reader::get_record_batch_reader() and arrow::ExportRecordBatchReader() for Arrow stream export - Change CFFIPackedReader typedef from ReaderHandle to void* for flexibility - Update milvus-storage dependency version to ba7df7b This change improves code maintainability by using the native C++ API directly and eliminates the overhead of going through the external FFI layer. issue: #44956 Signed-off-by: Congqi Xia --- .../src/storage/loon_ffi/ffi_reader_c.cpp | 98 ++++++++++++------- .../core/src/storage/loon_ffi/ffi_reader_c.h | 4 +- .../storagev2/packed/packed_reader_ffi.go | 9 +- 3 files changed, 67 insertions(+), 44 deletions(-) diff --git a/internal/core/src/storage/loon_ffi/ffi_reader_c.cpp b/internal/core/src/storage/loon_ffi/ffi_reader_c.cpp index 2a5ca2e54c..2490d137cd 100644 --- a/internal/core/src/storage/loon_ffi/ffi_reader_c.cpp +++ b/internal/core/src/storage/loon_ffi/ffi_reader_c.cpp @@ -17,6 +17,7 @@ #include "storage/loon_ffi/ffi_reader_c.h" #include "common/common_type_c.h" #include "milvus-storage/ffi_c.h" +#include "milvus-storage/reader.h" #include "storage/loon_ffi/util.h" #include "monitor/scope_metric.h" @@ -46,6 +47,24 @@ createFFIReader(char* manifest, return reader_handler; } +std::unique_ptr +GetLoonReader( + std::shared_ptr column_groups, + struct ArrowSchema* schema, + char** needed_columns, + int64_t needed_columns_size, + const std::shared_ptr& properties) { + auto result = arrow::ImportSchema(schema); + AssertInfo(result.ok(), "Import arrow schema failed"); + auto arrow_schema = result.ValueOrDie(); + return milvus_storage::api::Reader::create( + column_groups, + arrow_schema, + std::make_shared>( + needed_columns, needed_columns + needed_columns_size), + *properties); +} + CStatus NewPackedFFIReader(const char* manifest_path, struct ArrowSchema* schema, @@ -57,16 +76,20 @@ NewPackedFFIReader(const char* manifest_path, SCOPE_CGO_CALL_METRIC(); try { - auto properties = MakePropertiesFromStorageConfig(c_storage_config); - + auto properties = + MakeInternalPropertiesFromStorageConfig(c_storage_config); AssertInfo(properties != nullptr, "properties is nullptr"); - // Use manifest_path if provided - char* manifest = - manifest_path ? const_cast(manifest_path) : nullptr; - ReaderHandle reader_handle = createFFIReader( - manifest, schema, needed_columns, needed_columns_size, properties); - *c_packed_reader = reader_handle; + auto column_groups = GetColumnGroups(manifest_path, properties); + AssertInfo(column_groups != nullptr, "column groups is nullptr"); + + auto reader = GetLoonReader(column_groups, + schema, + needed_columns, + needed_columns_size, + properties); + + *c_packed_reader = static_cast(reader.release()); return milvus::SuccessCStatus(); } catch (std::exception& e) { return milvus::FailureCStatus(&e); @@ -78,22 +101,28 @@ NewPackedFFIReaderWithManifest(const char* manifest_content, struct ArrowSchema* schema, char** needed_columns, int64_t needed_columns_size, - CFFIPackedReader* c_packed_reader, + CFFIPackedReader* c_loon_reader, CStorageConfig c_storage_config, CPluginContext* c_plugin_context) { SCOPE_CGO_CALL_METRIC(); try { - auto properties = MakePropertiesFromStorageConfig(c_storage_config); + auto properties = + MakeInternalPropertiesFromStorageConfig(c_storage_config); + // Parse the column groups, the column groups is a JSON string + auto cpp_column_groups = + std::make_shared(); + auto des_result = + cpp_column_groups->deserialize(std::string_view(manifest_content)); + AssertInfo(des_result.ok(), "failed to deserialize column groups"); - AssertInfo(properties != nullptr, "properties is nullptr"); + auto reader = GetLoonReader(cpp_column_groups, + schema, + needed_columns, + needed_columns_size, + properties); - // Use manifest_content as the manifest parameter - char* manifest = - manifest_content ? const_cast(manifest_content) : nullptr; - ReaderHandle reader_handle = createFFIReader( - manifest, schema, needed_columns, needed_columns_size, properties); - *c_packed_reader = reader_handle; + *c_loon_reader = static_cast(reader.release()); return milvus::SuccessCStatus(); } catch (std::exception& e) { return milvus::FailureCStatus(&e); @@ -107,23 +136,23 @@ GetFFIReaderStream(CFFIPackedReader c_packed_reader, SCOPE_CGO_CALL_METRIC(); try { - auto reader_handle = static_cast(c_packed_reader); + auto reader = + static_cast(c_packed_reader); - // Default parameters for get_record_batch_reader - const char* predicate = nullptr; // No filtering - FFIResult result = - get_record_batch_reader(reader_handle, predicate, out_stream); + // FFIResult result = + // get_record_batch_reader(reader_handle, predicate, out_stream); + auto result = reader->get_record_batch_reader(); + AssertInfo(result.ok(), + "failed to get record batch reader, {}", + result.status().ToString()); - if (!IsSuccess(&result)) { - auto message = GetErrorMessage(&result); - std::string error_msg = - message ? message : "Failed to get record batch reader"; - FreeFFIResult(&result); - return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed, - error_msg); - } + auto array_stream = result.ValueOrDie(); + arrow::Status status = + arrow::ExportRecordBatchReader(array_stream, out_stream); + AssertInfo(status.ok(), + "failed to export record batch reader, {}", + status.ToString()); - FreeFFIResult(&result); return milvus::SuccessCStatus(); } catch (std::exception& e) { return milvus::FailureCStatus(&e); @@ -135,10 +164,9 @@ CloseFFIReader(CFFIPackedReader c_packed_reader) { SCOPE_CGO_CALL_METRIC(); try { - auto reader_handle = static_cast(c_packed_reader); - if (reader_handle != 0) { - reader_destroy(reader_handle); - } + auto reader = + static_cast(c_packed_reader); + delete reader; return milvus::SuccessCStatus(); } catch (std::exception& e) { return milvus::FailureCStatus(&e); diff --git a/internal/core/src/storage/loon_ffi/ffi_reader_c.h b/internal/core/src/storage/loon_ffi/ffi_reader_c.h index a055ce64c0..b2985e096a 100644 --- a/internal/core/src/storage/loon_ffi/ffi_reader_c.h +++ b/internal/core/src/storage/loon_ffi/ffi_reader_c.h @@ -30,7 +30,7 @@ extern "C" { * from storage in Milvus. The reader supports Arrow-based data access * through the FFI (Foreign Function Interface) layer. */ -typedef ReaderHandle CFFIPackedReader; +typedef void* CFFIPackedReader; /** * @brief Creates a new packed FFI reader from a manifest file path. @@ -109,7 +109,7 @@ NewPackedFFIReaderWithManifest(const char* manifest_content, struct ArrowSchema* schema, char** needed_columns, int64_t needed_columns_size, - CFFIPackedReader* c_packed_reader, + CFFIPackedReader* c_loon_reader, CStorageConfig c_storage_config, CPluginContext* c_plugin_context); diff --git a/internal/storagev2/packed/packed_reader_ffi.go b/internal/storagev2/packed/packed_reader_ffi.go index f3b5c3edf1..87863cc3ea 100644 --- a/internal/storagev2/packed/packed_reader_ffi.go +++ b/internal/storagev2/packed/packed_reader_ffi.go @@ -163,13 +163,8 @@ func (r *FFIPackedReader) Close() error { r.recordReader = nil } - if r.cPackedReader != 0 { - status := C.CloseFFIReader(r.cPackedReader) - r.cPackedReader = 0 - return ConsumeCStatusIntoError(&status) - } - - return nil + status := C.CloseFFIReader(r.cPackedReader) + return ConsumeCStatusIntoError(&status) } // Schema returns the schema of the reader