milvus/internal/datanode/compactor/segment_writer.go
congqixia c01fd94a6a
enhance: integrate Storage V2 FFI interface for unified storage access (#45723)
Related #44956
This commit integrates the Storage V2 FFI (Foreign Function Interface)
throughout the Milvus codebase, enabling unified storage access through
the Loon FFI layer. This is a significant step towards standardizing
storage operations across storage versions.

1. Configuration Support
- **configs/milvus.yaml**: Added `useLoonFFI` configuration flag to the
`common.storage.file` section (alongside `splitByAvgSize`)
- Allows runtime toggle between traditional binlog readers and new
FFI-based manifest readers
  - Default: `false` (maintains backward compatibility)

2. Core FFI Infrastructure

Enhanced Utilities (internal/core/src/storage/loon_ffi/util.cpp/h)
- **ToCStorageConfig()**: Converts Go's `StorageConfig` to C's
`CStorageConfig` struct for FFI calls
- **GetManifest()**: Parses manifest JSON and retrieves latest column
groups using FFI
  - Accepts manifest path with `base_path` and `ver` fields
  - Calls `get_latest_column_groups()` FFI function
  - Returns column group information as string
  - Comprehensive error handling for JSON parsing and FFI errors
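For illustration, the manifest-path argument could be mirrored on the Go side roughly as follows; the struct and its field types are assumptions, only the `base_path` and `ver` JSON keys come from the change itself:

```go
// manifestPath is a hypothetical Go-side mirror of the JSON document
// that GetManifest() parses before calling get_latest_column_groups().
type manifestPath struct {
	BasePath string `json:"base_path"` // root location of the segment data
	Ver      int64  `json:"ver"`       // manifest version to resolve
}
```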

3. Dependency Updates
- **internal/core/thirdparty/milvus-storage/CMakeLists.txt**:
  - Updated milvus-storage version from `0883026` to `302143c`
  - Ensures compatibility with latest FFI interfaces

4. Data Coordinator Changes

All compaction task builders now include the manifest path in segment
binlogs (a hedged sketch follows the list below):

- **compaction_task_clustering.go**: Added `Manifest:
segInfo.GetManifestPath()` to segment binlogs
- **compaction_task_l0.go**: Added manifest path to both L0 segment
selection and compaction plan building
- **compaction_task_mix.go**: Added manifest path to mixed compaction
segment binlogs
- **meta.go**: Updated metadata completion logic:
  - `completeClusterCompactionMutation()`: Set `ManifestPath` in new segment info
  - `completeMixCompactionMutation()`: Preserve manifest path in compacted segments
  - `completeSortCompactionMutation()`: Include manifest path in sorted segments
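
A minimal sketch of the builder-side change, assuming the `Manifest` field that the proto update below adds to `datapb.CompactionSegmentBinlogs`; the surrounding fields are illustrative:

```go
// Sketch: each compaction input segment now carries its manifest path
// alongside its binlogs (other fields elided).
segBinlogs := &datapb.CompactionSegmentBinlogs{
	SegmentID:    segInfo.GetID(),
	FieldBinlogs: segInfo.GetBinlogs(),
	Deltalogs:    segInfo.GetDeltalogs(),
	Manifest:     segInfo.GetManifestPath(), // new in this change
}
```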

5. Data Node Compactor Enhancements

All compactors were updated to support dual-mode reading (binlog vs.
manifest); a minimal sketch of the selection logic follows.
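
The sketch below shows the dual-mode selection under stated assumptions: the reader constructors are hypothetical stand-ins for the compactor helpers, and `params.UseLoonFFI` / `seg.GetManifest()` follow the parameter and proto additions described later in this commit.

```go
// Dual-mode read path: use the FFI manifest reader only when the flag
// is on and the segment actually has a manifest; fall back to binlogs.
var reader storage.RecordReader
var err error
if params.UseLoonFFI && seg.GetManifest() != "" {
	reader, err = openManifestReader(ctx, seg.GetManifest()) // hypothetical FFI helper
} else {
	reader, err = openBinlogReader(ctx, seg.GetFieldBinlogs()) // hypothetical legacy helper
}
if err != nil {
	return err
}
defer reader.Close()
```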

6. Flush & Sync Manager Updates

Pack Writer V2 (pack_writer_v2.go)
- **BulkPackWriterV2.Write()**: Extended return signature to include
`manifest string`
- Implementation:
  - Generate manifest path: `path.Join(pack.segmentID, "manifest.json")`
  - Write packed data using FFI-based writer
  - Return manifest path along with binlogs, deltas, and stats
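
A sketch of the path construction described above; the commit message elides the int64-to-string conversion that `path.Join` requires, so the `strconv` call is an assumption:

```go
// Generate the per-segment manifest location.
manifest := path.Join(strconv.FormatInt(pack.segmentID, 10), "manifest.json")
// ... packed data is written through the FFI-based writer ...
// Write() then returns the manifest path alongside binlogs, deltas, and stats.
```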

Task Handling (task.go)
- Updated all sync task result handling to accommodate the new manifest
return value
- Ensured backward compatibility for callers that do not use the
manifest

7. Go Storage Layer Integration

New Interfaces and Implementations
- **record_reader.go**: Interface for unified record reading across
storage versions
- **record_writer.go**: Interface for unified record writing across
storage versions
- **binlog_record_writer.go**: Concrete implementation for traditional
binlog-based writing

Enhanced Schema Support (schema.go, schema_test.go)
- Schema conversion utilities to support FFI-based storage operations
- Ensures proper Arrow schema mapping for V2 storage
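
The kind of mapping involved can be sketched as follows; the helper is a hypothetical reduced version of what schema.go does, and the real conversion covers every Milvus data type:

```go
package schemaext // hypothetical package, for illustration only

import (
	"fmt"

	"github.com/apache/arrow/go/v17/arrow"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
)

// toArrowType maps a Milvus data type to its Arrow counterpart.
func toArrowType(dt schemapb.DataType, dim int32) (arrow.DataType, error) {
	switch dt {
	case schemapb.DataType_Int64:
		return arrow.PrimitiveTypes.Int64, nil
	case schemapb.DataType_VarChar:
		return arrow.BinaryTypes.String, nil
	case schemapb.DataType_FloatVector:
		// vectors map to fixed-size lists sized by the field's dim
		return arrow.FixedSizeListOf(dim, arrow.PrimitiveTypes.Float32), nil
	default:
		return nil, fmt.Errorf("unsupported data type: %v", dt)
	}
}
```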

Serialization Updates
- **serde.go, serde_events.go, serde_events_v2.go**: Updated to work
with new reader/writer interfaces
- Test files updated to validate dual-mode serialization

8. Storage V2 Packed Format

FFI Common (storagev2/packed/ffi_common.go)
- Common FFI utilities and type conversions for packed storage format

Packed Writer FFI (storagev2/packed/packed_writer_ffi.go)
- FFI-based implementation of packed writer
- Integrates with Loon storage layer for efficient columnar writes

Packed Reader FFI (storagev2/packed/packed_reader_ffi.go)
- Already existed; it is now complemented by the writer implementation

9. Protocol Buffer Updates

data_coord.proto & datapb/data_coord.pb.go
- Added `manifest` field to compaction segment messages
- Enables passing manifest metadata through compaction pipeline

worker.proto & workerpb/worker.pb.go
- Added compaction parameter for `useLoonFFI` flag
- Allows workers to receive FFI configuration from coordinator

10. Parameter Configuration

component_param.go
- Added `UseLoonFFI` parameter to compaction configuration
- Reads from `common.storage.file.useLoonFFI` config path
- Default: `false` for safe rollout
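
Following the paramtable conventions used elsewhere in component_param.go, the registration plausibly looks like this; the version tag and doc string are assumptions:

```go
p.UseLoonFFI = ParamItem{
	Key:          "common.storage.file.useLoonFFI",
	Version:      "2.6.0", // assumed version tag
	DefaultValue: "false", // safe rollout: legacy binlog path by default
	Doc:          "toggle between traditional binlog readers and FFI-based manifest readers",
	Export:       true,
}
p.UseLoonFFI.Init(base.mgr)
```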

11. Test Updates
- **clustering_compactor_storage_v2_test.go**: Updated signatures to
handle manifest return value
- **mix_compactor_storage_v2_test.go**: Updated test helpers for
manifest support
- **namespace_compactor_test.go**: Adjusted writer calls to expect
manifest
- **pack_writer_v2_test.go**: Validated manifest generation in pack
writing

This integration follows a **dual-mode approach**:
1. **Legacy Path**: Traditional binlog-based reading/writing (when
`useLoonFFI=false` or no manifest)
2. **FFI Path**: Manifest-based reading/writing through Loon FFI (when
`useLoonFFI=true` and manifest exists)

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
2025-11-24 19:57:07 +08:00


// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compactor

import (
	"context"
	"fmt"
	"math"

	"github.com/apache/arrow/go/v17/arrow/array"
	"github.com/cockroachdb/errors"
	"github.com/samber/lo"
	"go.uber.org/atomic"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/compaction"
	"github.com/milvus-io/milvus/internal/flushcommon/io"
	"github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/v2/common"
	"github.com/milvus-io/milvus/pkg/v2/log"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

// MultiSegmentWriter writes records into a rotating series of segments.
// Not concurrent safe.
type MultiSegmentWriter struct {
	ctx       context.Context
	binlogIO  io.BinlogIO
	allocator *compactionAlloactor

	writer           *storage.BinlogValueWriter
	currentSegmentID typeutil.UniqueID

	maxRows int64
	// segmentSize in bytes.
	// segmentSize might be changed dynamically. To make sure a compaction plan is static,
	// the target segmentSize is defined when creating the compaction plan.
	segmentSize int64

	schema        *schemapb.CollectionSchema
	partitionID   int64
	collectionID  int64
	channel       string
	batchSize     int
	binLogMaxSize uint64

	// DO NOT leave res empty if all segments are deleted; return a segment
	// with zero meta for datacoord instead.
	res []*datapb.CompactionSegment

	storageVersion int64
	params         compaction.Params
	rwOption       []storage.RwOption
}

type compactionAlloactor struct {
	segmentAlloc allocator.Interface
	logIDAlloc   allocator.Interface
}

func NewCompactionAllocator(segmentAlloc, logIDAlloc allocator.Interface) *compactionAlloactor {
	return &compactionAlloactor{
		segmentAlloc: segmentAlloc,
		logIDAlloc:   logIDAlloc,
	}
}

func (alloc *compactionAlloactor) allocSegmentID() (typeutil.UniqueID, error) {
	return alloc.segmentAlloc.AllocOne()
}

func NewMultiSegmentWriter(ctx context.Context, binlogIO io.BinlogIO, allocator *compactionAlloactor, segmentSize int64,
	schema *schemapb.CollectionSchema, params compaction.Params,
	maxRows int64, partitionID, collectionID int64, channel string, batchSize int, rwOption ...storage.RwOption,
) (*MultiSegmentWriter, error) {
	rwOpts := rwOption
	if len(rwOption) == 0 {
		rwOpts = make([]storage.RwOption, 0)
	}
	return &MultiSegmentWriter{
		ctx:            ctx,
		binlogIO:       binlogIO,
		allocator:      allocator,
		maxRows:        maxRows, // For bloomfilter only
		segmentSize:    segmentSize,
		schema:         schema,
		partitionID:    partitionID,
		collectionID:   collectionID,
		channel:        channel,
		batchSize:      batchSize,
		binLogMaxSize:  params.BinLogMaxSize,
		res:            make([]*datapb.CompactionSegment, 0),
		storageVersion: params.StorageVersion,
		params:         params,
		rwOption:       rwOpts,
	}, nil
}

func (w *MultiSegmentWriter) closeWriter() error {
	if w.writer != nil {
		if err := w.writer.Close(); err != nil {
			return err
		}
		fieldBinlogs, statsLog, bm25Logs, manifest := w.writer.GetLogs()
		result := &datapb.CompactionSegment{
			SegmentID:           w.currentSegmentID,
			InsertLogs:          storage.SortFieldBinlogs(fieldBinlogs),
			Field2StatslogPaths: []*datapb.FieldBinlog{statsLog},
			NumOfRows:           w.writer.GetRowNum(),
			Channel:             w.channel,
			Bm25Logs:            lo.Values(bm25Logs),
			StorageVersion:      w.storageVersion,
			Manifest:            manifest,
		}
		w.res = append(w.res, result)
		log.Info("created new segment",
			zap.Int64("segmentID", w.currentSegmentID),
			zap.String("channel", w.channel),
			zap.Int64("totalRows", w.writer.GetRowNum()),
			zap.Uint64("totalSize", w.writer.GetWrittenUncompressed()),
			zap.Int64("expected segment size", w.segmentSize),
			zap.Int64("storageVersion", w.storageVersion))
	}
	return nil
}

func (w *MultiSegmentWriter) rotateWriter() error {
	if err := w.closeWriter(); err != nil {
		return err
	}
	newSegmentID, err := w.allocator.allocSegmentID()
	if err != nil {
		return err
	}
	w.currentSegmentID = newSegmentID
	chunkSize := w.binLogMaxSize
	w.rwOption = append(w.rwOption,
		storage.WithUploader(func(ctx context.Context, kvs map[string][]byte) error {
			return w.binlogIO.Upload(ctx, kvs)
		}),
		storage.WithVersion(w.storageVersion),
	)
	rw, err := storage.NewBinlogRecordWriter(w.ctx, w.collectionID, w.partitionID, newSegmentID,
		w.schema, w.allocator.logIDAlloc, chunkSize, w.maxRows, w.rwOption...,
	)
	if err != nil {
		return err
	}
	w.writer = storage.NewBinlogValueWriter(rw, w.batchSize)
	return nil
}

func (w *MultiSegmentWriter) GetWrittenUncompressed() uint64 {
	if w.writer == nil {
		return 0
	}
	return w.writer.GetWrittenUncompressed()
}

func (w *MultiSegmentWriter) GetBufferUncompressed() uint64 {
	if w.writer == nil {
		return 0
	}
	return w.writer.GetBufferUncompressed()
}

func (w *MultiSegmentWriter) GetCompactionSegments() []*datapb.CompactionSegment {
	return w.res
}

func (w *MultiSegmentWriter) Write(r storage.Record) error {
	if w.writer == nil || w.writer.GetWrittenUncompressed() >= uint64(w.segmentSize) {
		if err := w.rotateWriter(); err != nil {
			return err
		}
	}
	return w.writer.Write(r)
}

func (w *MultiSegmentWriter) WriteValue(v *storage.Value) error {
	if w.writer == nil || w.writer.GetWrittenUncompressed() >= uint64(w.segmentSize) {
		if err := w.rotateWriter(); err != nil {
			return err
		}
	}
	return w.writer.WriteValue(v)
}

// Flush calls storage.SerializeWriter.Flush(); it is used to serialize the value buffer to a record and write it to the binlog.
// Note: the record is not written to the binlog immediately; it will be written when the buffer is full or the writer is closed.
// Call this function before record iteration to avoid the underlying record being released.
func (w *MultiSegmentWriter) Flush() error {
	if w.writer == nil {
		return nil
	}
	return w.writer.Flush()
}

func (w *MultiSegmentWriter) FlushChunk() error {
	if w.writer == nil {
		return nil
	}
	if err := w.writer.Flush(); err != nil {
		return err
	}
	return w.writer.FlushChunk()
}

func (w *MultiSegmentWriter) Close() error {
	if w.writer != nil {
		return w.closeWriter()
	}
	return nil
}

func NewSegmentDeltaWriter(segmentID, partitionID, collectionID int64) *SegmentDeltaWriter {
	return &SegmentDeltaWriter{
		deleteData:   &storage.DeleteData{},
		segmentID:    segmentID,
		partitionID:  partitionID,
		collectionID: collectionID,
		tsFrom:       math.MaxUint64,
		tsTo:         0,
	}
}

type SegmentDeltaWriter struct {
	deleteData   *storage.DeleteData
	segmentID    int64
	partitionID  int64
	collectionID int64

	tsFrom typeutil.Timestamp
	tsTo   typeutil.Timestamp
}

func (w *SegmentDeltaWriter) GetCollectionID() int64 {
	return w.collectionID
}

func (w *SegmentDeltaWriter) GetPartitionID() int64 {
	return w.partitionID
}

func (w *SegmentDeltaWriter) GetSegmentID() int64 {
	return w.segmentID
}

func (w *SegmentDeltaWriter) GetRowNum() int64 {
	return w.deleteData.RowCount
}

func (w *SegmentDeltaWriter) GetTimeRange() *writebuffer.TimeRange {
	return writebuffer.NewTimeRange(w.tsFrom, w.tsTo)
}

func (w *SegmentDeltaWriter) updateRange(ts typeutil.Timestamp) {
	if ts < w.tsFrom {
		w.tsFrom = ts
	}
	if ts > w.tsTo {
		w.tsTo = ts
	}
}

func (w *SegmentDeltaWriter) Write(pk storage.PrimaryKey, ts typeutil.Timestamp) {
	w.deleteData.Append(pk, ts)
	w.updateRange(ts)
}

func (w *SegmentDeltaWriter) WriteBatch(pks []storage.PrimaryKey, tss []typeutil.Timestamp) {
	w.deleteData.AppendBatch(pks, tss)
	for _, ts := range tss {
		w.updateRange(ts)
	}
}

func (w *SegmentDeltaWriter) Finish() (*storage.Blob, *writebuffer.TimeRange, error) {
	blob, err := storage.NewDeleteCodec().Serialize(w.collectionID, w.partitionID, w.segmentID, w.deleteData)
	if err != nil {
		return nil, nil, err
	}
	return blob, w.GetTimeRange(), nil
}

type SegmentWriter struct {
	writer  *storage.BinlogSerializeWriter
	closers []func() (*storage.Blob, error)
	tsFrom  typeutil.Timestamp
	tsTo    typeutil.Timestamp

	pkstats   *storage.PrimaryKeyStats
	bm25Stats map[int64]*storage.BM25Stats

	segmentID    int64
	partitionID  int64
	collectionID int64
	sch          *schemapb.CollectionSchema
	rowCount     *atomic.Int64
	syncedSize   *atomic.Int64

	batchSize     int
	maxBinlogSize uint64
}

func (w *SegmentWriter) GetRowNum() int64 {
	return w.rowCount.Load()
}

func (w *SegmentWriter) GetCollectionID() int64 {
	return w.collectionID
}

func (w *SegmentWriter) GetPartitionID() int64 {
	return w.partitionID
}

func (w *SegmentWriter) GetSegmentID() int64 {
	return w.segmentID
}

func (w *SegmentWriter) GetPkID() int64 {
	return w.pkstats.FieldID
}

func (w *SegmentWriter) WrittenMemorySize() uint64 {
	return w.writer.GetWrittenUncompressed()
}

func (w *SegmentWriter) WriteRecord(r storage.Record) error {
	tsArray := r.Column(common.TimeStampField).(*array.Int64)
	rows := r.Len()
	for i := 0; i < rows; i++ {
		ts := typeutil.Timestamp(tsArray.Value(i))
		if ts < w.tsFrom {
			w.tsFrom = ts
		}
		if ts > w.tsTo {
			w.tsTo = ts
		}

		switch schemapb.DataType(w.pkstats.PkType) {
		case schemapb.DataType_Int64:
			pkArray := r.Column(w.GetPkID()).(*array.Int64)
			pk := &storage.Int64PrimaryKey{
				Value: pkArray.Value(i),
			}
			w.pkstats.Update(pk)
		case schemapb.DataType_VarChar:
			pkArray := r.Column(w.GetPkID()).(*array.String)
			pk := &storage.VarCharPrimaryKey{
				Value: pkArray.Value(i),
			}
			w.pkstats.Update(pk)
		default:
			panic("invalid data type")
		}

		for fieldID, stats := range w.bm25Stats {
			field, ok := r.Column(fieldID).(*array.Binary)
			if !ok {
				return errors.New("bm25 field value not found")
			}
			stats.AppendBytes(field.Value(i))
		}

		w.rowCount.Inc()
	}
	return w.writer.Write(r)
}

func (w *SegmentWriter) Write(v *storage.Value) error {
	ts := typeutil.Timestamp(v.Timestamp)
	if ts < w.tsFrom {
		w.tsFrom = ts
	}
	if ts > w.tsTo {
		w.tsTo = ts
	}

	w.pkstats.Update(v.PK)
	for fieldID, stats := range w.bm25Stats {
		data, ok := v.Value.(map[storage.FieldID]interface{})[fieldID]
		if !ok {
			return errors.New("bm25 field value not found")
		}
		bytes, ok := data.([]byte)
		if !ok {
			return errors.New("bm25 field value not sparse bytes")
		}
		stats.AppendBytes(bytes)
	}

	w.rowCount.Inc()
	return w.writer.WriteValue(v)
}

func (w *SegmentWriter) Finish() (*storage.Blob, error) {
	w.writer.Flush()
	codec := storage.NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: w.collectionID, Schema: w.sch})
	return codec.SerializePkStats(w.pkstats, w.GetRowNum())
}

func (w *SegmentWriter) GetBm25Stats() map[int64]*storage.BM25Stats {
	return w.bm25Stats
}

func (w *SegmentWriter) GetBm25StatsBlob() (map[int64]*storage.Blob, error) {
	result := make(map[int64]*storage.Blob)
	for fieldID, stats := range w.bm25Stats {
		bytes, err := stats.Serialize()
		if err != nil {
			return nil, err
		}
		result[fieldID] = &storage.Blob{
			Key:        fmt.Sprintf("%d", fieldID),
			Value:      bytes,
			RowNum:     stats.NumRow(),
			MemorySize: int64(len(bytes)),
		}
	}
	return result, nil
}

func (w *SegmentWriter) IsFull() bool {
	return w.writer.GetWrittenUncompressed() > w.maxBinlogSize
}

func (w *SegmentWriter) FlushAndIsFull() bool {
	w.writer.Flush()
	return w.writer.GetWrittenUncompressed() > w.maxBinlogSize
}

func (w *SegmentWriter) IsFullWithBinlogMaxSize(binLogMaxSize uint64) bool {
	return w.writer.GetWrittenUncompressed() > binLogMaxSize
}

func (w *SegmentWriter) IsEmpty() bool {
	return w.writer.GetWrittenUncompressed() == 0
}

func (w *SegmentWriter) FlushAndIsEmpty() bool {
	w.writer.Flush()
	return w.writer.GetWrittenUncompressed() == 0
}

func (w *SegmentWriter) GetTimeRange() *writebuffer.TimeRange {
	return writebuffer.NewTimeRange(w.tsFrom, w.tsTo)
}

func (w *SegmentWriter) SerializeYield() ([]*storage.Blob, *writebuffer.TimeRange, error) {
	w.writer.Flush()
	w.writer.Close()

	fieldData := make([]*storage.Blob, len(w.closers))
	for i, f := range w.closers {
		blob, err := f()
		if err != nil {
			return nil, nil, err
		}
		fieldData[i] = blob
	}

	tr := w.GetTimeRange()
	w.clear()
	return fieldData, tr, nil
}

func (w *SegmentWriter) GetTotalSize() int64 {
	return w.syncedSize.Load() + int64(w.writer.GetWrittenUncompressed())
}

func (w *SegmentWriter) clear() {
	w.syncedSize.Add(int64(w.writer.GetWrittenUncompressed()))

	writer, closers, _ := newBinlogWriter(w.collectionID, w.partitionID, w.segmentID, w.sch, w.batchSize)
	w.writer = writer
	w.closers = closers
	w.tsFrom = math.MaxUint64
	w.tsTo = 0
}

// Deprecated: use NewMultiSegmentWriter instead.
func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, batchSize int, segID, partID, collID int64, Bm25Fields []int64) (*SegmentWriter, error) {
	writer, closers, err := newBinlogWriter(collID, partID, segID, sch, batchSize)
	if err != nil {
		return nil, err
	}

	pkField, err := typeutil.GetPrimaryFieldSchema(sch)
	if err != nil {
		log.Warn("failed to get pk field from schema")
		return nil, err
	}

	stats, err := storage.NewPrimaryKeyStats(pkField.GetFieldID(), int64(pkField.GetDataType()), maxCount)
	if err != nil {
		return nil, err
	}

	segWriter := SegmentWriter{
		writer:        writer,
		closers:       closers,
		tsFrom:        math.MaxUint64,
		tsTo:          0,
		pkstats:       stats,
		bm25Stats:     make(map[int64]*storage.BM25Stats),
		sch:           sch,
		segmentID:     segID,
		partitionID:   partID,
		collectionID:  collID,
		rowCount:      atomic.NewInt64(0),
		syncedSize:    atomic.NewInt64(0),
		batchSize:     batchSize,
		maxBinlogSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(),
	}

	for _, fieldID := range Bm25Fields {
		segWriter.bm25Stats[fieldID] = storage.NewBM25Stats()
	}
	return &segWriter, nil
}

func newBinlogWriter(collID, partID, segID int64, schema *schemapb.CollectionSchema, batchSize int,
) (writer *storage.BinlogSerializeWriter, closers []func() (*storage.Blob, error), err error) {
	fieldWriters := storage.NewBinlogStreamWriters(collID, partID, segID, schema)
	closers = make([]func() (*storage.Blob, error), 0, len(fieldWriters))
	for _, w := range fieldWriters {
		closers = append(closers, w.Finalize)
	}
	writer, err = storage.NewBinlogSerializeWriter(schema, partID, segID, fieldWriters, batchSize)
	return
}