milvus/internal/datacoord/compaction_task_mix.go
congqixia f94b04e642
feat: [2.6] integrate Loon FFI for manifest-based segment loading and index building (#46076)
Cherry-pick from master
pr: #45061 #45488 #45803 #46017 #44991 #45132 #45723 #45726 #45798 #45897 #45918 #44998

This feature integrates the Storage V2 (Loon) FFI interface as a unified
storage layer for segment loading and index building in Milvus. It enables
manifest-based data access, replacing the traditional binlog-based approach
with a more efficient columnar storage format.

Key changes:

### Segment Self-Managed Loading Architecture
- Move segment loading orchestration from Go layer to C++ segcore
- Add NewSegmentWithLoadInfo() API for passing load info during segment creation
- Implement SetLoadInfo() and Load() methods in SegmentInterface (see the sketch after this list)
- Support parallel loading of indexed and non-indexed fields
- Enable both sealed and growing segments to self-manage loading
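
The Go-side shape of this hand-off might look like the minimal sketch below. The names here (`LoadInfo`, `Segment`, `loadSealed`, and the `NewSegmentWithLoadInfo`-style constructor parameter) are illustrative stand-ins for the abstractions described above, not the actual Milvus signatures.

```go
package segments

import "context"

// LoadInfo and Segment are illustrative stand-ins for the load-info and
// segment abstractions described above; they are not the real Milvus types.
type LoadInfo struct {
	SegmentID    int64
	ManifestPath string   // set when a Storage V2 (Loon) manifest is available
	BinlogPaths  []string // traditional binlog-based fallback
}

type Segment interface {
	SetLoadInfo(info *LoadInfo)
	Load(ctx context.Context) error // segcore loads indexed and raw fields in parallel
}

// loadSealed shows the intended call pattern: hand the load info to the
// segment once and let it manage loading itself, instead of orchestrating
// per-field CGO calls from the Go layer.
func loadSealed(ctx context.Context, newSegment func(*LoadInfo) (Segment, error), info *LoadInfo) (Segment, error) {
	seg, err := newSegment(info) // e.g. a NewSegmentWithLoadInfo-style constructor
	if err != nil {
		return nil, err
	}
	if err := seg.Load(ctx); err != nil {
		return nil, err
	}
	return seg, nil
}
```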

### Storage V2 FFI Integration
- Integrate milvus-storage library's FFI interface for packed columnar data
- Add manifest path support throughout the data path (SegmentInfo, LoadInfo)
- Implement ManifestReader for generating manifests from binlogs (see the sketch after this list)
- Support zero-copy data exchange using Arrow C Data Interface
- Add ToCStorageConfig() for Go-to-C storage config conversion
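
For segments written before Storage V2, the ManifestReader mentioned above can synthesize a manifest from existing binlogs. The sketch below only illustrates that idea under assumed names; `ManifestReader`, `BuildFromBinlogs`, and `resolveManifest` are not the real milvus-storage API.

```go
package storagev2sketch

import "context"

// ManifestReader is an assumed name for a component that can produce a
// manifest for a segment; BuildFromBinlogs is illustrative only.
type ManifestReader interface {
	// BuildFromBinlogs writes a manifest describing existing binlog files and
	// returns its path, so older segments can also be read through the
	// manifest-based path.
	BuildFromBinlogs(ctx context.Context, binlogPaths []string) (manifestPath string, err error)
}

// resolveManifest prefers a manifest already recorded on the segment and
// falls back to generating one from the segment's binlogs.
func resolveManifest(ctx context.Context, r ManifestReader, recordedManifest string, binlogs []string) (string, error) {
	if recordedManifest != "" {
		return recordedManifest, nil
	}
	return r.BuildFromBinlogs(ctx, binlogs)
}
```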

### Manifest-Based Index Building
- Extend FileManagerContext to carry loon_ffi_properties
- Implement GetFieldDatasFromManifest() using Arrow C Stream interface
- Support manifest-based reading in DiskFileManagerImpl and MemFileManagerImpl
- Add fallback to traditional segment insert files when the manifest is unavailable

### Compaction Pipeline Updates
- Include manifest path in all compaction task builders (clustering, L0, mix), as excerpted below
- Update BulkPackWriterV2 to return manifest path
- Propagate manifest metadata through compaction pipeline
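
Concretely, in this file's BuildCompactionRequest (shown in full later in the file), the mix compaction builder now carries each segment's manifest next to its binlogs. Abbreviated excerpt:

```go
plan.SegmentBinlogs = append(plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
	SegmentID:      segID,
	FieldBinlogs:   segInfo.GetBinlogs(),
	Deltalogs:      segInfo.GetDeltalogs(),
	StorageVersion: segInfo.GetStorageVersion(),
	Manifest:       segInfo.GetManifestPath(), // manifest path for Storage V2 (Loon)
	// ... other fields omitted here; see BuildCompactionRequest below
})
```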

### Configuration & Protocol
- Add common.storageV2.useLoonFFI config option (default: false)
- Add manifest_path field to SegmentLoadInfo and related proto messages
- Add manifest field to compaction segment messages
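
A minimal sketch of how a caller might gate on the new switch is below. Only `paramtable.Get()` and the `GetAsBool()` accessor follow the existing paramtable pattern seen elsewhere in this file; the exact ParamItem backing `common.storageV2.useLoonFFI` (named `CommonCfg.UseLoonFFI` here) is an assumption for illustration.

```go
package datacoord

import "github.com/milvus-io/milvus/pkg/v2/util/paramtable"

// useLoonFFIEnabled reports whether the Loon FFI path is switched on.
// NOTE: CommonCfg.UseLoonFFI is a hypothetical field name used for illustration.
func useLoonFFIEnabled() bool {
	return paramtable.Get().CommonCfg.UseLoonFFI.GetAsBool()
}
```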

### Bug Fixes
- Fix mmap settings not applied during segment load (key typo fix)
- Populate index info after segment loading to prevent redundant load tasks
- Fix memory corruption by removing premature transaction handle destruction

Related issues: #44956, #45060, #39173

## Individual Cherry-Picked Commits

1. **e1c923b5cc** - fix: apply mmap settings correctly during segment load (#46017)
2. **63b912370b** - enhance: use milvus-storage internal C++ Reader API for Loon FFI (#45897)
3. **bfc192faa5** - enhance: Resolve issues integrating loon FFI (#45918)
4. **fb18564631** - enhance: support manifest-based index building with Loon FFI reader (#45726)
5. **b9ec2392b9** - enhance: integrate StorageV2 FFI interface for manifest-based segment loading (#45798)
6. **66db3c32e6** - enhance: integrate Storage V2 FFI interface for unified storage access (#45723)
7. **ae789273ac** - fix: populate index info after segment loading to prevent redundant load tasks (#45803)
8. **49688b0be2** - enhance: Move segment loading logic from Go layer to segcore for self-managed loading (#45488)
9. **5b2df88bac** - enhance: [StorageV2] Integrate FFI interface for packed reader (#45132)
10. **91ff5706ac** - enhance: [StorageV2] add manifest path support for FFI integration (#44991)
11. **2192bb4a85** - enhance: add NewSegmentWithLoadInfo API to support segment self-managed loading (#45061)
12. **4296b01da0** - enhance: update delta log serialization APIs to integrate storage V2 (#44998)

## Technical Details

### Architecture Changes
- **Before**: Go layer orchestrated segment loading, making multiple CGO calls
- **After**: Segments autonomously manage loading in the C++ layer through a single entry point

### Storage Access Pattern
- **Before**: Read individual binlog files through Go storage layer
- **After**: Read a manifest file that references packed columnar data via FFI

### Benefits
- Reduced cross-language call overhead
- Better resource management at C++ level
- Improved I/O performance through batched streaming reads
- Cleaner separation of concerns between Go and C++ layers
- Foundation for proactive schema evolution handling

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Co-authored-by: Ted Xu <ted.xu@zilliz.com>
2025-12-04 17:09:12 +08:00


package datacoord

import (
	"context"
	"fmt"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/samber/lo"
	"go.uber.org/atomic"
	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"

	"github.com/milvus-io/milvus/internal/compaction"
	"github.com/milvus-io/milvus/internal/datacoord/allocator"
	"github.com/milvus-io/milvus/internal/datacoord/session"
	"github.com/milvus-io/milvus/pkg/v2/log"
	"github.com/milvus-io/milvus/pkg/v2/metrics"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/taskcommon"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)

var _ CompactionTask = (*mixCompactionTask)(nil)

type mixCompactionTask struct {
	taskProto atomic.Value // *datapb.CompactionTask

	allocator allocator.Allocator
	meta      CompactionMeta
	ievm      IndexEngineVersionManager
	times     *taskcommon.Times
	slotUsage atomic.Int64
}

func (t *mixCompactionTask) GetTaskID() int64 {
	return t.GetTaskProto().GetPlanID()
}

func (t *mixCompactionTask) GetTaskType() taskcommon.Type {
	return taskcommon.Compaction
}

func (t *mixCompactionTask) GetTaskState() taskcommon.State {
	return taskcommon.FromCompactionState(t.GetTaskProto().GetState())
}

func (t *mixCompactionTask) GetTaskSlot() int64 {
	slotUsage := t.slotUsage.Load()
	if slotUsage == 0 {
		slotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
		if t.GetTaskProto().GetType() == datapb.CompactionType_SortCompaction {
			segment := t.meta.GetHealthySegment(context.Background(), t.GetTaskProto().GetInputSegments()[0])
			if segment != nil {
				slotUsage = calculateStatsTaskSlot(segment.getSegmentSize())
			}
		}
		t.slotUsage.Store(slotUsage)
	}
	return slotUsage
}

func (t *mixCompactionTask) SetTaskTime(timeType taskcommon.TimeType, time time.Time) {
	t.times.SetTaskTime(timeType, time)
}

func (t *mixCompactionTask) GetTaskTime(timeType taskcommon.TimeType) time.Time {
	return timeType.GetTaskTime(t.times)
}

func (t *mixCompactionTask) GetTaskVersion() int64 {
	return int64(t.GetTaskProto().GetRetryTimes())
}

func (t *mixCompactionTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) {
	log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()),
		zap.Int64("PlanID", t.GetTaskProto().GetPlanID()),
		zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()),
		zap.Int64("nodeID", nodeID))
	plan, err := t.BuildCompactionRequest()
	if err != nil {
		log.Warn("mixCompactionTask failed to build compaction request", zap.Error(err))
		err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
		if err != nil {
			log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
		}
		return
	}
	err = cluster.CreateCompaction(nodeID, plan)
	if err != nil {
		// Compaction tasks may be refused by DataNode because of slot limit. In this case, the node id is reset
		// to enable a retry in compaction.checkCompaction().
		// This is tricky, we should remove the reassignment here.
		originNodeID := t.GetTaskProto().GetNodeID()
		log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode",
			zap.Int64("planID", t.GetTaskProto().GetPlanID()),
			zap.Int64("nodeID", originNodeID),
			zap.Error(err))
		err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
		if err != nil {
			log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
		}
		metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", originNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec()
		metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Inc()
		return
	}
	log.Info("mixCompactionTask notify compaction tasks to DataNode")
	err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing), setNodeID(nodeID))
	if err != nil {
		log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
	}
}

func (t *mixCompactionTask) QueryTaskOnWorker(cluster session.Cluster) {
	log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()),
		zap.Int64("PlanID", t.GetTaskProto().GetPlanID()),
		zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
	result, err := cluster.QueryCompaction(t.GetTaskProto().GetNodeID(), &datapb.CompactionStateRequest{
		PlanID: t.GetTaskProto().GetPlanID(),
	})
	if err != nil || result == nil {
		log.Warn("mixCompactionTask failed to get compaction result", zap.Error(err))
		if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)); err != nil {
			log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
		}
		return
	}
	switch result.GetState() {
	case datapb.CompactionTaskState_completed:
		if len(result.GetSegments()) == 0 {
			log.Info("illegal compaction results")
			err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
			if err != nil {
				log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
			}
			return
		}
		err = t.meta.ValidateSegmentStateBeforeCompleteCompactionMutation(t.GetTaskProto())
		if err != nil {
			t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
			return
		}
		if err := t.saveSegmentMeta(result); err != nil {
			log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err))
			if errors.Is(err, merr.ErrIllegalCompactionPlan) {
				err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
				if err != nil {
					log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
				}
			}
			return
		}
		UpdateCompactionSegmentSizeMetrics(result.GetSegments())
		t.processMetaSaved()
	case datapb.CompactionTaskState_pipelining, datapb.CompactionTaskState_executing:
		return
	case datapb.CompactionTaskState_timeout:
		err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
		if err != nil {
			log.Warn("update mix compaction task meta failed", zap.Error(err))
			return
		}
	case datapb.CompactionTaskState_failed:
		log.Info("mixCompactionTask failed in DataNode")
		err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
		if err != nil {
			log.Warn("failed to updateAndSaveTaskMeta", zap.Error(err))
		}
	default:
		log.Error("unsupported compaction task state", zap.String("state", result.GetState().String()))
		err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
		if err != nil {
			log.Warn("update mix compaction task meta failed", zap.Error(err))
			return
		}
	}
}

func (t *mixCompactionTask) DropTaskOnWorker(cluster session.Cluster) {
	log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()),
		zap.Int64("PlanID", t.GetTaskProto().GetPlanID()),
		zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
	if err := cluster.DropCompaction(t.GetTaskProto().GetNodeID(), t.GetTaskProto().GetPlanID()); err != nil {
		log.Warn("mixCompactionTask unable to drop compaction plan", zap.Error(err))
	}
}

func (t *mixCompactionTask) GetTaskProto() *datapb.CompactionTask {
	task := t.taskProto.Load()
	if task == nil {
		return nil
	}
	return task.(*datapb.CompactionTask)
}

func newMixCompactionTask(t *datapb.CompactionTask,
	allocator allocator.Allocator,
	meta CompactionMeta,
	ievm IndexEngineVersionManager,
) *mixCompactionTask {
	task := &mixCompactionTask{
		allocator: allocator,
		meta:      meta,
		ievm:      ievm,
		times:     taskcommon.NewTimes(),
	}
	task.taskProto.Store(t)
	return task
}

func (t *mixCompactionTask) processMetaSaved() bool {
	log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
	if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil {
		log.Warn("mixCompactionTask failed to processMetaSaved", zap.Error(err))
		return false
	}
	return t.processCompleted()
}

func (t *mixCompactionTask) saveTaskMeta(task *datapb.CompactionTask) error {
	return t.meta.SaveCompactionTask(context.TODO(), task)
}

func (t *mixCompactionTask) SaveTaskMeta() error {
	return t.saveTaskMeta(t.GetTaskProto())
}

func (t *mixCompactionTask) saveSegmentMeta(result *datapb.CompactionPlanResult) error {
	log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
	// Also prepare metric updates.
	newSegments, metricMutation, err := t.meta.CompleteCompactionMutation(context.TODO(), t.taskProto.Load().(*datapb.CompactionTask), result)
	if err != nil {
		return err
	}
	// Apply metrics after successful meta update.
	newSegmentIDs := lo.Map(newSegments, func(s *SegmentInfo, _ int) UniqueID { return s.GetID() })
	metricMutation.commit()
	for _, newSegID := range newSegmentIDs {
		select {
		case getBuildIndexChSingleton() <- newSegID:
		default:
		}
	}
	err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(newSegmentIDs))
	if err != nil {
		log.Warn("mixCompactionTask failed to setState meta_saved", zap.Error(err))
		return err
	}
	log.Info("mixCompactionTask saved segment meta successfully")
	return nil
}

// Note: returning true means exiting this state machine.
// Only return true for Completed, Failed or Timeout.
func (t *mixCompactionTask) Process() bool {
	log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()),
		zap.Int64("PlanID", t.GetTaskProto().GetPlanID()),
		zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
	lastState := t.GetTaskProto().GetState().String()
	processResult := false
	switch t.GetTaskProto().GetState() {
	case datapb.CompactionTaskState_meta_saved:
		processResult = t.processMetaSaved()
	case datapb.CompactionTaskState_completed:
		processResult = t.processCompleted()
	case datapb.CompactionTaskState_failed:
		processResult = t.processFailed()
	case datapb.CompactionTaskState_timeout:
		processResult = true
	}
	currentState := t.GetTaskProto().GetState().String()
	if currentState != lastState {
		log.Info("mix compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState))
	}
	return processResult
}

func (t *mixCompactionTask) GetLabel() string {
	return fmt.Sprintf("%d-%s", t.taskProto.Load().(*datapb.CompactionTask).PartitionID, t.GetTaskProto().GetChannel())
}

func (t *mixCompactionTask) NeedReAssignNodeID() bool {
	return t.GetTaskProto().GetState() == datapb.CompactionTaskState_pipelining && (t.GetTaskProto().GetNodeID() == 0 || t.GetTaskProto().GetNodeID() == NullNodeID)
}

func (t *mixCompactionTask) processCompleted() bool {
	log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()),
		zap.Int64("PlanID", t.GetTaskProto().GetPlanID()),
		zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
	t.resetSegmentCompacting()
	log.Info("mixCompactionTask processCompleted done")
	return true
}

func (t *mixCompactionTask) resetSegmentCompacting() {
	t.meta.SetSegmentsCompacting(context.TODO(), t.taskProto.Load().(*datapb.CompactionTask).GetInputSegments(), false)
}

func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
	taskClone := proto.Clone(t.GetTaskProto()).(*datapb.CompactionTask)
	for _, opt := range opts {
		opt(taskClone)
	}
	return taskClone
}

func (t *mixCompactionTask) processFailed() bool {
	return true
}

func (t *mixCompactionTask) Clean() bool {
	return t.doClean() == nil
}

func (t *mixCompactionTask) doClean() error {
	log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
	err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
	if err != nil {
		log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
		return err
	}
	// resetSegmentCompacting must be the last step of Clean to make sure it is called only once;
	// otherwise, it may unlock segments locked by other compaction tasks.
	t.resetSegmentCompacting()
	log.Info("mixCompactionTask clean done")
	return nil
}

func (t *mixCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
	// If the task state is completed, cleaned, failed or timeout, append the end time before saving.
	if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
		t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
		t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
		t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
		ts := time.Now().Unix()
		opts = append(opts, setEndTime(ts))
	}
	task := t.ShadowClone(opts...)
	err := t.saveTaskMeta(task)
	if err != nil {
		return err
	}
	t.SetTask(task)
	return nil
}

func (t *mixCompactionTask) SetNodeID(id UniqueID) error {
	return t.updateAndSaveTaskMeta(setNodeID(id))
}

func (t *mixCompactionTask) SetTask(task *datapb.CompactionTask) {
	t.taskProto.Store(task)
}

func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
	compactionParams, err := compaction.GenerateJSONParams()
	if err != nil {
		return nil, err
	}
	log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
	taskProto := t.taskProto.Load().(*datapb.CompactionTask)
	plan := &datapb.CompactionPlan{
		PlanID:                    taskProto.GetPlanID(),
		StartTime:                 taskProto.GetStartTime(),
		Type:                      taskProto.GetType(),
		Channel:                   taskProto.GetChannel(),
		CollectionTtl:             taskProto.GetCollectionTtl(),
		TotalRows:                 taskProto.GetTotalRows(),
		Schema:                    taskProto.GetSchema(),
		PreAllocatedSegmentIDs:    taskProto.GetPreAllocatedSegmentIDs(),
		SlotUsage:                 t.GetSlotUsage(),
		MaxSize:                   taskProto.GetMaxSize(),
		JsonParams:                compactionParams,
		CurrentScalarIndexVersion: t.ievm.GetCurrentScalarIndexEngineVersion(),
	}
	segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs))
	segments := make([]*SegmentInfo, 0, len(taskProto.GetInputSegments()))
	for _, segID := range taskProto.GetInputSegments() {
		segInfo := t.meta.GetHealthySegment(context.TODO(), segID)
		if segInfo == nil {
			return nil, merr.WrapErrSegmentNotFound(segID)
		}
		plan.SegmentBinlogs = append(plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
			SegmentID:           segID,
			CollectionID:        segInfo.GetCollectionID(),
			PartitionID:         segInfo.GetPartitionID(),
			Level:               segInfo.GetLevel(),
			InsertChannel:       segInfo.GetInsertChannel(),
			FieldBinlogs:        segInfo.GetBinlogs(),
			Field2StatslogPaths: segInfo.GetStatslogs(),
			Deltalogs:           segInfo.GetDeltalogs(),
			IsSorted:            segInfo.GetIsSorted(),
			StorageVersion:      segInfo.GetStorageVersion(),
			Manifest:            segInfo.GetManifestPath(),
		})
		segIDMap[segID] = segInfo.GetDeltalogs()
		segments = append(segments, segInfo)
	}
	logIDRange, err := PreAllocateBinlogIDs(t.allocator, segments)
	if err != nil {
		return nil, err
	}
	plan.PreAllocatedLogIDs = logIDRange
	// BeginLogID is deprecated, but still assign it for compatibility.
	plan.BeginLogID = logIDRange.Begin
	WrapPluginContext(taskProto.GetCollectionID(), taskProto.GetSchema().GetProperties(), plan)
	log.Info("Compaction handler refreshed mix compaction plan", zap.Int64("maxSize", plan.GetMaxSize()),
		zap.Any("PreAllocatedLogIDs", logIDRange), zap.Any("segID2DeltaLogs", segIDMap))
	return plan, nil
}

func (t *mixCompactionTask) GetSlotUsage() int64 {
	return t.GetTaskSlot()
}