From d4450b2f57f9fc291bf9566c821851876b582387 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 5 Dec 2025 17:59:12 +0800 Subject: [PATCH] enhance: [StorageV2] Integrate CMEK support into Loon FFI interface (#46123) This PR adds Customer Managed Encryption Keys (CMEK) support to the StorageV2 FFI layer, enabling data encryption/decryption through the cipher plugin system. Changes: - Add ffi_writer_c.cpp/h with GetEncParams() to retrieve encryption parameters (key and metadata) from cipher plugin for data encryption - Extend GetLoonReader() in ffi_reader_c.cpp to support CMEK decryption by configuring KeyRetriever when plugin context is provided - Add encryption property constants in ffi_common.go for writer config - Integrate CMEK encryption in NewFFIPackedWriter() to pass encryption parameters to the underlying storage writer issue: #44956 Signed-off-by: Congqi Xia --- .../core/src/storage/loon_ffi/CMakeLists.txt | 1 + .../src/storage/loon_ffi/ffi_reader_c.cpp | 75 +++++++++++-------- .../src/storage/loon_ffi/ffi_writer_c.cpp | 61 +++++++++++++++ .../core/src/storage/loon_ffi/ffi_writer_c.h | 35 +++++++++ internal/storagev2/packed/ffi_common.go | 6 ++ .../storagev2/packed/packed_writer_ffi.go | 36 ++++++++- 6 files changed, 182 insertions(+), 32 deletions(-) diff --git a/internal/core/src/storage/loon_ffi/CMakeLists.txt b/internal/core/src/storage/loon_ffi/CMakeLists.txt index 3370e33a64..7c055c384a 100644 --- a/internal/core/src/storage/loon_ffi/CMakeLists.txt +++ b/internal/core/src/storage/loon_ffi/CMakeLists.txt @@ -12,6 +12,7 @@ # FFI Reader source files for interfacing with milvus-storage through FFI set(FFI_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/ffi_reader_c.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/ffi_writer_c.cpp ${CMAKE_CURRENT_SOURCE_DIR}/util.cpp ) diff --git a/internal/core/src/storage/loon_ffi/ffi_reader_c.cpp b/internal/core/src/storage/loon_ffi/ffi_reader_c.cpp index b7a09f9918..0ce75609a2 100644 --- a/internal/core/src/storage/loon_ffi/ffi_reader_c.cpp +++ b/internal/core/src/storage/loon_ffi/ffi_reader_c.cpp @@ -19,50 +19,63 @@ #include "milvus-storage/ffi_c.h" #include "milvus-storage/reader.h" #include "storage/loon_ffi/util.h" +#include "storage/PluginLoader.h" +#include "storage/KeyRetriever.h" #include "monitor/scope_metric.h" -ReaderHandle -createFFIReader(ColumnGroupsHandle column_groups_handle, - struct ArrowSchema* schema, - char** needed_columns, - int64_t needed_columns_size, - const std::shared_ptr& properties) { - ReaderHandle reader_handler = 0; - - FFIResult result = reader_new(column_groups_handle, - schema, - needed_columns, - needed_columns_size, - properties.get(), - &reader_handler); - if (!IsSuccess(&result)) { - auto message = GetErrorMessage(&result); - // Copy the error message before freeing the FFIResult - std::string error_msg = message ? message : "Unknown error"; - FreeFFIResult(&result); - throw std::runtime_error(error_msg); - } - - FreeFFIResult(&result); - return reader_handler; -} - +/** + * @brief Creates a Loon reader with optional CMEK decryption support. + * + * This function creates a milvus_storage Reader instance and optionally configures + * it with a key retriever for decrypting encrypted data when CMEK is enabled. + * + * @param[in] column_groups Shared pointer to the column groups to read + * @param[in] schema Arrow schema defining the data structure + * @param[in] needed_columns Array of column names to read + * @param[in] needed_columns_size Number of columns in needed_columns array + * @param[in] properties Storage properties for reader configuration + * @param[in] c_plugin_context Optional plugin context for CMEK decryption. + * If non-null, configures the reader with a key retriever + * that can decrypt data encrypted with CMEK. + * + * @return Unique pointer to the created Reader instance + * + * @throws AssertException if schema import fails or cipher plugin is null when required + */ std::unique_ptr GetLoonReader( std::shared_ptr column_groups, struct ArrowSchema* schema, char** needed_columns, int64_t needed_columns_size, - const std::shared_ptr& properties) { + const std::shared_ptr& properties, + CPluginContext* c_plugin_context) { auto result = arrow::ImportSchema(schema); AssertInfo(result.ok(), "Import arrow schema failed"); auto arrow_schema = result.ValueOrDie(); - return milvus_storage::api::Reader::create( + auto reader = milvus_storage::api::Reader::create( column_groups, arrow_schema, std::make_shared>( needed_columns, needed_columns + needed_columns_size), *properties); + + // Configure CMEK decryption if plugin context is provided + if (c_plugin_context != nullptr) { + auto plugin_ptr = + milvus::storage::PluginLoader::GetInstance().getCipherPlugin(); + AssertInfo(plugin_ptr != nullptr, "cipher plugin is nullptr"); + plugin_ptr->Update(c_plugin_context->ez_id, + c_plugin_context->collection_id, + std::string(c_plugin_context->key)); + auto key_retriever = std::make_shared(); + reader->set_keyretriever( + [key_retriever](const std::string& metadata) -> std::string { + return key_retriever->GetKey(metadata); + }); + } + + return reader; } CStatus @@ -87,7 +100,8 @@ NewPackedFFIReader(const char* manifest_path, schema, needed_columns, needed_columns_size, - properties); + properties, + c_plugin_context); *c_packed_reader = static_cast(reader.release()); return milvus::SuccessCStatus(); @@ -118,7 +132,8 @@ NewPackedFFIReaderWithManifest(const ColumnGroupsHandle column_groups_handle, schema, needed_columns, needed_columns_size, - properties); + properties, + c_plugin_context); *c_loon_reader = static_cast(reader.release()); return milvus::SuccessCStatus(); diff --git a/internal/core/src/storage/loon_ffi/ffi_writer_c.cpp b/internal/core/src/storage/loon_ffi/ffi_writer_c.cpp index e69de29bb2..1924a9cdbd 100644 --- a/internal/core/src/storage/loon_ffi/ffi_writer_c.cpp +++ b/internal/core/src/storage/loon_ffi/ffi_writer_c.cpp @@ -0,0 +1,61 @@ +// Copyright 2023 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "common/EasyAssert.h" +#include "storage/loon_ffi/ffi_writer_c.h" +#include "common/common_type_c.h" +#include "storage/PluginLoader.h" +#include "storage/KeyRetriever.h" +#include "monitor/scope_metric.h" + +/** + * @brief Implementation of GetEncParams - retrieves encryption parameters for CMEK. + * + * @details This function performs the following steps: + * 1. Loads the cipher plugin from PluginLoader singleton + * 2. Updates the plugin with encryption zone ID, collection ID, and key + * 3. Retrieves the encryptor for the given zone and collection + * 4. Encodes key metadata containing zone ID, collection ID, and key version + * 5. Returns the encryption key and metadata as newly allocated strings + * + * @see GetEncParams declaration in ffi_writer_c.h for parameter documentation + */ +CStatus +GetEncParams(CPluginContext* c_plugin_context, + char** out_key, + char** out_meta) { + try { + AssertInfo(c_plugin_context != nullptr, "c_plugin_context is nullptr"); + auto plugin_ptr = + milvus::storage::PluginLoader::GetInstance().getCipherPlugin(); + AssertInfo(plugin_ptr != nullptr, "plugin_ptr is nullptr"); + + plugin_ptr->Update(c_plugin_context->ez_id, + c_plugin_context->collection_id, + std::string(c_plugin_context->key)); + auto got = plugin_ptr->GetEncryptor(c_plugin_context->ez_id, + c_plugin_context->collection_id); + auto metadata = + milvus::storage::EncodeKeyMetadata(c_plugin_context->ez_id, + c_plugin_context->collection_id, + got.second); + *out_key = strdup(got.first->GetKey().c_str()); + *out_meta = strdup(metadata.c_str()); + return milvus::SuccessCStatus(); + + } catch (std::exception& e) { + return milvus::FailureCStatus(milvus::ErrorCode::UnexpectedError, + e.what()); + } +} \ No newline at end of file diff --git a/internal/core/src/storage/loon_ffi/ffi_writer_c.h b/internal/core/src/storage/loon_ffi/ffi_writer_c.h index bcfa1e5354..8bcb1f24a1 100644 --- a/internal/core/src/storage/loon_ffi/ffi_writer_c.h +++ b/internal/core/src/storage/loon_ffi/ffi_writer_c.h @@ -11,3 +11,38 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +#pragma once + +#ifdef __cplusplus +extern "C" { +#endif + +#include "common/common_type_c.h" +#include "common/type_c.h" +#include +#include "milvus-storage/ffi_c.h" + +/** + * @brief Retrieves encryption parameters from the cipher plugin for CMEK (Customer Managed Encryption Keys). + * + * This function loads the cipher plugin, updates it with the provided plugin context, + * and retrieves the encryption key and metadata required for encrypting data in storage. + * + * @param[in] c_plugin_context Pointer to the plugin context containing: + * - ez_id: Encryption zone ID + * - collection_id: The collection ID + * - key: The encryption key string + * @param[out] out_key Pointer to receive the encryption key (caller must free with free()) + * @param[out] out_meta Pointer to receive the encoded key metadata (caller must free with free()) + * + * @return CStatus Success status or error with message if failed + * + * @note The caller is responsible for freeing the allocated out_key and out_meta strings. + */ +CStatus +GetEncParams(CPluginContext* c_plugin_context, char** out_key, char** out_meta); + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/internal/storagev2/packed/ffi_common.go b/internal/storagev2/packed/ffi_common.go index 8bb3bafbcb..56112e7a53 100644 --- a/internal/storagev2/packed/ffi_common.go +++ b/internal/storagev2/packed/ffi_common.go @@ -40,6 +40,12 @@ const ( PropertyWriterPolicy = "writer.policy" PropertyWriterSchemaBasedPattern = "writer.split.schema_based.patterns" + + // CMEK (Customer Managed Encryption Keys) writer properties + PropertyWriterEncEnable = "writer.enc.enable" // Enable encryption for written data + PropertyWriterEncKey = "writer.enc.key" // Encryption key for data encryption + PropertyWriterEncMeta = "writer.enc.meta" // Encoded metadata containing zone ID, collection ID, and key version + PropertyWriterEncAlgo = "writer.enc.algorithm" // Encryption algorithm (e.g., "AES_GCM_V1") ) // MakePropertiesFromStorageConfig creates a Properties object from StorageConfig diff --git a/internal/storagev2/packed/packed_writer_ffi.go b/internal/storagev2/packed/packed_writer_ffi.go index 835cb88a6d..2ac06e2ac2 100644 --- a/internal/storagev2/packed/packed_writer_ffi.go +++ b/internal/storagev2/packed/packed_writer_ffi.go @@ -21,6 +21,7 @@ package packed #include "milvus-storage/ffi_c.h" #include "segcore/packed_writer_c.h" #include "segcore/column_groups_c.h" +#include "storage/loon_ffi/ffi_writer_c.h" #include "arrow/c/abi.h" #include "arrow/c/helpers.h" */ @@ -92,10 +93,41 @@ func NewFFIPackedWriter(basePath string, schema *arrow.Schema, columnGroups []st }), "|") }), ",") - cProperties, err := MakePropertiesFromStorageConfig(storageConfig, map[string]string{ + extra := map[string]string{ PropertyWriterPolicy: "schema_based", PropertyWriterSchemaBasedPattern: pattern, - }) + } + + // Configure CMEK encryption if plugin context is provided + if storagePluginContext != nil { + var cKey *C.char + var cMeta *C.char + + encKey := C.CString(storagePluginContext.EncryptionKey) + defer C.free(unsafe.Pointer(encKey)) + + // Prepare plugin context for FFI call to retrieve encryption parameters + var pluginContext C.CPluginContext + pluginContext.ez_id = C.int64_t(storagePluginContext.EncryptionZoneId) + pluginContext.collection_id = C.int64_t(storagePluginContext.CollectionId) + pluginContext.key = encKey + + // Get encryption key and metadata from cipher plugin via FFI + status := C.GetEncParams(&pluginContext, &cKey, &cMeta) + if err := ConsumeCStatusIntoError(&status); err != nil { + return nil, err + } + + // Set encryption properties for the writer + extra[PropertyWriterEncEnable] = "true" + extra[PropertyWriterEncKey] = C.GoString(cKey) + C.free(unsafe.Pointer(cKey)) + extra[PropertyWriterEncMeta] = C.GoString(cMeta) + C.free(unsafe.Pointer(cMeta)) + extra[PropertyWriterEncAlgo] = "AES_GCM_V1" + } + + cProperties, err := MakePropertiesFromStorageConfig(storageConfig, extra) if err != nil { return nil, err }