From 55bfd610b6fe2f6da95a49debc20536ce68f3560 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Wed, 5 Nov 2025 19:57:34 +0800
Subject: [PATCH] enhance: [StorageV2] Integrate FFI interface for packed reader (#45132)

Related to #44956

Integrate the StorageV2 FFI interface as the unified storage layer for
reading packed columnar data, replacing the custom iterative reader with
a manifest-based approach using the milvus-storage library.

Changes:
- Add C++ FFI reader implementation (ffi_reader_c.cpp/h) with Arrow C Stream interface
- Implement utility functions to convert CStorageConfig to milvus-storage Properties
- Create ManifestReader in Go that generates manifests from binlogs
- Add FFI packed reader CGO bindings (packed_reader_ffi.go)
- Refactor NewBinlogRecordReader to use ManifestReader for V2 storage
- Support both manifest file paths and direct manifest content
- Enable configurable buffer sizes and column projection

Technical improvements:
- Zero-copy data exchange using Arrow C Data Interface
- Optimized I/O operations through milvus-storage library
- Simplified code path with manifest-based reading
- Better performance with batched streaming reads

---------

Signed-off-by: Congqi Xia
---
 internal/core/src/storage/CMakeLists.txt      |   4 +
 .../core/src/storage/loon_ffi/CMakeLists.txt  |  22 ++
 .../src/storage/loon_ffi/ffi_reader_c.cpp     | 146 ++++++++++++++
 .../core/src/storage/loon_ffi/ffi_reader_c.h  | 151 ++++++++++++++
 internal/core/src/storage/loon_ffi/util.cpp   | 115 +++++++++++
 internal/core/src/storage/loon_ffi/util.h     |  38 ++++
 .../thirdparty/milvus-storage/CMakeLists.txt  |   2 +-
 .../datanode/compactor/mix_compactor_test.go  |   7 +-
 internal/storage/rw.go                        |  21 +-
 internal/storage/rw_test.go                   |   6 +
 internal/storage/serde_events.go              | 165 ++++++++++++++-
 .../storagev2/packed/packed_reader_ffi.go     | 190 ++++++++++++++++++
 internal/storagev2/packed/type.go             |   8 +
 13 files changed, 852 insertions(+), 23 deletions(-)
 create mode 100644 internal/core/src/storage/loon_ffi/CMakeLists.txt
 create mode 100644 internal/core/src/storage/loon_ffi/ffi_reader_c.cpp
 create mode 100644 internal/core/src/storage/loon_ffi/ffi_reader_c.h
 create mode 100644 internal/core/src/storage/loon_ffi/util.cpp
 create mode 100644 internal/core/src/storage/loon_ffi/util.h
 create mode 100644 internal/storagev2/packed/packed_reader_ffi.go

diff --git a/internal/core/src/storage/CMakeLists.txt b/internal/core/src/storage/CMakeLists.txt
index 614a837658..de44b81199 100644
--- a/internal/core/src/storage/CMakeLists.txt
+++ b/internal/core/src/storage/CMakeLists.txt
@@ -32,6 +32,10 @@ if(USE_OPENDAL)
     set(SOURCE_FILES ${SOURCE_FILES} opendal/OpenDALChunkManager.cpp)
 endif()
 
+# Add loon_ffi subdirectory to include loon FFI source files
+add_subdirectory(loon_ffi)
+
+include_directories(${CMAKE_CURRENT_SOURCE_DIR}/loon_ffi)
 include_directories(${CMAKE_CURRENT_SOURCE_DIR}/plugin)
 
 add_library(milvus_storage OBJECT ${SOURCE_FILES})
diff --git a/internal/core/src/storage/loon_ffi/CMakeLists.txt b/internal/core/src/storage/loon_ffi/CMakeLists.txt
new file mode 100644
index 0000000000..3370e33a64
--- /dev/null
+++ b/internal/core/src/storage/loon_ffi/CMakeLists.txt
@@ -0,0 +1,22 @@
+# Copyright (C) 2019-2020 Zilliz. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under the License
+
+# FFI Reader source files for interfacing with milvus-storage through FFI
+set(FFI_SRCS
+    ${CMAKE_CURRENT_SOURCE_DIR}/ffi_reader_c.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/util.cpp
+)
+
+# Add FFI Reader source files to parent's SOURCE_FILES
+set(SOURCE_FILES ${SOURCE_FILES} ${FFI_SRCS} PARENT_SCOPE)
+
+# Include directories for FFI Reader
+include_directories(${CMAKE_CURRENT_SOURCE_DIR})
\ No newline at end of file
diff --git a/internal/core/src/storage/loon_ffi/ffi_reader_c.cpp b/internal/core/src/storage/loon_ffi/ffi_reader_c.cpp
new file mode 100644
index 0000000000..2a5ca2e54c
--- /dev/null
+++ b/internal/core/src/storage/loon_ffi/ffi_reader_c.cpp
@@ -0,0 +1,146 @@
+// Copyright 2023 Zilliz
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <memory>
+#include "common/EasyAssert.h"
+#include "storage/loon_ffi/ffi_reader_c.h"
+#include "common/common_type_c.h"
+#include "milvus-storage/ffi_c.h"
+#include "storage/loon_ffi/util.h"
+#include "monitor/scope_metric.h"
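+
+// Error-handling convention for the milvus-storage FFI used below: every
+// call returns an FFIResult that is checked with IsSuccess(); on failure the
+// message is copied out of GetErrorMessage() before FreeFFIResult() releases
+// it, and FreeFFIResult() must also run on the success path.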
+
+ReaderHandle
+createFFIReader(char* manifest,
+                struct ArrowSchema* schema,
+                char** needed_columns,
+                int64_t needed_columns_size,
+                const std::shared_ptr<Properties>& properties) {
+    ReaderHandle reader_handler = 0;
+
+    FFIResult result = reader_new(manifest,
+                                  schema,
+                                  needed_columns,
+                                  needed_columns_size,
+                                  properties.get(),
+                                  &reader_handler);
+    if (!IsSuccess(&result)) {
+        auto message = GetErrorMessage(&result);
+        // Copy the error message before freeing the FFIResult
+        std::string error_msg = message ? message : "Unknown error";
+        FreeFFIResult(&result);
+        throw std::runtime_error(error_msg);
+    }
+
+    FreeFFIResult(&result);
+    return reader_handler;
+}
+
+CStatus
+NewPackedFFIReader(const char* manifest_path,
+                   struct ArrowSchema* schema,
+                   char** needed_columns,
+                   int64_t needed_columns_size,
+                   CFFIPackedReader* c_packed_reader,
+                   CStorageConfig c_storage_config,
+                   CPluginContext* c_plugin_context) {
+    SCOPE_CGO_CALL_METRIC();
+
+    try {
+        auto properties = MakePropertiesFromStorageConfig(c_storage_config);
+
+        AssertInfo(properties != nullptr, "properties is nullptr");
+
+        // Use manifest_path if provided
+        char* manifest =
+            manifest_path ? const_cast<char*>(manifest_path) : nullptr;
+        ReaderHandle reader_handle = createFFIReader(
+            manifest, schema, needed_columns, needed_columns_size, properties);
+        *c_packed_reader = reader_handle;
+        return milvus::SuccessCStatus();
+    } catch (std::exception& e) {
+        return milvus::FailureCStatus(&e);
+    }
+}
+
+CStatus
+NewPackedFFIReaderWithManifest(const char* manifest_content,
+                               struct ArrowSchema* schema,
+                               char** needed_columns,
+                               int64_t needed_columns_size,
+                               CFFIPackedReader* c_packed_reader,
+                               CStorageConfig c_storage_config,
+                               CPluginContext* c_plugin_context) {
+    SCOPE_CGO_CALL_METRIC();
+
+    try {
+        auto properties = MakePropertiesFromStorageConfig(c_storage_config);
+
+        AssertInfo(properties != nullptr, "properties is nullptr");
+
+        // Use manifest_content as the manifest parameter
+        char* manifest =
+            manifest_content ? const_cast<char*>(manifest_content) : nullptr;
+        ReaderHandle reader_handle = createFFIReader(
+            manifest, schema, needed_columns, needed_columns_size, properties);
+        *c_packed_reader = reader_handle;
+        return milvus::SuccessCStatus();
+    } catch (std::exception& e) {
+        return milvus::FailureCStatus(&e);
+    }
+}
+
+CStatus
+GetFFIReaderStream(CFFIPackedReader c_packed_reader,
+                   int64_t batch_size,
+                   struct ArrowArrayStream* out_stream) {
+    SCOPE_CGO_CALL_METRIC();
+
+    try {
+        auto reader_handle = static_cast<ReaderHandle>(c_packed_reader);
+
+        // Default parameters for get_record_batch_reader; batch_size is a
+        // reserved hint and is not yet forwarded to the underlying reader.
+        const char* predicate = nullptr;  // No filtering
+        FFIResult result =
+            get_record_batch_reader(reader_handle, predicate, out_stream);
+
+        if (!IsSuccess(&result)) {
+            auto message = GetErrorMessage(&result);
+            std::string error_msg =
+                message ? message : "Failed to get record batch reader";
+            FreeFFIResult(&result);
+            return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed,
+                                          error_msg);
+        }
+
+        FreeFFIResult(&result);
+        return milvus::SuccessCStatus();
+    } catch (std::exception& e) {
+        return milvus::FailureCStatus(&e);
+    }
+}
+
+CStatus
+CloseFFIReader(CFFIPackedReader c_packed_reader) {
+    SCOPE_CGO_CALL_METRIC();
+
+    try {
+        auto reader_handle = static_cast<ReaderHandle>(c_packed_reader);
+        if (reader_handle != 0) {
+            reader_destroy(reader_handle);
+        }
+        return milvus::SuccessCStatus();
+    } catch (std::exception& e) {
+        return milvus::FailureCStatus(&e);
+    }
+}
diff --git a/internal/core/src/storage/loon_ffi/ffi_reader_c.h b/internal/core/src/storage/loon_ffi/ffi_reader_c.h
new file mode 100644
index 0000000000..a055ce64c0
--- /dev/null
+++ b/internal/core/src/storage/loon_ffi/ffi_reader_c.h
@@ -0,0 +1,151 @@
+// Copyright 2023 Zilliz
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "common/common_type_c.h"
+#include "common/type_c.h"
+#include <stdint.h>
+#include "milvus-storage/ffi_c.h"
+
+/**
+ * @brief Handle to a packed FFI reader instance.
+ *
+ * This is an alias for ReaderHandle used to read packed columnar data
+ * from storage in Milvus. The reader supports Arrow-based data access
+ * through the FFI (Foreign Function Interface) layer.
+ */
+typedef ReaderHandle CFFIPackedReader;
+
+/**
+ * @brief Creates a new packed FFI reader from a manifest file path.
+ *
+ * This function initializes a packed reader that can read columnar data
+ * from storage based on the manifest file. The manifest contains metadata
+ * about the data layout and file locations.
+ *
+ * @param manifest_path Path to the manifest file in object storage.
+ *                      Must be a valid UTF-8 encoded null-terminated string.
+ * @param schema Arrow schema defining the structure of the data.
+ *               Must be a valid ArrowSchema pointer conforming to
+ *               the Arrow C data interface specification.
+ * @param needed_columns Array of column names to read. If NULL, all columns
+ *                       from the schema will be read.
+ * @param needed_columns_size Number of column names in the needed_columns array.
+ *                            Must be 0 if needed_columns is NULL.
+ * @param c_packed_reader Output parameter for the created reader handle.
+ *                        On success, will contain a valid reader handle that
+ *                        must be released by the caller when no longer needed.
+ * @param c_storage_config Storage configuration containing credentials and
+ *                         endpoint information for accessing object storage.
+ * @param c_plugin_context Plugin context for extensibility, may be NULL if
+ *                         no plugins are used.
+ *
+ * @return CStatus indicating success or failure. On failure, the error_msg
+ *         field contains details about what went wrong.
+ *
+ * @note The caller is responsible for releasing the reader handle after use.
+ * @note The schema pointer must remain valid for the lifetime of the reader.
+ */
+CStatus
+NewPackedFFIReader(const char* manifest_path,
+                   struct ArrowSchema* schema,
+                   char** needed_columns,
+                   int64_t needed_columns_size,
+                   CFFIPackedReader* c_packed_reader,
+                   CStorageConfig c_storage_config,
+                   CPluginContext* c_plugin_context);
+
+/**
+ * @brief Creates a new packed FFI reader from manifest content directly.
+ *
+ * Similar to NewPackedFFIReader, but accepts the manifest content directly
+ * as a string instead of reading from a file path. This is useful when the
+ * manifest has already been loaded or is generated dynamically.
+ *
+ * @param manifest_content The manifest content as a null-terminated string.
+ *                         Must be valid JSON or protobuf text format containing
+ *                         the manifest data.
+ * @param schema Arrow schema defining the structure of the data.
+ *               Must be a valid ArrowSchema pointer conforming to
+ *               the Arrow C data interface specification.
+ * @param needed_columns Array of column names to read. If NULL, all columns
+ *                       from the schema will be read.
+ * @param needed_columns_size Number of column names in the needed_columns array.
+ *                            Must be 0 if needed_columns is NULL.
+ * @param c_packed_reader Output parameter for the created reader handle.
+ *                        On success, will contain a valid reader handle that
+ *                        must be released by the caller when no longer needed.
+ * @param c_storage_config Storage configuration containing credentials and
+ *                         endpoint information for accessing object storage.
+ * @param c_plugin_context Plugin context for extensibility, may be NULL if
+ *                         no plugins are used.
+ *
+ * @return CStatus indicating success or failure. On failure, the error_msg
+ *         field contains details about what went wrong.
+ *
+ * @note The caller is responsible for releasing the reader handle after use.
+ * @note The schema pointer must remain valid for the lifetime of the reader.
+ * @note The manifest content is copied internally, so the input string can
+ *       be freed after this call returns.
+ */
+CStatus
+NewPackedFFIReaderWithManifest(const char* manifest_content,
+                               struct ArrowSchema* schema,
+                               char** needed_columns,
+                               int64_t needed_columns_size,
+                               CFFIPackedReader* c_packed_reader,
+                               CStorageConfig c_storage_config,
+                               CPluginContext* c_plugin_context);
+
+/**
+ * @brief Gets an ArrowArrayStream from the FFI reader for streaming data access.
+ *
+ * This function returns an ArrowArrayStream that can be used to iterate through
+ * record batches. The stream follows the Arrow C Stream Interface specification
+ * and must be released by calling stream->release() when done.
+ *
+ * @param c_packed_reader The FFI reader handle.
+ * @param batch_size Batch-size hint for streaming reads. Reserved: the
+ *                   current implementation does not yet forward it to the
+ *                   underlying reader.
+ * @param out_stream Output parameter for the ArrowArrayStream. The caller
+ *                   is responsible for calling stream->release() when done.
+ *
+ * @return CStatus indicating success or failure. On failure, the error_msg
+ *         field contains details about what went wrong.
+ *
+ * @note The stream must be released by calling out_stream->release(out_stream)
+ *       when no longer needed to prevent memory leaks.
+ * @note Each call to this function creates a new stream starting from the beginning.
+ */
+CStatus
+GetFFIReaderStream(CFFIPackedReader c_packed_reader,
+                   int64_t batch_size,
+                   struct ArrowArrayStream* out_stream);
+
+/**
+ * @brief Closes and releases the FFI reader.
+ *
+ * @param c_packed_reader The FFI reader handle to close.
+ *
+ * @return CStatus indicating success or failure.
+ */
+CStatus
+CloseFFIReader(CFFIPackedReader c_packed_reader);
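+
+/*
+ * Usage sketch for the API above -- a minimal, hypothetical flow (manifest
+ * path, column names, and storage_config population are placeholders, and
+ * error checks are elided):
+ *
+ *   CFFIPackedReader reader = 0;
+ *   struct ArrowArrayStream stream;
+ *   char* cols[] = {(char*)"pk", (char*)"vec"};
+ *   NewPackedFFIReader("a-bucket/path/to/manifest", &schema, cols, 2,
+ *                      &reader, storage_config, NULL);
+ *   GetFFIReaderStream(reader, 8192, &stream);
+ *   // iterate record batches via the Arrow C Stream Interface
+ *   stream.release(&stream);
+ *   CloseFFIReader(reader);
+ */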
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/internal/core/src/storage/loon_ffi/util.cpp b/internal/core/src/storage/loon_ffi/util.cpp
new file mode 100644
index 0000000000..b835f19d19
--- /dev/null
+++ b/internal/core/src/storage/loon_ffi/util.cpp
@@ -0,0 +1,115 @@
+// Copyright 2023 Zilliz
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <cstdint>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <vector>
+#include "common/common_type_c.h"
+#include "common/type_c.h"
+#include "milvus-storage/properties.h"
+#include "storage/loon_ffi/util.h"
+
+std::shared_ptr<Properties>
+MakePropertiesFromStorageConfig(CStorageConfig c_storage_config) {
+    // Prepare key-value pairs from CStorageConfig
+    std::vector<const char*> keys;
+    std::vector<const char*> values;
+
+    // Add non-null string fields
+    if (c_storage_config.address != nullptr) {
+        keys.emplace_back(PROPERTY_FS_ADDRESS);
+        values.emplace_back(c_storage_config.address);
+    }
+    if (c_storage_config.bucket_name != nullptr) {
+        keys.emplace_back(PROPERTY_FS_BUCKET_NAME);
+        values.emplace_back(c_storage_config.bucket_name);
+    }
+    if (c_storage_config.access_key_id != nullptr) {
+        keys.emplace_back(PROPERTY_FS_ACCESS_KEY_ID);
+        values.emplace_back(c_storage_config.access_key_id);
+    }
+    if (c_storage_config.access_key_value != nullptr) {
+        keys.emplace_back(PROPERTY_FS_ACCESS_KEY_VALUE);
+        values.emplace_back(c_storage_config.access_key_value);
+    }
+    if (c_storage_config.root_path != nullptr) {
+        keys.emplace_back(PROPERTY_FS_ROOT_PATH);
+        values.emplace_back(c_storage_config.root_path);
+    }
+    if (c_storage_config.storage_type != nullptr) {
+        keys.emplace_back(PROPERTY_FS_STORAGE_TYPE);
+        values.emplace_back(c_storage_config.storage_type);
+    }
+    if (c_storage_config.cloud_provider != nullptr) {
+        keys.emplace_back(PROPERTY_FS_CLOUD_PROVIDER);
+        values.emplace_back(c_storage_config.cloud_provider);
+    }
+    if (c_storage_config.iam_endpoint != nullptr) {
+        keys.emplace_back(PROPERTY_FS_IAM_ENDPOINT);
+        values.emplace_back(c_storage_config.iam_endpoint);
+    }
+    if (c_storage_config.log_level != nullptr) {
+        keys.emplace_back(PROPERTY_FS_LOG_LEVEL);
+        values.emplace_back(c_storage_config.log_level);
+    }
+    if (c_storage_config.region != nullptr) {
+        keys.emplace_back(PROPERTY_FS_REGION);
+        values.emplace_back(c_storage_config.region);
+    }
+    if (c_storage_config.sslCACert != nullptr) {
+        keys.emplace_back(PROPERTY_FS_SSL_CA_CERT);
+        values.emplace_back(c_storage_config.sslCACert);
+    }
+    if (c_storage_config.gcp_credential_json != nullptr) {
+        keys.emplace_back(PROPERTY_FS_GCP_CREDENTIAL_JSON);
+        values.emplace_back(c_storage_config.gcp_credential_json);
+    }
+
+    // Add boolean fields
+    keys.emplace_back(PROPERTY_FS_USE_SSL);
+    values.emplace_back(c_storage_config.useSSL ? "true" : "false");
+
+    keys.emplace_back(PROPERTY_FS_USE_IAM);
+    values.emplace_back(c_storage_config.useIAM ? "true" : "false");
+
+    keys.emplace_back(PROPERTY_FS_USE_VIRTUAL_HOST);
+    values.emplace_back(c_storage_config.useVirtualHost ? "true" : "false");
+
+    keys.emplace_back(PROPERTY_FS_USE_CUSTOM_PART_UPLOAD);
+    values.emplace_back(c_storage_config.use_custom_part_upload ? "true"
+                                                                : "false");
+
+    // Add integer field
+    std::string timeout_str =
+        std::to_string(c_storage_config.requestTimeoutMs);
+    keys.emplace_back(PROPERTY_FS_REQUEST_TIMEOUT_MS);
+    values.emplace_back(timeout_str.c_str());
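+
+    // For a hypothetical MinIO deployment the accumulated pairs look like
+    // this (values are placeholders, not defaults):
+    //   PROPERTY_FS_ADDRESS            -> "localhost:9000"
+    //   PROPERTY_FS_BUCKET_NAME        -> "a-bucket"
+    //   PROPERTY_FS_STORAGE_TYPE       -> "minio"
+    //   PROPERTY_FS_USE_SSL            -> "false"
+    //   PROPERTY_FS_REQUEST_TIMEOUT_MS -> "3000"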
message : "Unknown error"; + FreeFFIResult(&result); + throw std::runtime_error(error_msg); + } + + FreeFFIResult(&result); + return properties; +} \ No newline at end of file diff --git a/internal/core/src/storage/loon_ffi/util.h b/internal/core/src/storage/loon_ffi/util.h new file mode 100644 index 0000000000..9ae218df51 --- /dev/null +++ b/internal/core/src/storage/loon_ffi/util.h @@ -0,0 +1,38 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include "common/common_type_c.h" +#include "common/type_c.h" +#include "milvus-storage/ffi_c.h" + +/** + * @brief Creates a shared pointer to Properties from CStorageConfig + * + * This utility function converts a CStorageConfig structure into a Properties + * object by calling the FFI properties_create function. All configuration fields + * from CStorageConfig are mapped to corresponding key-value pairs in Properties. + * + * The following fields are converted: + * - String fields: address, bucket_name, access_key_id, access_key_value, + * root_path, storage_type, cloud_provider, iam_endpoint, log_level, + * region, ssl_ca_cert, gcp_credential_json + * - Boolean fields: use_ssl, use_iam, use_virtual_host, use_custom_part_upload + * - Integer fields: request_timeout_ms + * + * @param c_storage_config The storage configuration to convert + * @return std::shared_ptr Shared pointer to the created Properties + * @throws std::runtime_error If properties_create fails with error message from FFI + */ +std::shared_ptr +MakePropertiesFromStorageConfig(CStorageConfig c_storage_config); \ No newline at end of file diff --git a/internal/core/thirdparty/milvus-storage/CMakeLists.txt b/internal/core/thirdparty/milvus-storage/CMakeLists.txt index 9aabc3ddfa..88ad941d0d 100644 --- a/internal/core/thirdparty/milvus-storage/CMakeLists.txt +++ b/internal/core/thirdparty/milvus-storage/CMakeLists.txt @@ -14,7 +14,7 @@ # Update milvus-storage_VERSION for the first occurrence milvus_add_pkg_config("milvus-storage") set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "") -set( milvus-storage_VERSION e5f5b4c ) +set( milvus-storage_VERSION cbcc922 ) set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git") message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}") message(STATUS "milvus-storage version: ${milvus-storage_VERSION}") diff --git a/internal/datanode/compactor/mix_compactor_test.go b/internal/datanode/compactor/mix_compactor_test.go index 0339c6eb95..bbd699b687 100644 --- a/internal/datanode/compactor/mix_compactor_test.go +++ b/internal/datanode/compactor/mix_compactor_test.go @@ -1110,10 +1110,9 @@ func genTestCollectionMeta() *etcdpb.CollectionMeta { }, }, { - FieldID: Int64FieldWithDefaultValue, - Name: "field_int64_with_default_value", - IsPrimaryKey: true, - DataType: schemapb.DataType_Int64, + FieldID: Int64FieldWithDefaultValue, + Name: "field_int64_with_default_value", + DataType: schemapb.DataType_Int64, DefaultValue: 
&schemapb.ValueField{ Data: &schemapb.ValueField_LongData{ LongData: 10, diff --git a/internal/storage/rw.go b/internal/storage/rw.go index 4fb698fad7..8f8a34f714 100644 --- a/internal/storage/rw.go +++ b/internal/storage/rw.go @@ -20,7 +20,6 @@ import ( "context" "fmt" sio "io" - "path" "sort" "github.com/samber/lo" @@ -274,22 +273,12 @@ func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, s sort.Slice(binlogs, func(i, j int) bool { return binlogs[i].GetFieldID() < binlogs[j].GetFieldID() }) - binlogLists := lo.Map(binlogs, func(fieldBinlog *datapb.FieldBinlog, _ int) []*datapb.Binlog { - return fieldBinlog.GetBinlogs() - }) - bucketName := rwOptions.storageConfig.BucketName - paths := make([][]string, len(binlogLists[0])) - for _, binlogs := range binlogLists { - for j, binlog := range binlogs { - logPath := binlog.GetLogPath() - if rwOptions.storageConfig.StorageType != "local" { - logPath = path.Join(bucketName, logPath) - } - paths[j] = append(paths[j], logPath) - } + + var err error + rr, err = NewRecordReaderFromBinlogs(binlogs, schema, rwOptions.bufferSize, rwOptions.storageConfig, pluginContext) + if err != nil { + return nil, err } - // FIXME: add needed fields support - rr = newIterativePackedRecordReader(paths, schema, rwOptions.bufferSize, rwOptions.storageConfig, pluginContext) default: return nil, merr.WrapErrServiceInternal(fmt.Sprintf("unsupported storage version %d", rwOptions.version)) } diff --git a/internal/storage/rw_test.go b/internal/storage/rw_test.go index 8c0d3ec07f..01ef8a674a 100644 --- a/internal/storage/rw_test.go +++ b/internal/storage/rw_test.go @@ -96,26 +96,32 @@ func (s *PackedBinlogRecordSuite) TestPackedBinlogRecordIntegration() { { GroupID: 0, Columns: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}, + Fields: []int64{0, 1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 101}, }, { GroupID: 102, Columns: []int{13}, + Fields: []int64{102}, }, { GroupID: 103, Columns: []int{14}, + Fields: []int64{103}, }, { GroupID: 104, Columns: []int{15}, + Fields: []int64{104}, }, { GroupID: 105, Columns: []int{16}, + Fields: []int64{105}, }, { GroupID: 106, Columns: []int{17}, + Fields: []int64{106}, }, } wOption := []RwOption{ diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go index 2dc2adb931..358f4f97d8 100644 --- a/internal/storage/serde_events.go +++ b/internal/storage/serde_events.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "math" + "path" "sort" "strconv" @@ -34,14 +35,49 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/hook" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/json" + "github.com/milvus-io/milvus/internal/storagev2/packed" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/proto/indexcgopb" + "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/metautil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) +func NewRecordReaderFromBinlogs(fieldBinlogs []*datapb.FieldBinlog, + schema *schemapb.CollectionSchema, + bufferSize int64, + storageConfig *indexpb.StorageConfig, + storagePluginContext *indexcgopb.StoragePluginContext, +) (RecordReader, error) { + // check legacy or import binlog struct + for _, fieldBinlog := range fieldBinlogs { + if len(fieldBinlog.ChildFields) == 0 { + binlogLists := 
+func NewRecordReaderFromBinlogs(fieldBinlogs []*datapb.FieldBinlog,
+	schema *schemapb.CollectionSchema,
+	bufferSize int64,
+	storageConfig *indexpb.StorageConfig,
+	storagePluginContext *indexcgopb.StoragePluginContext,
+) (RecordReader, error) {
+	// check legacy or import binlog struct
+	for _, fieldBinlog := range fieldBinlogs {
+		if len(fieldBinlog.ChildFields) == 0 {
+			binlogLists := lo.Map(fieldBinlogs, func(fieldBinlog *datapb.FieldBinlog, _ int) []*datapb.Binlog {
+				return fieldBinlog.GetBinlogs()
+			})
+			bucketName := storageConfig.BucketName
+			paths := make([][]string, len(binlogLists[0]))
+			for _, binlogs := range binlogLists {
+				for j, binlog := range binlogs {
+					logPath := binlog.GetLogPath()
+					if storageConfig.StorageType != "local" {
+						logPath = path.Join(bucketName, logPath)
+					}
+					paths[j] = append(paths[j], logPath)
+				}
+			}
+			return newIterativePackedRecordReader(paths, schema, bufferSize, storageConfig, storagePluginContext), nil
+		}
+	}
+	return NewManifestReaderFromBinlogs(fieldBinlogs, schema, bufferSize, storageConfig, storagePluginContext)
+}
+
+var _ RecordReader = (*IterativeRecordReader)(nil)
+
 type IterativeRecordReader struct {
 	cur     RecordReader
 	iterate func() (RecordReader, error)
@@ -55,8 +91,6 @@ func (ir *IterativeRecordReader) Close() error {
 	return nil
 }
 
-var _ RecordReader = (*IterativeRecordReader)(nil)
-
 func (ir *IterativeRecordReader) Next() (Record, error) {
 	if ir.cur == nil {
 		r, err := ir.iterate()
@@ -80,6 +114,133 @@ func (ir *IterativeRecordReader) Next() (Record, error) {
 	return rec, err
 }
 
+type Manifest struct {
+	Version      int            `json:"version"`
+	ColumnGroups []*ColumnGroup `json:"column_groups"`
+}
+
+type ColumnGroup struct {
+	Columns []string `json:"columns"`
+	Format  string   `json:"format"`
+	Paths   []string `json:"paths"`
+}
+
+type ManifestReader struct {
+	fieldBinlogs []*datapb.FieldBinlog
+	reader       *packed.FFIPackedReader
+
+	bufferSize           int64
+	arrowSchema          *arrow.Schema
+	schema               *schemapb.CollectionSchema
+	schemaHelper         *typeutil.SchemaHelper
+	field2Col            map[FieldID]int
+	storageConfig        *indexpb.StorageConfig
+	storagePluginContext *indexcgopb.StoragePluginContext
+
+	neededColumns []string
+}
+
+// NewManifestReaderFromBinlogs creates a ManifestReader from binlogs
+func NewManifestReaderFromBinlogs(fieldBinlogs []*datapb.FieldBinlog,
+	schema *schemapb.CollectionSchema,
+	bufferSize int64,
+	storageConfig *indexpb.StorageConfig,
+	storagePluginContext *indexcgopb.StoragePluginContext,
+) (*ManifestReader, error) {
+	arrowSchema, err := ConvertToArrowSchema(schema)
+	if err != nil {
+		return nil, merr.WrapErrParameterInvalidMsg("convert collection schema [%s] to arrow schema error: %s", schema.Name, err.Error())
+	}
+	schemaHelper, err := typeutil.CreateSchemaHelper(schema)
+	if err != nil {
+		return nil, err
+	}
+	field2Col := make(map[FieldID]int)
+	allFields := typeutil.GetAllFieldSchemas(schema)
+	neededColumns := make([]string, 0, len(allFields))
+	for i, field := range allFields {
+		field2Col[field.FieldID] = i
+		neededColumns = append(neededColumns, field.Name)
+	}
+	prr := &ManifestReader{
+		fieldBinlogs:         fieldBinlogs,
+		bufferSize:           bufferSize,
+		arrowSchema:          arrowSchema,
+		schema:               schema,
+		schemaHelper:         schemaHelper,
+		field2Col:            field2Col,
+		storageConfig:        storageConfig,
+		storagePluginContext: storagePluginContext,
+
+		neededColumns: neededColumns,
+	}
+
+	err = prr.init()
+	if err != nil {
+		return nil, err
+	}
+
+	return prr, nil
+}
+
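+// generateManifest builds the JSON manifest handed to the FFI packed reader.
+// For two column groups the output looks roughly like this (field names and
+// paths are hypothetical):
+//
+//	{
+//	  "version": 0,
+//	  "column_groups": [
+//	    {"columns": ["pk", "vec"], "format": "parquet", "paths": ["a-bucket/insert_log/1/2/3/100"]},
+//	    {"columns": ["text"], "format": "parquet", "paths": ["a-bucket/insert_log/1/2/3/102"]}
+//	  ]
+//	}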
+func (mr *ManifestReader) generateManifest() (string, error) {
+	m := &Manifest{
+		Version: 0,
+		ColumnGroups: lo.Map(mr.fieldBinlogs, func(binlog *datapb.FieldBinlog, _ int) *ColumnGroup {
+			return &ColumnGroup{
+				Columns: lo.Map(binlog.ChildFields, func(fieldID int64, _ int) string {
+					field, err := mr.schemaHelper.GetFieldFromID(fieldID)
+					if err != nil {
+						// return empty string if field not found
+						return ""
+					}
+					return field.GetName()
+				}),
+				Format: "parquet",
+				Paths: lo.Map(binlog.Binlogs, func(binlog *datapb.Binlog, _ int) string {
+					p := binlog.GetLogPath()
+					if mr.storageConfig.StorageType != "local" {
+						p = path.Join(mr.storageConfig.BucketName, p)
+					}
+					return p
+				}),
+			}
+		}),
+	}
+	bs, err := json.Marshal(m)
+	return string(bs), err
+}
+
+func (mr *ManifestReader) init() error {
+	manifest, err := mr.generateManifest()
+	if err != nil {
+		return err
+	}
+
+	reader, err := packed.NewFFIPackedReader(manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext)
+	if err != nil {
+		return err
+	}
+	mr.reader = reader
+	return nil
+}
+
+func (mr *ManifestReader) Next() (Record, error) {
+	rec, err := mr.reader.ReadNext()
+	if err != nil {
+		return nil, err
+	}
+	return NewSimpleArrowRecord(rec, mr.field2Col), nil
+}
+
+func (mr *ManifestReader) Close() error {
+	if mr.reader != nil {
+		return mr.reader.Close()
+	}
+	return nil
+}
+
 // ChunkedBlobsReader returns a chunk composed of blobs, or io.EOF if no more data
 type ChunkedBlobsReader func() ([]*Blob, error)
diff --git a/internal/storagev2/packed/packed_reader_ffi.go b/internal/storagev2/packed/packed_reader_ffi.go
new file mode 100644
index 0000000000..5e3f429a4a
--- /dev/null
+++ b/internal/storagev2/packed/packed_reader_ffi.go
@@ -0,0 +1,190 @@
+// Copyright 2023 Zilliz
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package packed
+
+/*
+#cgo pkg-config: milvus_core
+
+#include <stdlib.h>
+#include "storage/loon_ffi/ffi_reader_c.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/helpers.h"
+*/
+import "C"
+
+import (
+	"fmt"
+	"io"
+	"unsafe"
+
+	"github.com/apache/arrow/go/v17/arrow"
+	"github.com/apache/arrow/go/v17/arrow/cdata"
+
+	"github.com/milvus-io/milvus/pkg/v2/proto/indexcgopb"
+	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
+)
+
+func NewFFIPackedReader(manifest string, schema *arrow.Schema, neededColumns []string, bufferSize int64, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedReader, error) {
+	if storageConfig == nil {
+		return nil, fmt.Errorf("storageConfig is required")
+	}
+
+	cManifest := C.CString(manifest)
+	defer C.free(unsafe.Pointer(cManifest))
+
+	var cas cdata.CArrowSchema
+	cdata.ExportArrowSchema(schema, &cas)
+	cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
+	defer cdata.ReleaseCArrowSchema(&cas)
+
+	var cPackedReader C.CFFIPackedReader
+	var status C.CStatus
+
+	var pluginContextPtr *C.CPluginContext
+	if storagePluginContext != nil {
+		ckey := C.CString(storagePluginContext.EncryptionKey)
+		defer C.free(unsafe.Pointer(ckey))
+		var pluginContext C.CPluginContext
+		pluginContext.ez_id = C.int64_t(storagePluginContext.EncryptionZoneId)
+		pluginContext.collection_id = C.int64_t(storagePluginContext.CollectionId)
+		pluginContext.key = ckey
+		pluginContextPtr = &pluginContext
+	}
+
+	cStorageConfig := C.CStorageConfig{
+		address:                C.CString(storageConfig.GetAddress()),
+		bucket_name:            C.CString(storageConfig.GetBucketName()),
+		access_key_id:          C.CString(storageConfig.GetAccessKeyID()),
+		access_key_value:       C.CString(storageConfig.GetSecretAccessKey()),
+		root_path:              C.CString(storageConfig.GetRootPath()),
+		storage_type:           C.CString(storageConfig.GetStorageType()),
+		cloud_provider:         C.CString(storageConfig.GetCloudProvider()),
+		iam_endpoint:           C.CString(storageConfig.GetIAMEndpoint()),
+		log_level:              C.CString("Warn"), // TODO use config after storage support lower case configuration
+		useSSL:                 C.bool(storageConfig.GetUseSSL()),
+		sslCACert:              C.CString(storageConfig.GetSslCACert()),
+		useIAM:                 C.bool(storageConfig.GetUseIAM()),
+		region:                 C.CString(storageConfig.GetRegion()),
+		useVirtualHost:         C.bool(storageConfig.GetUseVirtualHost()),
+		requestTimeoutMs:       C.int64_t(storageConfig.GetRequestTimeoutMs()),
+		gcp_credential_json:    C.CString(storageConfig.GetGcpCredentialJSON()),
+		use_custom_part_upload: true,
+	}
+	defer C.free(unsafe.Pointer(cStorageConfig.address))
+	defer C.free(unsafe.Pointer(cStorageConfig.bucket_name))
+	defer C.free(unsafe.Pointer(cStorageConfig.access_key_id))
+	defer C.free(unsafe.Pointer(cStorageConfig.access_key_value))
+	defer C.free(unsafe.Pointer(cStorageConfig.root_path))
+	defer C.free(unsafe.Pointer(cStorageConfig.storage_type))
+	defer C.free(unsafe.Pointer(cStorageConfig.cloud_provider))
+	defer C.free(unsafe.Pointer(cStorageConfig.iam_endpoint))
+	defer C.free(unsafe.Pointer(cStorageConfig.log_level))
+	defer C.free(unsafe.Pointer(cStorageConfig.sslCACert))
+	defer C.free(unsafe.Pointer(cStorageConfig.region))
+	defer C.free(unsafe.Pointer(cStorageConfig.gcp_credential_json))
+
+	cNeededColumn := make([]*C.char, len(neededColumns))
+	for i, columnName := range neededColumns {
+		cNeededColumn[i] = C.CString(columnName)
+		defer C.free(unsafe.Pointer(cNeededColumn[i]))
+	}
+	// Guard the empty projection: indexing an empty slice would panic, and a
+	// NULL column array tells the reader to load all columns.
+	var cNeededColumnArray **C.char
+	if len(cNeededColumn) > 0 {
+		cNeededColumnArray = (**C.char)(unsafe.Pointer(&cNeededColumn[0]))
+	}
+	cNumColumns := C.int64_t(len(neededColumns))
+
+	status = C.NewPackedFFIReaderWithManifest(cManifest, cSchema, cNeededColumnArray, cNumColumns, &cPackedReader, cStorageConfig, pluginContextPtr)
+	if err := ConsumeCStatusIntoError(&status); err != nil {
+		return nil, err
+	}
+
+	// Get the ArrowArrayStream, passing the configured buffer size as the
+	// batch-size hint
+	var cStream cdata.CArrowArrayStream
+	status = C.GetFFIReaderStream(cPackedReader, C.int64_t(bufferSize), (*C.struct_ArrowArrayStream)(unsafe.Pointer(&cStream)))
+	if err := ConsumeCStatusIntoError(&status); err != nil {
+		C.CloseFFIReader(cPackedReader)
+		return nil, fmt.Errorf("failed to get reader stream: %w", err)
+	}
+
+	// Import the stream as a RecordReader
+	recordReader, err := cdata.ImportCRecordReader(&cStream, schema)
+	if err != nil {
+		C.CloseFFIReader(cPackedReader)
+		return nil, fmt.Errorf("failed to import record reader: %w", err)
+	}
+
+	return &FFIPackedReader{
+		cPackedReader: cPackedReader,
+		recordReader:  recordReader,
+		schema:        schema,
+	}, nil
+}
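+
+// A typical read loop over the reader (sketch; variable values are
+// hypothetical and error handling is shortened):
+//
+//	reader, err := NewFFIPackedReader(manifest, schema, cols, 8192, cfg, nil)
+//	if err != nil {
+//		return err
+//	}
+//	defer reader.Close()
+//	for {
+//		rec, err := reader.ReadNext()
+//		if err == io.EOF {
+//			break
+//		}
+//		// consume rec; it remains valid only until the next ReadNext call
+//	}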
+
+// ReadNext reads the next record batch from the reader
+func (r *FFIPackedReader) ReadNext() (arrow.Record, error) {
+	if r.recordReader == nil {
+		return nil, io.EOF
+	}
+
+	// No manual release needed here: the stream reader releases the
+	// previous batch when the next one is read.
+	rec, err := r.recordReader.Read()
+	if err != nil {
+		if err == io.EOF {
+			return nil, io.EOF
+		}
+		return nil, fmt.Errorf("failed to read next record: %w", err)
+	}
+
+	return rec, nil
+}
+
+// Close closes the FFI reader
+func (r *FFIPackedReader) Close() error {
+	// No manual release of the current batch is needed; the stream
+	// reader handles it.
+	r.recordReader = nil
+
+	if r.cPackedReader != 0 {
+		status := C.CloseFFIReader(r.cPackedReader)
+		r.cPackedReader = 0
+		return ConsumeCStatusIntoError(&status)
+	}
+
+	return nil
+}
+
+// Schema returns the schema of the reader
+func (r *FFIPackedReader) Schema() *arrow.Schema {
+	return r.schema
+}
+
+// Retain increases the reference count; currently a no-op because batch
+// lifetimes are managed by the underlying stream reader.
+func (r *FFIPackedReader) Retain() {
+}
+
+// Release decreases the reference count
+func (r *FFIPackedReader) Release() {
+	r.Close()
+}
+
+// Ensure FFIPackedReader implements array.RecordReader interface
+// var _ array.RecordReader = (*FFIPackedReader)(nil)
diff --git a/internal/storagev2/packed/type.go b/internal/storagev2/packed/type.go
index 5ec4278771..aae7a13c1b 100644
--- a/internal/storagev2/packed/type.go
+++ b/internal/storagev2/packed/type.go
@@ -18,6 +18,7 @@ package packed
 #include <stdlib.h>
 #include "arrow/c/abi.h"
 #include "arrow/c/helpers.h"
+#include "storage/loon_ffi/ffi_reader_c.h"
 #include "segcore/packed_reader_c.h"
 #include "segcore/packed_writer_c.h"
 */
@@ -25,6 +26,7 @@ import "C"
 
 import (
 	"github.com/apache/arrow/go/v17/arrow"
+	"github.com/apache/arrow/go/v17/arrow/arrio"
 	"github.com/apache/arrow/go/v17/arrow/cdata"
 )
 
@@ -39,6 +41,12 @@ type PackedReader struct {
 	currentBatch arrow.Record
 }
 
+type FFIPackedReader struct {
+	cPackedReader C.CFFIPackedReader
+	recordReader  arrio.Reader
+	schema        *arrow.Schema
+}
+
 type (
 	// CArrowSchema is the C Data Interface for ArrowSchemas
 	CArrowSchema = C.struct_ArrowSchema