enhance: support useLoonFFI flag in import workflow (#46363)

Related to #44956

This change propagates the useLoonFFI configuration through the import
pipeline to enable LOON FFI usage during data import operations.

Key changes:
- Add use_loon_ffi field to ImportRequest protobuf message
- Add manifest_path field to ImportSegmentInfo for tracking manifest
- Initialize manifest path when creating segments (both import and
growing)
- Pass useLoonFFI flag through NewSyncTask in import tasks
- Simplify pack_writer_v2 by removing the GetManifestInfo method and relying
on the pre-initialized manifest path from segment creation
- Update segment meta with manifest path after import completion

This allows the import workflow to use the LOON FFI-based packed writer
when the common.useLoonFFI configuration is enabled.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-12-17 16:35:16 +08:00 committed by GitHub
parent 52026cf07e
commit 46c14781be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 929 additions and 899 deletions

View File

@ -214,8 +214,9 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) {
return
}
op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs(), info.GetBm25Logs())
opManifest := UpdateManifest(info.GetSegmentID(), info.GetManifestPath())
op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed)
err = t.meta.UpdateSegmentsInfo(context.TODO(), op1, op2)
err = t.meta.UpdateSegmentsInfo(context.TODO(), op1, opManifest, op2)
if err != nil {
updateErr := t.importMeta.UpdateJob(context.TODO(), t.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
if updateErr != nil {

View File

@ -379,6 +379,7 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all
StorageConfig: createStorageConfig(),
TaskSlot: task.GetTaskSlot(),
StorageVersion: storageVersion,
UseLoonFfi: Params.CommonCfg.UseLoonFFI.GetAsBool(),
}
WrapPluginContextWithImport(task.GetCollectionID(), job.GetSchema().GetProperties(), job.GetOptions(), req)
return req, nil

View File

@ -299,7 +299,7 @@ func (t *ImportTask) sync(hashedData HashedData) ([]*conc.Future[struct{}], []sy
}
syncTask, err := NewSyncTask(t.ctx, t.allocator, t.metaCaches, t.req.GetTs(),
segmentID, partitionID, t.GetCollectionID(), channel, data, nil,
bm25Stats, t.req.GetStorageVersion(), t.req.GetStorageConfig())
bm25Stats, t.req.GetStorageVersion(), t.req.GetUseLoonFfi(), t.req.GetStorageConfig())
if err != nil {
return nil, nil, err
}

View File

@ -253,7 +253,7 @@ func (t *L0ImportTask) syncDelete(delData []*storage.DeleteData) ([]*conc.Future
}
syncTask, err := NewSyncTask(t.ctx, t.allocator, t.metaCaches, t.req.GetTs(),
segmentID, partitionID, t.GetCollectionID(), channel, nil, data,
nil, t.req.GetStorageVersion(), t.req.GetStorageConfig())
nil, t.req.GetStorageVersion(), false, t.req.GetStorageConfig())
if err != nil {
return nil, nil, err
}

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math/rand"
"path"
"strconv"
"time"
@ -33,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/storagev2/packed"
"github.com/milvus-io/milvus/internal/util/function"
"github.com/milvus-io/milvus/internal/util/function/embedding"
"github.com/milvus-io/milvus/internal/util/function/models"
@ -42,6 +44,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/metautil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -58,18 +61,30 @@ func NewSyncTask(ctx context.Context,
deleteData *storage.DeleteData,
bm25Stats map[int64]*storage.BM25Stats,
storageVersion int64,
useLoonFFI bool,
storageConfig *indexpb.StorageConfig,
) (syncmgr.Task, error) {
metaCache := metaCaches[vchannel]
if _, ok := metaCache.GetSegmentByID(segmentID); !ok {
metaCache.AddSegment(&datapb.SegmentInfo{
segment := &datapb.SegmentInfo{
ID: segmentID,
State: commonpb.SegmentState_Importing,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: vchannel,
StorageVersion: storageVersion,
}, func(info *datapb.SegmentInfo) pkoracle.PkStat {
}
// init first manifest path
if useLoonFFI {
k := metautil.JoinIDPath(collectionID, partitionID, segmentID)
basePath := path.Join(storageConfig.GetRootPath(), common.SegmentInsertLogPath, k)
if storageConfig.GetStorageType() != "local" {
basePath = path.Join(storageConfig.GetBucketName(), basePath)
}
// -1 for first write
segment.ManifestPath = packed.MarshalManifestPath(basePath, -1)
}
metaCache.AddSegment(segment, func(info *datapb.SegmentInfo) pkoracle.PkStat {
bfs := pkoracle.NewBloomFilterSet()
return bfs
}, metacache.NewBM25StatsFactory)
@ -123,6 +138,7 @@ func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache
Statslogs: lo.Values(statsBinlog),
Bm25Logs: lo.Values(bm25Log),
Deltalogs: deltaLogs,
ManifestPath: segment.ManifestPath(),
}, nil
}

View File

@ -20,7 +20,6 @@ import (
"context"
"encoding/base64"
"math"
"path"
"github.com/apache/arrow/go/v17/arrow/array"
"github.com/apache/arrow/go/v17/arrow/memory"
@ -183,27 +182,6 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m
return logs, manifestPath, nil
}
func (bw *BulkPackWriterV2) GetManifestInfo(pack *SyncPack) (basePath string, version int64, err error) {
// empty info, shall be first write,
// initialize manifestPath with -1 version
if bw.manifestPath == "" {
k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID)
logicalPath := path.Join(bw.getRootPath(), common.SegmentInsertLogPath, k)
bucketName := bw.getBucketName()
// if storage config is not passed, use common config
storageType := paramtable.Get().CommonCfg.StorageType.GetValue()
if bw.storageConfig != nil {
storageType = bw.storageConfig.GetStorageType()
}
if storageType != "local" {
basePath = path.Join(bucketName, logicalPath)
}
return basePath, -1, nil
}
return packed.UnmarshalManfestPath(bw.manifestPath)
}
func (bw *BulkPackWriterV2) writeInsertsIntoStorage(_ context.Context,
pluginContextPtr *indexcgopb.StoragePluginContext,
pack *SyncPack,
@ -228,8 +206,8 @@ func (bw *BulkPackWriterV2) writeInsertsIntoStorage(_ context.Context,
}
var manifestPath string
if paramtable.Get().CommonCfg.UseLoonFFI.GetAsBool() || bw.manifestPath != "" {
basePath, version, err := bw.GetManifestInfo(pack)
if bw.manifestPath != "" {
basePath, version, err := packed.UnmarshalManfestPath(bw.manifestPath)
if err != nil {
return nil, "", err
}

View File

@ -3,6 +3,7 @@ package writebuffer
import (
"context"
"fmt"
"path"
"sync"
"github.com/cockroachdb/errors"
@ -18,12 +19,15 @@ import (
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/storagev2/packed"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/conc"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/metautil"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -523,6 +527,16 @@ func (wb *writeBufferBase) CreateNewGrowingSegment(partitionID int64, segmentID
State: commonpb.SegmentState_Growing,
StorageVersion: storageVersion,
}
// set manifest path when creating segment
if paramtable.Get().CommonCfg.UseLoonFFI.GetAsBool() {
k := metautil.JoinIDPath(wb.collectionID, partitionID, segmentID)
basePath := path.Join(paramtable.Get().ServiceParam.MinioCfg.RootPath.GetValue(), common.SegmentInsertLogPath, k)
if paramtable.Get().CommonCfg.StorageType.GetValue() != "local" {
basePath = path.Join(paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue(), basePath)
}
// -1 for first write
segmentInfo.ManifestPath = packed.MarshalManifestPath(basePath, -1)
}
wb.metaCache.AddSegment(segmentInfo, func(_ *datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSetWithBatchSize(wb.getEstBatchSize())
}, metacache.NewBM25StatsFactory, metacache.SetStartPosRecorded(false))

View File

@ -177,8 +177,6 @@ func (pw *FFIPackedWriter) Close() (string, error) {
defer C.free(unsafe.Pointer(cBasePath))
var transationHandle C.TransactionHandle
// TODO pass version
// use -1 as latest
result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle, C.int64_t(pw.baseVersion))
if err := HandleFFIResult(result); err != nil {
return "", err

View File

@ -928,6 +928,7 @@ message ImportRequest {
int64 task_slot = 14;
int64 storage_version = 15;
repeated common.KeyValuePair plugin_context = 16;
bool use_loon_ffi = 17;
}
message QueryPreImportRequest {
@ -972,6 +973,7 @@ message ImportSegmentInfo {
repeated FieldBinlog statslogs = 4;
repeated FieldBinlog deltalogs = 5;
repeated FieldBinlog bm25logs = 6;
string manifest_path = 7;
}
message QueryImportResponse {

File diff suppressed because it is too large Load Diff