milvus/internal/storage/record_reader.go
congqixia c01fd94a6a
enhance: integrate Storage V2 FFI interface for unified storage access (#45723)
Related #44956
This commit integrates the Storage V2 FFI (Foreign Function Interface)
interface throughout the Milvus codebase, enabling unified storage
access through the Loon FFI layer. This is a significant step towards
standardizing storage operations across different storage versions.

1. Configuration Support
- **configs/milvus.yaml**: Added `useLoonFFI` configuration flag under the `common.storage.file.splitByAvgSize` section
  - Allows runtime toggling between traditional binlog readers and the new FFI-based manifest readers
  - Default: `false` (maintains backward compatibility)
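Based on the config path described below (`common.storage.file.useLoonFFI`), the flag would sit in `milvus.yaml` roughly as follows. This is an illustrative sketch only; the exact nesting in the shipped file may differ:

```yaml
common:
  storage:
    file:
      # false: traditional binlog readers (default, backward compatible)
      # true:  FFI-based manifest readers through the Loon layer
      useLoonFFI: false
```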

2. Core FFI Infrastructure

Enhanced Utilities (internal/core/src/storage/loon_ffi/util.cpp/h)
- **ToCStorageConfig()**: Converts Go's `StorageConfig` to C's
`CStorageConfig` struct for FFI calls
- **GetManifest()**: Parses the manifest JSON and retrieves the latest column groups via FFI
  - Accepts a manifest path with `base_path` and `ver` fields
  - Calls the `get_latest_column_groups()` FFI function
  - Returns column group information as a string
  - Comprehensive error handling for JSON parsing and FFI errors
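The manifest JSON described above can be sketched on the Go side with the standard `encoding/json` package. The `manifestRef` struct and `parseManifestRef` helper are illustrative stand-ins (only the `base_path` and `ver` field names come from the description; the real parsing happens in C++):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// manifestRef mirrors the two fields GetManifest() is described as expecting
// in the manifest JSON; the struct itself is illustrative.
type manifestRef struct {
	BasePath string `json:"base_path"`
	Ver      int64  `json:"ver"`
}

// parseManifestRef decodes a manifest reference of the shape described above.
func parseManifestRef(raw string) (manifestRef, error) {
	var ref manifestRef
	err := json.Unmarshal([]byte(raw), &ref)
	return ref, err
}

func main() {
	ref, err := parseManifestRef(`{"base_path": "files/insert_log/1/2/3", "ver": 7}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(ref.BasePath, ref.Ver)
}
```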

3. Dependency Updates
- **internal/core/thirdparty/milvus-storage/CMakeLists.txt**:
  - Updated milvus-storage version from `0883026` to `302143c`
  - Ensures compatibility with latest FFI interfaces

4. Data Coordinator Changes

All compaction task builders now include manifest path in segment
binlogs:

- **compaction_task_clustering.go**: Added `Manifest:
segInfo.GetManifestPath()` to segment binlogs
- **compaction_task_l0.go**: Added manifest path to both L0 segment
selection and compaction plan building
- **compaction_task_mix.go**: Added manifest path to mixed compaction
segment binlogs
- **meta.go**: Updated metadata completion logic:
  - `completeClusterCompactionMutation()`: sets `ManifestPath` in the new segment info
  - `completeMixCompactionMutation()`: preserves the manifest path in compacted segments
  - `completeSortCompactionMutation()`: includes the manifest path in sorted segments

5. Data Node Compactor Enhancements

All compactors were updated to support dual-mode reading (binlog vs. manifest).

6. Flush & Sync Manager Updates

Pack Writer V2 (pack_writer_v2.go)
- **BulkPackWriterV2.Write()**: Extended the return signature to include `manifest string`
- Implementation:
  - Generates the manifest path: `path.Join(pack.segmentID, "manifest.json")`
  - Writes packed data using the FFI-based writer
  - Returns the manifest path along with binlogs, deltas, and stats
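The manifest path generation step can be sketched in isolation. Since `path.Join` operates on strings, the `int64`-to-string conversion below is an assumption about how the real writer formats the segment ID:

```go
package main

import (
	"fmt"
	"path"
	"strconv"
)

// manifestPathFor builds the per-segment manifest location in the shape
// described above: <segmentID>/manifest.json. The helper name is invented.
func manifestPathFor(segmentID int64) string {
	return path.Join(strconv.FormatInt(segmentID, 10), "manifest.json")
}

func main() {
	fmt.Println(manifestPathFor(449)) // 449/manifest.json
}
```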

Task Handling (task.go)
- Updated all sync task result handling to accommodate new manifest
return value
- Ensured backward compatibility for callers not using manifest

7. Go Storage Layer Integration

New Interfaces and Implementations
- **record_reader.go**: Interface for unified record reading across
storage versions
- **record_writer.go**: Interface for unified record writing across
storage versions
- **binlog_record_writer.go**: Concrete implementation for traditional
binlog-based writing

Enhanced Schema Support (schema.go, schema_test.go)
- Schema conversion utilities to support FFI-based storage operations
- Ensures proper Arrow schema mapping for V2 storage
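The schema mapping that the V2 record readers rely on reduces to assigning each field a column index in schema order (the `field2Col` maps in `record_reader.go` below). A minimal sketch with a stand-in field type, since the real code uses `schemapb.FieldSchema`:

```go
package main

import "fmt"

// fieldSchema is a stand-in for schemapb.FieldSchema; only the ID matters here.
type fieldSchema struct {
	FieldID int64
	Name    string
}

// buildField2Col assigns each field its column index in schema order,
// mirroring the field2Col maps built by the V2 record readers.
func buildField2Col(fields []fieldSchema) map[int64]int {
	field2Col := make(map[int64]int, len(fields))
	for i, f := range fields {
		field2Col[f.FieldID] = i
	}
	return field2Col
}

func main() {
	fields := []fieldSchema{{100, "pk"}, {101, "vec"}, {102, "ts"}}
	fmt.Println(buildField2Col(fields)[101]) // vec maps to column 1
}
```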

Serialization Updates
- **serde.go, serde_events.go, serde_events_v2.go**: Updated to work
with new reader/writer interfaces
- Test files updated to validate dual-mode serialization

8. Storage V2 Packed Format

FFI Common (storagev2/packed/ffi_common.go)
- Common FFI utilities and type conversions for packed storage format

Packed Writer FFI (storagev2/packed/packed_writer_ffi.go)
- FFI-based implementation of packed writer
- Integrates with Loon storage layer for efficient columnar writes

Packed Reader FFI (storagev2/packed/packed_reader_ffi.go)
- Already existed; it is now complemented by the writer implementation

9. Protocol Buffer Updates

data_coord.proto & datapb/data_coord.pb.go
- Added `manifest` field to compaction segment messages
- Enables passing manifest metadata through compaction pipeline

worker.proto & workerpb/worker.pb.go
- Added compaction parameter for `useLoonFFI` flag
- Allows workers to receive FFI configuration from coordinator

10. Parameter Configuration

component_param.go
- Added `UseLoonFFI` parameter to compaction configuration
- Reads from `common.storage.file.useLoonFFI` config path
- Default: `false` for safe rollout

11. Test Updates
- **clustering_compactor_storage_v2_test.go**: Updated signatures to
handle manifest return value
- **mix_compactor_storage_v2_test.go**: Updated test helpers for
manifest support
- **namespace_compactor_test.go**: Adjusted writer calls to expect
manifest
- **pack_writer_v2_test.go**: Validated manifest generation in pack
writing

This integration follows a **dual-mode approach**:
1. **Legacy Path**: Traditional binlog-based reading/writing (when
`useLoonFFI=false` or no manifest)
2. **FFI Path**: Manifest-based reading/writing through Loon FFI (when
`useLoonFFI=true` and manifest exists)
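The dual-mode selection above reduces to a simple predicate: the FFI path is taken only when the flag is on AND a manifest exists. A hedged sketch (the helper name is invented, not from the Milvus source):

```go
package main

import "fmt"

// useManifestPath decides whether to take the FFI path, following the
// dual-mode rule above: useLoonFFI must be true and a manifest must exist.
func useManifestPath(useLoonFFI bool, manifest string) bool {
	return useLoonFFI && manifest != ""
}

func main() {
	fmt.Println(useManifestPath(true, "449/manifest.json")) // FFI path
	fmt.Println(useManifestPath(true, ""))                  // legacy: no manifest
	fmt.Println(useManifestPath(false, "449/manifest.json")) // legacy: flag off
}
```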

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
2025-11-24 19:57:07 +08:00


package storage

import (
	"fmt"
	"io"

	"github.com/apache/arrow/go/v17/arrow"
	"github.com/apache/arrow/go/v17/arrow/array"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/storagev2/packed"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/proto/indexcgopb"
	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// RecordReader is the unified interface for reading records across storage versions.
type RecordReader interface {
	Next() (Record, error)
	Close() error
}
// packedRecordReader reads records from a packed (Storage V2) reader.
type packedRecordReader struct {
	reader    *packed.PackedReader
	field2Col map[FieldID]int
}

var _ RecordReader = (*packedRecordReader)(nil)

func (pr *packedRecordReader) Next() (Record, error) {
	rec, err := pr.reader.ReadNext()
	if err != nil {
		return nil, err
	}
	return NewSimpleArrowRecord(rec, pr.field2Col), nil
}

func (pr *packedRecordReader) Close() error {
	if pr.reader != nil {
		return pr.reader.Close()
	}
	return nil
}
func newPackedRecordReader(
	paths []string,
	schema *schemapb.CollectionSchema,
	bufferSize int64,
	storageConfig *indexpb.StorageConfig,
	storagePluginContext *indexcgopb.StoragePluginContext,
) (*packedRecordReader, error) {
	arrowSchema, err := ConvertToArrowSchema(schema, true)
	if err != nil {
		return nil, merr.WrapErrParameterInvalid("convert collection schema [%s] to arrow schema error: %s", schema.Name, err.Error())
	}
	field2Col := make(map[FieldID]int)
	allFields := typeutil.GetAllFieldSchemas(schema)
	for i, field := range allFields {
		field2Col[field.FieldID] = i
	}
	reader, err := packed.NewPackedReader(paths, arrowSchema, bufferSize, storageConfig, storagePluginContext)
	if err != nil {
		return nil, err
	}
	return &packedRecordReader{
		reader:    reader,
		field2Col: field2Col,
	}, nil
}

// NewRecordReaderFromManifest creates a RecordReader backed by a manifest-based ManifestReader.
func NewRecordReaderFromManifest(manifest string,
	schema *schemapb.CollectionSchema,
	bufferSize int64,
	storageConfig *indexpb.StorageConfig,
	storagePluginContext *indexcgopb.StoragePluginContext,
) (RecordReader, error) {
	return NewManifestReader(manifest, schema, bufferSize, storageConfig, storagePluginContext)
}
var _ RecordReader = (*IterativeRecordReader)(nil)

// IterativeRecordReader chains a sequence of RecordReaders, advancing to the
// next reader produced by iterate() whenever the current one is exhausted.
type IterativeRecordReader struct {
	cur     RecordReader
	iterate func() (RecordReader, error)
}

// Close implements RecordReader.
func (ir *IterativeRecordReader) Close() error {
	if ir.cur != nil {
		return ir.cur.Close()
	}
	return nil
}

// Next implements RecordReader.
func (ir *IterativeRecordReader) Next() (Record, error) {
	if ir.cur == nil {
		r, err := ir.iterate()
		if err != nil {
			return nil, err
		}
		ir.cur = r
	}
	rec, err := ir.cur.Next()
	if err == io.EOF {
		closeErr := ir.cur.Close()
		if closeErr != nil {
			return nil, closeErr
		}
		ir.cur, err = ir.iterate()
		if err != nil {
			return nil, err
		}
		rec, err = ir.cur.Next()
	}
	return rec, err
}
func newIterativePackedRecordReader(
	paths [][]string,
	schema *schemapb.CollectionSchema,
	bufferSize int64,
	storageConfig *indexpb.StorageConfig,
	storagePluginContext *indexcgopb.StoragePluginContext,
) *IterativeRecordReader {
	chunk := 0
	return &IterativeRecordReader{
		iterate: func() (RecordReader, error) {
			if chunk >= len(paths) {
				return nil, io.EOF
			}
			currentPaths := paths[chunk]
			chunk++
			return newPackedRecordReader(currentPaths, schema, bufferSize, storageConfig, storagePluginContext)
		},
	}
}
// ManifestReader reads records through the Loon FFI layer using a storage
// manifest instead of individual binlog paths.
type ManifestReader struct {
	fieldBinlogs         []*datapb.FieldBinlog
	manifest             string
	reader               *packed.FFIPackedReader
	bufferSize           int64
	arrowSchema          *arrow.Schema
	schema               *schemapb.CollectionSchema
	schemaHelper         *typeutil.SchemaHelper
	field2Col            map[FieldID]int
	storageConfig        *indexpb.StorageConfig
	storagePluginContext *indexcgopb.StoragePluginContext
	neededColumns        []string
}
// NewManifestReaderFromBinlogs creates a ManifestReader from binlogs
func NewManifestReaderFromBinlogs(fieldBinlogs []*datapb.FieldBinlog,
	schema *schemapb.CollectionSchema,
	bufferSize int64,
	storageConfig *indexpb.StorageConfig,
	storagePluginContext *indexcgopb.StoragePluginContext,
) (*ManifestReader, error) {
	arrowSchema, err := ConvertToArrowSchema(schema, false)
	if err != nil {
		return nil, merr.WrapErrParameterInvalid("convert collection schema [%s] to arrow schema error: %s", schema.Name, err.Error())
	}
	schemaHelper, err := typeutil.CreateSchemaHelper(schema)
	if err != nil {
		return nil, err
	}
	field2Col := make(map[FieldID]int)
	allFields := typeutil.GetAllFieldSchemas(schema)
	neededColumns := make([]string, 0, len(allFields))
	for i, field := range allFields {
		field2Col[field.FieldID] = i
		neededColumns = append(neededColumns, field.Name)
	}
	prr := &ManifestReader{
		fieldBinlogs:         fieldBinlogs,
		bufferSize:           bufferSize,
		arrowSchema:          arrowSchema,
		schema:               schema,
		schemaHelper:         schemaHelper,
		field2Col:            field2Col,
		storageConfig:        storageConfig,
		storagePluginContext: storagePluginContext,
		neededColumns:        neededColumns,
	}
	err = prr.init()
	if err != nil {
		return nil, err
	}
	return prr, nil
}
// NewManifestReader creates a ManifestReader from a manifest path.
func NewManifestReader(manifest string,
	schema *schemapb.CollectionSchema,
	bufferSize int64,
	storageConfig *indexpb.StorageConfig,
	storagePluginContext *indexcgopb.StoragePluginContext,
) (*ManifestReader, error) {
	arrowSchema, err := ConvertToArrowSchema(schema, true)
	if err != nil {
		return nil, merr.WrapErrParameterInvalid("convert collection schema [%s] to arrow schema error: %s", schema.Name, err.Error())
	}
	schemaHelper, err := typeutil.CreateSchemaHelper(schema)
	if err != nil {
		return nil, err
	}
	field2Col := make(map[FieldID]int)
	allFields := typeutil.GetAllFieldSchemas(schema)
	neededColumns := make([]string, 0, len(allFields))
	for i, field := range allFields {
		field2Col[field.FieldID] = i
		neededColumns = append(neededColumns, field.Name)
	}
	prr := &ManifestReader{
		manifest:             manifest,
		bufferSize:           bufferSize,
		arrowSchema:          arrowSchema,
		schema:               schema,
		schemaHelper:         schemaHelper,
		field2Col:            field2Col,
		storageConfig:        storageConfig,
		storagePluginContext: storagePluginContext,
		neededColumns:        neededColumns,
	}
	err = prr.init()
	if err != nil {
		return nil, err
	}
	return prr, nil
}
func (mr *ManifestReader) init() error {
	// TODO: add needed-column option
	manifest, err := packed.GetManifest(mr.manifest, mr.storageConfig)
	if err != nil {
		return err
	}
	reader, err := packed.NewFFIPackedReader(manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext)
	if err != nil {
		return err
	}
	mr.reader = reader
	return nil
}

func (mr *ManifestReader) Next() (Record, error) {
	rec, err := mr.reader.ReadNext()
	if err != nil {
		return nil, err
	}
	return NewSimpleArrowRecord(rec, mr.field2Col), nil
}

func (mr *ManifestReader) Close() error {
	if mr.reader != nil {
		return mr.reader.Close()
	}
	return nil
}
// ChunkedBlobsReader returns a chunk composed of blobs, or io.EOF if no more data
type ChunkedBlobsReader func() ([]*Blob, error)

// CompositeBinlogRecordReader reads per-field binlog streams and composes
// their columns into unified records.
type CompositeBinlogRecordReader struct {
	fields map[FieldID]*schemapb.FieldSchema
	index  map[FieldID]int16
	brs    []*BinlogReader
	rrs    []array.RecordReader
}

var _ RecordReader = (*CompositeBinlogRecordReader)(nil)
func (crr *CompositeBinlogRecordReader) Next() (Record, error) {
	recs := make([]arrow.Array, len(crr.fields))
	nonExistingFields := make([]*schemapb.FieldSchema, 0)
	nRows := 0
	for _, f := range crr.fields {
		idx := crr.index[f.FieldID]
		if crr.rrs[idx] != nil {
			if ok := crr.rrs[idx].Next(); !ok {
				return nil, io.EOF
			}
			r := crr.rrs[idx].Record()
			recs[idx] = r.Column(0)
			if nRows == 0 {
				nRows = int(r.NumRows())
			}
			if nRows != int(r.NumRows()) {
				return nil, merr.WrapErrServiceInternal(fmt.Sprintf("number of rows mismatch for field %d", f.FieldID))
			}
		} else {
			nonExistingFields = append(nonExistingFields, f)
		}
	}
	for _, f := range nonExistingFields {
		// If the field is not in the current batch, fill with a null array
		arr, err := GenerateEmptyArrayFromSchema(f, nRows)
		if err != nil {
			return nil, err
		}
		recs[crr.index[f.FieldID]] = arr
	}
	return &compositeRecord{
		index: crr.index,
		recs:  recs,
	}, nil
}
func (crr *CompositeBinlogRecordReader) Close() error {
	if crr.brs != nil {
		for _, er := range crr.brs {
			if er != nil {
				er.Close()
			}
		}
	}
	if crr.rrs != nil {
		for _, rr := range crr.rrs {
			if rr != nil {
				rr.Release()
			}
		}
	}
	return nil
}