milvus/internal/storage/record_reader.go
congqixia f94b04e642
feat: [2.6] integrate Loon FFI for manifest-based segment loading and index building (#46076)
Cherry-pick from master
pr: #45061 #45488 #45803 #46017 #44991 #45132 #45723 #45726 #45798 #45897 #45918 #44998

This feature integrates the Storage V2 (Loon) FFI interface as a unified
storage layer for segment loading and index building in Milvus. It enables
manifest-based data access, replacing the traditional binlog-based approach
with a more efficient columnar storage format.

Key changes:

### Segment Self-Managed Loading Architecture
- Move segment loading orchestration from Go layer to C++ segcore
- Add NewSegmentWithLoadInfo() API for passing load info during segment creation
- Implement SetLoadInfo() and Load() methods in SegmentInterface
- Support parallel loading of indexed and non-indexed fields
- Enable both sealed and growing segments to self-manage loading

### Storage V2 FFI Integration
- Integrate milvus-storage library's FFI interface for packed columnar data
- Add manifest path support throughout the data path (SegmentInfo, LoadInfo)
- Implement ManifestReader for generating manifests from binlogs
- Support zero-copy data exchange using Arrow C Data Interface
- Add ToCStorageConfig() for Go-to-C storage config conversion

### Manifest-Based Index Building
- Extend FileManagerContext to carry loon_ffi_properties
- Implement GetFieldDatasFromManifest() using Arrow C Stream interface
- Support manifest-based reading in DiskFileManagerImpl and MemFileManagerImpl
- Add fallback to traditional segment insert files when the manifest is unavailable
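
The fallback in the last bullet can be pictured with a small Go sketch. This is not the file manager code from this change (which lives in C++); `loadPlan` and `planFieldDataRead` are hypothetical names, and the sketch assumes that "manifest unavailable" means an empty manifest path.

```go
package main

import "fmt"

// loadPlan is a hypothetical description of where field data is read from.
// It is not a Milvus type.
type loadPlan struct {
	Source string   // "manifest" or "insert_files"
	Paths  []string // the manifest path, or the insert file paths
}

// planFieldDataRead prefers the manifest and falls back to the traditional
// segment insert files.
// Assumption: an empty manifestPath means the manifest is unavailable.
func planFieldDataRead(manifestPath string, insertFiles []string) (loadPlan, error) {
	if manifestPath != "" {
		return loadPlan{Source: "manifest", Paths: []string{manifestPath}}, nil
	}
	if len(insertFiles) == 0 {
		return loadPlan{}, fmt.Errorf("no manifest and no insert files to read from")
	}
	return loadPlan{Source: "insert_files", Paths: insertFiles}, nil
}

func main() {
	// No manifest recorded for this segment, so the insert files are used.
	plan, err := planFieldDataRead("", []string{"insert_log/1", "insert_log/2"})
	if err != nil {
		panic(err)
	}
	fmt.Println(plan.Source, len(plan.Paths))
}
```

Keeping the decision in one function means callers never need to know which storage layout a segment uses.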

### Compaction Pipeline Updates
- Include manifest path in all compaction task builders (clustering, L0, mix)
- Update BulkPackWriterV2 to return manifest path
- Propagate manifest metadata through compaction pipeline

### Configuration & Protocol
- Add common.storageV2.useLoonFFI config option (default: false)
- Add manifest_path field to SegmentLoadInfo and related proto messages
- Add manifest field to compaction segment messages

### Bug Fixes
- Fix mmap settings not applied during segment load (key typo fix)
- Populate index info after segment loading to prevent redundant load tasks
- Fix memory corruption by removing premature transaction handle destruction

Related issues: #44956, #45060, #39173

## Individual Cherry-Picked Commits

1. **e1c923b5cc** - fix: apply mmap settings correctly during segment load (#46017)
2. **63b912370b** - enhance: use milvus-storage internal C++ Reader API for Loon FFI (#45897)
3. **bfc192faa5** - enhance: Resolve issues integrating loon FFI (#45918)
4. **fb18564631** - enhance: support manifest-based index building with Loon FFI reader (#45726)
5. **b9ec2392b9** - enhance: integrate StorageV2 FFI interface for manifest-based segment loading (#45798)
6. **66db3c32e6** - enhance: integrate Storage V2 FFI interface for unified storage access (#45723)
7. **ae789273ac** - fix: populate index info after segment loading to prevent redundant load tasks (#45803)
8. **49688b0be2** - enhance: Move segment loading logic from Go layer to segcore for self-managed loading (#45488)
9. **5b2df88bac** - enhance: [StorageV2] Integrate FFI interface for packed reader (#45132)
10. **91ff5706ac** - enhance: [StorageV2] add manifest path support for FFI integration (#44991)
11. **2192bb4a85** - enhance: add NewSegmentWithLoadInfo API to support segment self-managed loading (#45061)
12. **4296b01da0** - enhance: update delta log serialization APIs to integrate storage V2 (#44998)

## Technical Details

### Architecture Changes
- **Before**: Go layer orchestrated segment loading, making multiple CGO calls
- **After**: Segments autonomously manage loading in C++ layer with single entry point

### Storage Access Pattern
- **Before**: Read individual binlog files through Go storage layer
- **After**: Read manifest file that references packed columnar data via FFI
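
On the Go side of the manifest path, `NewManifestReader` in this file requests columns by field ID (formatted as a decimal string) and records each field's position for later lookup. The sketch below mirrors only that bookkeeping. `field` and `buildProjection` are hypothetical stand-ins for the schema types and are not part of Milvus.

```go
package main

import (
	"fmt"
	"strconv"
)

// field is a hypothetical, trimmed-down stand-in for a field schema.
type field struct {
	ID   int64
	Name string
}

// buildProjection returns a field-ID-to-column-index map and the list of
// column names to request, using the decimal field ID as the column name.
func buildProjection(fields []field) (map[int64]int, []string) {
	field2Col := make(map[int64]int, len(fields))
	columns := make([]string, 0, len(fields))
	for i, f := range fields {
		field2Col[f.ID] = i
		columns = append(columns, strconv.FormatInt(f.ID, 10))
	}
	return field2Col, columns
}

func main() {
	fields := []field{{ID: 100, Name: "pk"}, {ID: 101, Name: "vec"}}
	field2Col, columns := buildProjection(fields)
	fmt.Println(field2Col[101], columns) // prints: 1 [100 101]
}
```

Note that `NewManifestReaderFromBinlogs` in the same file uses field names instead of field IDs for the requested columns.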

### Benefits
- Reduced cross-language call overhead
- Better resource management at C++ level
- Improved I/O performance through batched streaming reads
- Cleaner separation of concerns between Go and C++ layers
- Foundation for proactive schema evolution handling

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Co-authored-by: Ted Xu <ted.xu@zilliz.com>
2025-12-04 17:09:12 +08:00


package storage

import (
	"fmt"
	"io"
	"strconv"

	"github.com/apache/arrow/go/v17/arrow"
	"github.com/apache/arrow/go/v17/arrow/array"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/storagev2/packed"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/proto/indexcgopb"
	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

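// RecordReader reads data batch by batch.
// Readers in this file report the end of data by returning io.EOF from Next.
type RecordReader interface {
	Next() (Record, error)
	Close() error
}
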
// packedRecordReader reads records from packed (storage V2) files.
type packedRecordReader struct {
	reader    *packed.PackedReader
	field2Col map[FieldID]int
}

var _ RecordReader = (*packedRecordReader)(nil)

// Next implements RecordReader.
func (pr *packedRecordReader) Next() (Record, error) {
	rec, err := pr.reader.ReadNext()
	if err != nil {
		return nil, err
	}
	return NewSimpleArrowRecord(rec, pr.field2Col), nil
}

// Close implements RecordReader.
func (pr *packedRecordReader) Close() error {
	if pr.reader != nil {
		return pr.reader.Close()
	}
	return nil
}

func newPackedRecordReader(
	paths []string,
	schema *schemapb.CollectionSchema,
	bufferSize int64,
	storageConfig *indexpb.StorageConfig,
	storagePluginContext *indexcgopb.StoragePluginContext,
) (*packedRecordReader, error) {
	arrowSchema, err := ConvertToArrowSchema(schema, true)
	if err != nil {
		return nil, merr.WrapErrParameterInvalid("convert collection schema [%s] to arrow schema error: %s", schema.Name, err.Error())
	}
	field2Col := make(map[FieldID]int)
	allFields := typeutil.GetAllFieldSchemas(schema)
	for i, field := range allFields {
		field2Col[field.FieldID] = i
	}
	reader, err := packed.NewPackedReader(paths, arrowSchema, bufferSize, storageConfig, storagePluginContext)
	if err != nil {
		return nil, err
	}
	return &packedRecordReader{
		reader:    reader,
		field2Col: field2Col,
	}, nil
}

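// NewRecordReaderFromManifest creates a RecordReader that reads the data
// referenced by the given manifest.
func NewRecordReaderFromManifest(manifest string,
	schema *schemapb.CollectionSchema,
	bufferSize int64,
	storageConfig *indexpb.StorageConfig,
	storagePluginContext *indexcgopb.StoragePluginContext,
) (RecordReader, error) {
	reader, err := NewManifestReader(manifest, schema, bufferSize, storageConfig, storagePluginContext)
	if err != nil {
		// Return an untyped nil so callers never receive a non-nil
		// interface that wraps a nil *ManifestReader.
		return nil, err
	}
	return reader, nil
}
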
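var _ RecordReader = (*IterativeRecordReader)(nil)

// IterativeRecordReader chains the readers produced by iterate and exposes
// them as a single RecordReader. iterate returns io.EOF when no readers remain.
type IterativeRecordReader struct {
	cur     RecordReader
	iterate func() (RecordReader, error)
}

// Close implements RecordReader.
func (ir *IterativeRecordReader) Close() error {
	if ir.cur != nil {
		return ir.cur.Close()
	}
	return nil
}

// Next implements RecordReader. When the current reader is drained it is
// closed and the next one is opened; readers that yield no records are skipped.
func (ir *IterativeRecordReader) Next() (Record, error) {
	for {
		if ir.cur == nil {
			r, err := ir.iterate()
			if err != nil {
				// io.EOF here means every reader has been drained.
				return nil, err
			}
			ir.cur = r
		}
		rec, err := ir.cur.Next()
		if err != io.EOF {
			return rec, err
		}
		// The current reader is drained: close it exactly once, then move on.
		closeErr := ir.cur.Close()
		ir.cur = nil
		if closeErr != nil {
			return nil, closeErr
		}
	}
}
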
// newIterativePackedRecordReader reads the given path groups one after
// another, opening a packed reader per group.
func newIterativePackedRecordReader(
	paths [][]string,
	schema *schemapb.CollectionSchema,
	bufferSize int64,
	storageConfig *indexpb.StorageConfig,
	storagePluginContext *indexcgopb.StoragePluginContext,
) *IterativeRecordReader {
	chunk := 0
	return &IterativeRecordReader{
		iterate: func() (RecordReader, error) {
			if chunk >= len(paths) {
				return nil, io.EOF
			}
			currentPaths := paths[chunk]
			chunk++
			reader, err := newPackedRecordReader(currentPaths, schema, bufferSize, storageConfig, storagePluginContext)
			if err != nil {
				// Avoid returning a non-nil interface wrapping a nil pointer.
				return nil, err
			}
			return reader, nil
		},
	}
}

// ManifestReader reads records through the FFI packed reader, using a
// manifest to locate the packed columnar data.
type ManifestReader struct {
	fieldBinlogs []*datapb.FieldBinlog
	manifest     string
	reader       *packed.FFIPackedReader

	bufferSize           int64
	arrowSchema          *arrow.Schema
	schema               *schemapb.CollectionSchema
	schemaHelper         *typeutil.SchemaHelper
	field2Col            map[FieldID]int
	storageConfig        *indexpb.StorageConfig
	storagePluginContext *indexcgopb.StoragePluginContext

	neededColumns []string
}

// NewManifestReaderFromBinlogs creates a ManifestReader from binlogs.
// Columns are requested by field name here, whereas NewManifestReader
// requests them by field ID.
func NewManifestReaderFromBinlogs(fieldBinlogs []*datapb.FieldBinlog,
	schema *schemapb.CollectionSchema,
	bufferSize int64,
	storageConfig *indexpb.StorageConfig,
	storagePluginContext *indexcgopb.StoragePluginContext,
) (*ManifestReader, error) {
	arrowSchema, err := ConvertToArrowSchema(schema, false)
	if err != nil {
		return nil, merr.WrapErrParameterInvalid("convert collection schema [%s] to arrow schema error: %s", schema.Name, err.Error())
	}
	schemaHelper, err := typeutil.CreateSchemaHelper(schema)
	if err != nil {
		return nil, err
	}
	field2Col := make(map[FieldID]int)
	allFields := typeutil.GetAllFieldSchemas(schema)
	neededColumns := make([]string, 0, len(allFields))
	for i, field := range allFields {
		field2Col[field.FieldID] = i
		neededColumns = append(neededColumns, field.Name)
	}
	prr := &ManifestReader{
		fieldBinlogs:         fieldBinlogs,
		bufferSize:           bufferSize,
		arrowSchema:          arrowSchema,
		schema:               schema,
		schemaHelper:         schemaHelper,
		field2Col:            field2Col,
		storageConfig:        storageConfig,
		storagePluginContext: storagePluginContext,
		neededColumns:        neededColumns,
	}
	err = prr.init()
	if err != nil {
		return nil, err
	}
	return prr, nil
}

// NewManifestReader creates a ManifestReader for the given manifest.
func NewManifestReader(manifest string,
	schema *schemapb.CollectionSchema,
	bufferSize int64,
	storageConfig *indexpb.StorageConfig,
	storagePluginContext *indexcgopb.StoragePluginContext,
) (*ManifestReader, error) {
	arrowSchema, err := ConvertToArrowSchema(schema, true)
	if err != nil {
		return nil, merr.WrapErrParameterInvalid("convert collection schema [%s] to arrow schema error: %s", schema.Name, err.Error())
	}
	schemaHelper, err := typeutil.CreateSchemaHelper(schema)
	if err != nil {
		return nil, err
	}
	field2Col := make(map[FieldID]int)
	allFields := typeutil.GetAllFieldSchemas(schema)
	neededColumns := make([]string, 0, len(allFields))
	for i, field := range allFields {
		field2Col[field.FieldID] = i
		// Columns are named by field ID (as a decimal string), not by field name.
		neededColumns = append(neededColumns, strconv.FormatInt(field.FieldID, 10))
	}
	prr := &ManifestReader{
		manifest:             manifest,
		bufferSize:           bufferSize,
		arrowSchema:          arrowSchema,
		schema:               schema,
		schemaHelper:         schemaHelper,
		field2Col:            field2Col,
		storageConfig:        storageConfig,
		storagePluginContext: storagePluginContext,
		neededColumns:        neededColumns,
	}
	err = prr.init()
	if err != nil {
		return nil, err
	}
	return prr, nil
}

// init resolves the manifest and opens the underlying FFI packed reader.
func (mr *ManifestReader) init() error {
	// TODO add needed column option
	manifest, err := packed.GetManifest(mr.manifest, mr.storageConfig)
	if err != nil {
		return err
	}
	reader, err := packed.NewFFIPackedReader(manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext)
	if err != nil {
		return err
	}
	mr.reader = reader
	return nil
}

var _ RecordReader = (*ManifestReader)(nil)

// Next implements RecordReader. It uses a pointer receiver, like init,
// so the struct is not copied on every call.
func (mr *ManifestReader) Next() (Record, error) {
	rec, err := mr.reader.ReadNext()
	if err != nil {
		return nil, err
	}
	return NewSimpleArrowRecord(rec, mr.field2Col), nil
}

// Close implements RecordReader.
func (mr *ManifestReader) Close() error {
	if mr.reader != nil {
		return mr.reader.Close()
	}
	return nil
}

// ChunkedBlobsReader returns a chunk composed of blobs, or io.EOF if no more data
type ChunkedBlobsReader func() ([]*Blob, error)

// CompositeBinlogRecordReader combines per-field binlog readers into one
// RecordReader. index maps a field ID to its slot in brs and rrs.
type CompositeBinlogRecordReader struct {
	fields map[FieldID]*schemapb.FieldSchema
	index  map[FieldID]int16
	brs    []*BinlogReader
	rrs    []array.RecordReader
}

var _ RecordReader = (*CompositeBinlogRecordReader)(nil)

// Next implements RecordReader. It advances every per-field reader by one
// batch and returns io.EOF as soon as any of them is exhausted.
func (crr *CompositeBinlogRecordReader) Next() (Record, error) {
	recs := make([]arrow.Array, len(crr.fields))
	nonExistingFields := make([]*schemapb.FieldSchema, 0)
	nRows := 0
	for _, f := range crr.fields {
		idx := crr.index[f.FieldID]
		if crr.rrs[idx] != nil {
			if ok := crr.rrs[idx].Next(); !ok {
				return nil, io.EOF
			}
			r := crr.rrs[idx].Record()
			recs[idx] = r.Column(0)
			if nRows == 0 {
				nRows = int(r.NumRows())
			}
			// All fields in one batch must carry the same number of rows.
			if nRows != int(r.NumRows()) {
				return nil, merr.WrapErrServiceInternal(fmt.Sprintf("number of rows mismatch for field %d", f.FieldID))
			}
		} else {
			nonExistingFields = append(nonExistingFields, f)
		}
	}
	for _, f := range nonExistingFields {
		// If the field is not in the current batch, fill with null array
		arr, err := GenerateEmptyArrayFromSchema(f, nRows)
		if err != nil {
			return nil, err
		}
		recs[crr.index[f.FieldID]] = arr
	}
	return &compositeRecord{
		index: crr.index,
		recs:  recs,
	}, nil
}

// Close implements RecordReader. Ranging over a nil slice is a no-op,
// so no explicit nil checks on brs and rrs are needed.
func (crr *CompositeBinlogRecordReader) Close() error {
	for _, br := range crr.brs {
		if br != nil {
			br.Close()
		}
	}
	for _, rr := range crr.rrs {
		if rr != nil {
			rr.Release()
		}
	}
	return nil
}