congqixia c01fd94a6a
enhance: integrate Storage V2 FFI interface for unified storage access (#45723)
Related #44956
This commit integrates the Storage V2 FFI (Foreign Function Interface)
interface throughout the Milvus codebase, enabling unified storage
access through the Loon FFI layer. This is a significant step towards
standardizing storage operations across different storage versions.

1. Configuration Support
- **configs/milvus.yaml**: Added `useLoonFFI` configuration flag under
`common.storage.file.splitByAvgSize` section
- Allows runtime toggle between traditional binlog readers and new
FFI-based manifest readers
  - Default: `false` (maintains backward compatibility)

2. Core FFI Infrastructure

Enhanced Utilities (internal/core/src/storage/loon_ffi/util.cpp/h)
- **ToCStorageConfig()**: Converts Go's `StorageConfig` to C's
`CStorageConfig` struct for FFI calls
- **GetManifest()**: Parses manifest JSON and retrieves latest column
groups using FFI
  - Accepts manifest path with `base_path` and `ver` fields
  - Calls `get_latest_column_groups()` FFI function
  - Returns column group information as string
  - Comprehensive error handling for JSON parsing and FFI errors

3. Dependency Updates
- **internal/core/thirdparty/milvus-storage/CMakeLists.txt**:
  - Updated milvus-storage version from `0883026` to `302143c`
  - Ensures compatibility with latest FFI interfaces

4. Data Coordinator Changes

All compaction task builders now include manifest path in segment
binlogs:

- **compaction_task_clustering.go**: Added `Manifest:
segInfo.GetManifestPath()` to segment binlogs
- **compaction_task_l0.go**: Added manifest path to both L0 segment
selection and compaction plan building
- **compaction_task_mix.go**: Added manifest path to mixed compaction
segment binlogs
- **meta.go**: Updated metadata completion logic:
- `completeClusterCompactionMutation()`: Set `ManifestPath` in new
segment info
- `completeMixCompactionMutation()`: Preserve manifest path in compacted
segments
- `completeSortCompactionMutation()`: Include manifest path in sorted
segments

5. Data Node Compactor Enhancements

All compactors updated to support dual-mode reading (binlog vs
manifest):

6. Flush & Sync Manager Updates

Pack Writer V2 (pack_writer_v2.go)
- **BulkPackWriterV2.Write()**: Extended return signature to include
`manifest string`
- Implementation:
  - Generate manifest path: `path.Join(pack.segmentID, "manifest.json")`
  - Write packed data using FFI-based writer
  - Return manifest path along with binlogs, deltas, and stats

Task Handling (task.go)
- Updated all sync task result handling to accommodate new manifest
return value
- Ensured backward compatibility for callers not using manifest

7. Go Storage Layer Integration

New Interfaces and Implementations
- **record_reader.go**: Interface for unified record reading across
storage versions
- **record_writer.go**: Interface for unified record writing across
storage versions
- **binlog_record_writer.go**: Concrete implementation for traditional
binlog-based writing

Enhanced Schema Support (schema.go, schema_test.go)
- Schema conversion utilities to support FFI-based storage operations
- Ensures proper Arrow schema mapping for V2 storage

Serialization Updates
- **serde.go, serde_events.go, serde_events_v2.go**: Updated to work
with new reader/writer interfaces
- Test files updated to validate dual-mode serialization

8. Storage V2 Packed Format

FFI Common (storagev2/packed/ffi_common.go)
- Common FFI utilities and type conversions for packed storage format

Packed Writer FFI (storagev2/packed/packed_writer_ffi.go)
- FFI-based implementation of packed writer
- Integrates with Loon storage layer for efficient columnar writes

Packed Reader FFI (storagev2/packed/packed_reader_ffi.go)
- Already existed, now complemented by writer implementation

9. Protocol Buffer Updates

data_coord.proto & datapb/data_coord.pb.go
- Added `manifest` field to compaction segment messages
- Enables passing manifest metadata through compaction pipeline

worker.proto & workerpb/worker.pb.go
- Added compaction parameter for `useLoonFFI` flag
- Allows workers to receive FFI configuration from coordinator

10. Parameter Configuration

component_param.go
- Added `UseLoonFFI` parameter to compaction configuration
- Reads from `common.storage.file.useLoonFFI` config path
- Default: `false` for safe rollout

11. Test Updates
- **clustering_compactor_storage_v2_test.go**: Updated signatures to
handle manifest return value
- **mix_compactor_storage_v2_test.go**: Updated test helpers for
manifest support
- **namespace_compactor_test.go**: Adjusted writer calls to expect
manifest
- **pack_writer_v2_test.go**: Validated manifest generation in pack
writing

This integration follows a **dual-mode approach**:
1. **Legacy Path**: Traditional binlog-based reading/writing (when
`useLoonFFI=false` or no manifest)
2. **FFI Path**: Manifest-based reading/writing through Loon FFI (when
`useLoonFFI=true` and manifest exists)

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
2025-11-24 19:57:07 +08:00

216 lines
6.2 KiB
Go

package packed
/*
#cgo pkg-config: milvus_core milvus-storage
#include <stdlib.h>
#include "milvus-storage/ffi_c.h"
#include "arrow/c/abi.h"
#include "arrow/c/helpers.h"
*/
import "C"
import (
"encoding/json"
"fmt"
"strconv"
"unsafe"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
)
// Property keys - matching milvus-storage/properties.h
const (
PropertyFSAddress = "fs.address"
PropertyFSBucketName = "fs.bucket_name"
PropertyFSAccessKeyID = "fs.access_key_id"
PropertyFSAccessKeyValue = "fs.access_key_value"
PropertyFSRootPath = "fs.root_path"
PropertyFSStorageType = "fs.storage_type"
PropertyFSCloudProvider = "fs.cloud_provider"
PropertyFSIAMEndpoint = "fs.iam_endpoint"
PropertyFSLogLevel = "fs.log_level"
PropertyFSRegion = "fs.region"
PropertyFSUseSSL = "fs.use_ssl"
PropertyFSSSLCACert = "fs.ssl_ca_cert"
PropertyFSUseIAM = "fs.use_iam"
PropertyFSUseVirtualHost = "fs.use_virtual_host"
PropertyFSRequestTimeoutMS = "fs.request_timeout_ms"
PropertyFSGCPCredentialJSON = "fs.gcp_credential_json"
PropertyFSUseCustomPartUpload = "fs.use_custom_part_upload"
PropertyWriterPolicy = "writer.policy"
PropertyWriterSchemaBasedPattern = "writer.split.schema_based.patterns"
)
// MakePropertiesFromStorageConfig creates a Properties object from StorageConfig
// This function converts a StorageConfig structure into a Properties object by
// calling the FFI properties_create function. All configuration fields from
// StorageConfig are mapped to corresponding key-value pairs in Properties.
func MakePropertiesFromStorageConfig(storageConfig *indexpb.StorageConfig, extraKVs map[string]string) (*C.Properties, error) {
if storageConfig == nil {
return nil, fmt.Errorf("storageConfig is required")
}
// Prepare key-value pairs from StorageConfig
var keys []string
var values []string
// Add non-empty string fields
if storageConfig.GetAddress() != "" {
keys = append(keys, PropertyFSAddress)
values = append(values, storageConfig.GetAddress())
}
if storageConfig.GetBucketName() != "" {
keys = append(keys, PropertyFSBucketName)
values = append(values, storageConfig.GetBucketName())
}
if storageConfig.GetAccessKeyID() != "" {
keys = append(keys, PropertyFSAccessKeyID)
values = append(values, storageConfig.GetAccessKeyID())
}
if storageConfig.GetSecretAccessKey() != "" {
keys = append(keys, PropertyFSAccessKeyValue)
values = append(values, storageConfig.GetSecretAccessKey())
}
if storageConfig.GetRootPath() != "" {
keys = append(keys, PropertyFSRootPath)
values = append(values, storageConfig.GetRootPath())
}
if storageConfig.GetStorageType() != "" {
keys = append(keys, PropertyFSStorageType)
values = append(values, storageConfig.GetStorageType())
}
if storageConfig.GetCloudProvider() != "" {
keys = append(keys, PropertyFSCloudProvider)
values = append(values, storageConfig.GetCloudProvider())
}
if storageConfig.GetIAMEndpoint() != "" {
keys = append(keys, PropertyFSIAMEndpoint)
values = append(values, storageConfig.GetIAMEndpoint())
}
// Always add log level if any string field is set (matching C++ behavior)
keys = append(keys, PropertyFSLogLevel)
values = append(values, "Warn")
if storageConfig.GetRegion() != "" {
keys = append(keys, PropertyFSRegion)
values = append(values, storageConfig.GetRegion())
}
if storageConfig.GetSslCACert() != "" {
keys = append(keys, PropertyFSSSLCACert)
values = append(values, storageConfig.GetSslCACert())
}
if storageConfig.GetGcpCredentialJSON() != "" {
keys = append(keys, PropertyFSGCPCredentialJSON)
values = append(values, storageConfig.GetGcpCredentialJSON())
}
// Add boolean fields
keys = append(keys, PropertyFSUseSSL)
if storageConfig.GetUseSSL() {
values = append(values, "true")
} else {
values = append(values, "false")
}
keys = append(keys, PropertyFSUseIAM)
if storageConfig.GetUseIAM() {
values = append(values, "true")
} else {
values = append(values, "false")
}
keys = append(keys, PropertyFSUseVirtualHost)
if storageConfig.GetUseVirtualHost() {
values = append(values, "true")
} else {
values = append(values, "false")
}
keys = append(keys, PropertyFSUseCustomPartUpload)
values = append(values, "true") // hardcoded to true as in the original code
// Add integer field
keys = append(keys, PropertyFSRequestTimeoutMS)
values = append(values, strconv.FormatInt(storageConfig.GetRequestTimeoutMs(), 10))
// Add extra kvs
for k, v := range extraKVs {
keys = append(keys, k)
values = append(values, v)
}
// Convert to C arrays
cKeys := make([]*C.char, len(keys))
cValues := make([]*C.char, len(values))
for i := range keys {
cKeys[i] = C.CString(keys[i])
cValues[i] = C.CString(values[i])
}
// Defer cleanup of all C strings
defer func() {
for i := range cKeys {
C.free(unsafe.Pointer(cKeys[i]))
C.free(unsafe.Pointer(cValues[i]))
}
}()
// Create Properties using FFI
properties := &C.Properties{}
var cKeysPtr **C.char
var cValuesPtr **C.char
if len(cKeys) > 0 {
cKeysPtr = &cKeys[0]
cValuesPtr = &cValues[0]
}
result := C.properties_create(
(**C.char)(unsafe.Pointer(cKeysPtr)),
(**C.char)(unsafe.Pointer(cValuesPtr)),
C.size_t(len(keys)),
properties,
)
err := HandleFFIResult(result)
if err != nil {
return nil, err
}
return properties, nil
}
func HandleFFIResult(ffiResult C.FFIResult) error {
defer C.FreeFFIResult(&ffiResult)
if C.IsSuccess(&ffiResult) == 0 {
errMsg := C.GetErrorMessage(&ffiResult)
errStr := "Unknown error"
if errMsg != nil {
errStr = C.GoString(errMsg)
}
return fmt.Errorf("failed to create properties: %s", errStr)
}
return nil
}
type ManifestJSON struct {
ManifestVersion int64 `json:"ver"`
BasePath string `json:"base_path"`
}
func MarshalManifestPath(basePath string, version int64) string {
bs, _ := json.Marshal(ManifestJSON{
ManifestVersion: version,
BasePath: basePath,
})
return string(bs)
}
func UnmarshalManfestPath(manifestPath string) (string, int64, error) {
var manifestJSON ManifestJSON
err := json.Unmarshal([]byte(manifestPath), &manifestJSON)
if err != nil {
return "", 0, err
}
return manifestJSON.BasePath, manifestJSON.ManifestVersion, nil
}