milvus/internal/storagev2/packed/packed_reader_ffi.go
congqixia d2e4278b18
enhance: use milvus-storage internal C++ Reader API for Loon FFI (#45897)
This PR refactors the Loon FFI reader implementation to use
milvus-storage's internal C++ Reader API directly instead of the
external FFI interface.

Key changes:
- Replace external FFI calls (get_record_batch_reader, reader_destroy)
with direct C++ Reader API calls
- Add GetLoonReader() helper function to create Reader instances using
milvus-storage::api::Reader::create()
- Use MakeInternalPropertiesFromStorageConfig() instead of
MakePropertiesFromStorageConfig() to get internal properties
- Update NewPackedFFIReaderWithManifest() to deserialize column groups
from JSON manifest content directly
- Simplify GetFFIReaderStream() to use Reader::get_record_batch_reader()
and arrow::ExportRecordBatchReader() for Arrow stream export
- Change CFFIPackedReader typedef from ReaderHandle to void* for
flexibility
- Update milvus-storage dependency version to ba7df7b

This change improves code maintainability by using the native C++ API
directly and eliminates the overhead of going through the external FFI
layer.

issue: #44956

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
2025-11-28 18:55:07 +08:00

215 lines
7.2 KiB
Go

// Copyright 2023 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package packed
/*
#cgo pkg-config: milvus_core
#include <stdlib.h>
#include "storage/loon_ffi/ffi_reader_c.h"
#include "arrow/c/abi.h"
#include "arrow/c/helpers.h"
*/
import "C"
import (
"fmt"
"io"
"unsafe"
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/cdata"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/indexcgopb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
)
// NewFFIPackedReader creates a native packed reader from a manifest and wraps
// its Arrow C stream as a Go record reader.
//
// Parameters:
//   - manifest: manifest content handed to C.NewPackedFFIReaderWithManifest.
//   - schema: Arrow schema exported to C and also used to import the stream.
//   - neededColumns: names of the columns to read; must not be empty.
//   - bufferSize: currently unused by this function.
//   - storageConfig: storage settings copied into a C.CStorageConfig; required.
//   - storagePluginContext: optional plugin context (encryption zone,
//     collection and key); nil is passed to C when it is absent.
//
// It returns an error when storageConfig is nil, when neededColumns is empty,
// or when any of the native calls or the stream import fails. On success the
// caller owns the returned reader and is responsible for closing it.
func NewFFIPackedReader(manifest string, schema *arrow.Schema, neededColumns []string, bufferSize int64, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedReader, error) {
	// Validate inputs before allocating any C memory.
	if storageConfig == nil {
		return nil, fmt.Errorf("storageConfig is required")
	}
	// Taking &cNeededColumn[0] on an empty slice would panic, so reject it
	// up front with a regular error instead.
	if len(neededColumns) == 0 {
		return nil, fmt.Errorf("neededColumns must not be empty")
	}

	cManifest := C.CString(manifest)
	defer C.free(unsafe.Pointer(cManifest))

	var cas cdata.CArrowSchema
	cdata.ExportArrowSchema(schema, &cas)
	cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
	defer cdata.ReleaseCArrowSchema(&cas)

	var pluginContextPtr *C.CPluginContext
	if storagePluginContext != nil {
		ckey := C.CString(storagePluginContext.EncryptionKey)
		defer C.free(unsafe.Pointer(ckey))

		var pluginContext C.CPluginContext
		pluginContext.ez_id = C.int64_t(storagePluginContext.EncryptionZoneId)
		pluginContext.collection_id = C.int64_t(storagePluginContext.CollectionId)
		pluginContext.key = ckey
		pluginContextPtr = &pluginContext
	}

	cStorageConfig := C.CStorageConfig{
		address:                C.CString(storageConfig.GetAddress()),
		bucket_name:            C.CString(storageConfig.GetBucketName()),
		access_key_id:          C.CString(storageConfig.GetAccessKeyID()),
		access_key_value:       C.CString(storageConfig.GetSecretAccessKey()),
		root_path:              C.CString(storageConfig.GetRootPath()),
		storage_type:           C.CString(storageConfig.GetStorageType()),
		cloud_provider:         C.CString(storageConfig.GetCloudProvider()),
		iam_endpoint:           C.CString(storageConfig.GetIAMEndpoint()),
		log_level:              C.CString("Warn"), // TODO use config after storage support lower case configuration
		useSSL:                 C.bool(storageConfig.GetUseSSL()),
		sslCACert:              C.CString(storageConfig.GetSslCACert()),
		useIAM:                 C.bool(storageConfig.GetUseIAM()),
		region:                 C.CString(storageConfig.GetRegion()),
		useVirtualHost:         C.bool(storageConfig.GetUseVirtualHost()),
		requestTimeoutMs:       C.int64_t(storageConfig.GetRequestTimeoutMs()),
		gcp_credential_json:    C.CString(storageConfig.GetGcpCredentialJSON()),
		use_custom_part_upload: true,
		max_connections:        C.uint32_t(storageConfig.GetMaxConnections()),
	}
	// Every C.CString above is a C allocation that must be freed explicitly.
	defer C.free(unsafe.Pointer(cStorageConfig.address))
	defer C.free(unsafe.Pointer(cStorageConfig.bucket_name))
	defer C.free(unsafe.Pointer(cStorageConfig.access_key_id))
	defer C.free(unsafe.Pointer(cStorageConfig.access_key_value))
	defer C.free(unsafe.Pointer(cStorageConfig.root_path))
	defer C.free(unsafe.Pointer(cStorageConfig.storage_type))
	defer C.free(unsafe.Pointer(cStorageConfig.cloud_provider))
	defer C.free(unsafe.Pointer(cStorageConfig.iam_endpoint))
	defer C.free(unsafe.Pointer(cStorageConfig.log_level))
	defer C.free(unsafe.Pointer(cStorageConfig.sslCACert))
	defer C.free(unsafe.Pointer(cStorageConfig.region))
	defer C.free(unsafe.Pointer(cStorageConfig.gcp_credential_json))

	cNeededColumn := make([]*C.char, len(neededColumns))
	for i, columnName := range neededColumns {
		cNeededColumn[i] = C.CString(columnName)
		defer C.free(unsafe.Pointer(cNeededColumn[i]))
	}
	cNeededColumnArray := (**C.char)(unsafe.Pointer(&cNeededColumn[0]))
	cNumColumns := C.int64_t(len(neededColumns))

	var cPackedReader C.CFFIPackedReader
	status := C.NewPackedFFIReaderWithManifest(cManifest, cSchema, cNeededColumnArray, cNumColumns, &cPackedReader, cStorageConfig, pluginContextPtr)
	if err := ConsumeCStatusIntoError(&status); err != nil {
		return nil, err
	}

	// Get the ArrowArrayStream.
	// NOTE(review): 8196 is hard-coded and bufferSize is ignored; the meaning
	// of this argument on the C side is not visible here — confirm whether
	// 8192 or a caller-provided value was intended.
	var cStream cdata.CArrowArrayStream
	status = C.GetFFIReaderStream(cPackedReader, C.int64_t(8196), (*C.struct_ArrowArrayStream)(unsafe.Pointer(&cStream)))
	if err := ConsumeCStatusIntoError(&status); err != nil {
		// The native reader was created successfully, so close it before bailing out.
		C.CloseFFIReader(cPackedReader)
		return nil, fmt.Errorf("failed to get reader stream: %w", err)
	}

	// Import the stream as a RecordReader.
	recordReader, err := cdata.ImportCRecordReader(&cStream, schema)
	if err != nil {
		C.CloseFFIReader(cPackedReader)
		return nil, fmt.Errorf("failed to import record reader: %w", err)
	}

	return &FFIPackedReader{
		cPackedReader: cPackedReader,
		recordReader:  recordReader,
		schema:        schema,
	}, nil
}
// ReadNext pulls the next record batch from the underlying record stream.
//
// It returns io.EOF when the reader has no stream (for example after Close)
// or when the stream is exhausted. Any other read failure is wrapped with
// context and returned.
func (r *FFIPackedReader) ReadNext() (arrow.Record, error) {
	if r.recordReader == nil {
		return nil, io.EOF
	}

	// Per the original author's note, the stream reader releases the
	// previously returned batch itself, so no manual release happens here.
	batch, readErr := r.recordReader.Read()
	switch {
	case readErr == nil:
		return batch, nil
	case readErr == io.EOF:
		return nil, io.EOF
	default:
		return nil, fmt.Errorf("failed to read next record: %w", readErr)
	}
}
// Close drops the record stream reference and closes the native FFI reader.
//
// Close is idempotent: the native handle is reset to its zero value after the
// first call, so a second Close (or a Close followed by Release) returns nil
// instead of passing an already-closed handle to C.CloseFFIReader again.
// It is not safe for concurrent use.
func (r *FFIPackedReader) Close() error {
	// Per the original author's note, the stream reader handles the current
	// batch itself, so only the reference is dropped here.
	r.recordReader = nil

	// The zero value of the handle type marks "never opened or already closed".
	var closedHandle C.CFFIPackedReader
	if r.cPackedReader == closedHandle {
		return nil
	}

	// Clear the field before calling into C so the handle is never reused,
	// even if the native close reports an error.
	handle := r.cPackedReader
	r.cPackedReader = closedHandle

	status := C.CloseFFIReader(handle)
	return ConsumeCStatusIntoError(&status)
}
// Schema reports the Arrow schema stored on this reader.
func (r *FFIPackedReader) Schema() *arrow.Schema {
	schema := r.schema
	return schema
}
// Retain is currently a no-op: it does not change any reference count.
// The forwarding call to the underlying record reader is kept below,
// commented out, for reference.
func (r *FFIPackedReader) Retain() {
	// if r.recordReader != nil {
	// r.recordReader.Retain()
	// }
}
// Release closes the reader. There is no reference counting here: a single
// Release call closes the underlying native reader via Close.
//
// Release has no return value, so a Close failure is logged rather than
// silently discarded.
func (r *FFIPackedReader) Release() {
	if err := r.Close(); err != nil {
		log.Warn("failed to close FFI packed reader on release", zap.Error(err))
	}
}
// GetManifest resolves a manifest path to the manifest content of the latest
// column groups stored under its base path.
//
// manifestPath is split into a base path and a version by
// UnmarshalManfestPath. storageConfig is converted into native properties
// for the lookup. Any failure from parsing, property construction, or the
// native call is returned unchanged together with an empty manifest.
//
// NOTE(review): the parsed version is only logged; the lookup asks for the
// latest column groups — confirm this is intended.
// NOTE(review): ownership of the returned C string and of the properties
// handle is not visible here — confirm whether they must be freed.
func GetManifest(manifestPath string, storageConfig *indexpb.StorageConfig) (manifest string, err error) {
	basePath, version, err := UnmarshalManfestPath(manifestPath)
	if err != nil {
		return "", err
	}
	log.Info("GetManifest", zap.String("manifestPath", manifestPath), zap.String("basePath", basePath), zap.Int64("version", version))

	props, err := MakePropertiesFromStorageConfig(storageConfig, nil)
	if err != nil {
		return "", err
	}

	basePathC := C.CString(basePath)
	defer C.free(unsafe.Pointer(basePathC))

	var (
		manifestC     *C.char
		latestVersion C.int64_t
	)
	ffiResult := C.get_latest_column_groups(basePathC, props, &manifestC, &latestVersion)
	if ffiErr := HandleFFIResult(ffiResult); ffiErr != nil {
		return "", ffiErr
	}

	return C.GoString(manifestC), nil
}
// Compile-time check that FFIPackedReader implements the array.RecordReader interface (currently disabled):
// var _ array.RecordReader = (*FFIPackedReader)(nil)