From 18fbaaca0ac3fbfc3cd85c29c3feaf04415b433f Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 15 Dec 2025 19:49:14 +0800 Subject: [PATCH] enhance: support specified version manifest write (#46331) Related to #44956 **Support specified version manifest write** - Add `baseVersion` parameter to `NewPackedRecordManifestWriter` and `NewFFIPackedWriter` to support writing manifest based on a specific version instead of always overwriting the latest - Add `manifestPath` tracking in `BulkPackWriterV2` to maintain manifest state across writes - Add `GetManifestInfo` method to parse existing manifest path and extract base path and version - Add `UpdateManifestPath` metacache action to track manifest path in segment info - Update `transaction_begin` FFI call to use the specified base version --------- Signed-off-by: Congqi Xia --- .../clustering_compactor_storage_v2_test.go | 2 +- .../mix_compactor_storage_v2_test.go | 2 +- .../compactor/namespace_compactor_test.go | 2 +- internal/flushcommon/metacache/actions.go | 6 ++++ .../flushcommon/syncmgr/pack_writer_v2.go | 36 ++++++++++++++++--- .../syncmgr/pack_writer_v2_test.go | 10 +++--- internal/flushcommon/syncmgr/task.go | 5 ++- internal/storage/binlog_record_writer.go | 5 ++- internal/storage/record_writer.go | 13 ++----- .../storagev2/packed/packed_writer_ffi.go | 5 +-- internal/storagev2/packed/type.go | 1 + 11 files changed, 58 insertions(+), 29 deletions(-) diff --git a/internal/datanode/compactor/clustering_compactor_storage_v2_test.go b/internal/datanode/compactor/clustering_compactor_storage_v2_test.go index 4cee0b77e4..4744c3149e 100644 --- a/internal/datanode/compactor/clustering_compactor_storage_v2_test.go +++ b/internal/datanode/compactor/clustering_compactor_storage_v2_test.go @@ -290,7 +290,7 @@ func (s *ClusteringCompactionTaskStorageV2Suite) initStorageV2Segments(rows int, bw := syncmgr.NewBulkPackWriterV2(mc, sch, cm, s.mockAlloc, packed.DefaultWriteBufferSize, 0, &indexpb.StorageConfig{ StorageType: "local", RootPath: rootPath, - }, columnGroups) + }, columnGroups, "") return bw.Write(context.Background(), pack) } diff --git a/internal/datanode/compactor/mix_compactor_storage_v2_test.go b/internal/datanode/compactor/mix_compactor_storage_v2_test.go index d733e130db..604399e006 100644 --- a/internal/datanode/compactor/mix_compactor_storage_v2_test.go +++ b/internal/datanode/compactor/mix_compactor_storage_v2_test.go @@ -333,7 +333,7 @@ func (s *MixCompactionTaskStorageV2Suite) initStorageV2Segments(rows int, seed i bw := syncmgr.NewBulkPackWriterV2(mc, s.meta.Schema, cm, alloc, packed.DefaultWriteBufferSize, 0, &indexpb.StorageConfig{ StorageType: "local", RootPath: rootPath, - }, columnGroups) + }, columnGroups, "") return bw.Write(context.Background(), pack) } diff --git a/internal/datanode/compactor/namespace_compactor_test.go b/internal/datanode/compactor/namespace_compactor_test.go index ce53c285e8..5189606960 100644 --- a/internal/datanode/compactor/namespace_compactor_test.go +++ b/internal/datanode/compactor/namespace_compactor_test.go @@ -111,7 +111,7 @@ func (s *NamespaceCompactorTestSuite) setupSortedSegments() { bw := syncmgr.NewBulkPackWriterV2(mc, s.schema, cm, alloc, packed.DefaultWriteBufferSize, 0, &indexpb.StorageConfig{ StorageType: "local", RootPath: rootPath, - }, columnGroups) + }, columnGroups, "") inserts, _, _, _, _, _, err := bw.Write(context.Background(), pack) s.Require().NoError(err) s.sortedSegments = append(s.sortedSegments, &datapb.CompactionSegmentBinlogs{ diff --git a/internal/flushcommon/metacache/actions.go b/internal/flushcommon/metacache/actions.go index 6c346dc273..7cc4c5a3bb 100644 --- a/internal/flushcommon/metacache/actions.go +++ b/internal/flushcommon/metacache/actions.go @@ -243,6 +243,12 @@ func SetStartPosRecorded(flag bool) SegmentAction { } } +func UpdateManifestPath(manifestPath string) SegmentAction { + return func(info *SegmentInfo) { + info.manifestPath = manifestPath + } +} + // MergeSegmentAction is the util function to merge multiple SegmentActions into one. func MergeSegmentAction(actions ...SegmentAction) SegmentAction { return func(info *SegmentInfo) { diff --git a/internal/flushcommon/syncmgr/pack_writer_v2.go b/internal/flushcommon/syncmgr/pack_writer_v2.go index d7fb9e0eab..8ad867d979 100644 --- a/internal/flushcommon/syncmgr/pack_writer_v2.go +++ b/internal/flushcommon/syncmgr/pack_writer_v2.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/flushcommon/metacache" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storagecommon" + "github.com/milvus-io/milvus/internal/storagev2/packed" "github.com/milvus-io/milvus/internal/util/hookutil" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/log" @@ -51,11 +52,12 @@ type BulkPackWriterV2 struct { storageConfig *indexpb.StorageConfig columnGroups []storagecommon.ColumnGroup + manifestPath string } func NewBulkPackWriterV2(metaCache metacache.MetaCache, schema *schemapb.CollectionSchema, chunkManager storage.ChunkManager, allocator allocator.Interface, bufferSize, multiPartUploadSize int64, - storageConfig *indexpb.StorageConfig, columnGroups []storagecommon.ColumnGroup, writeRetryOpts ...retry.Option, + storageConfig *indexpb.StorageConfig, columnGroups []storagecommon.ColumnGroup, curManifestPath string, writeRetryOpts ...retry.Option, ) *BulkPackWriterV2 { return &BulkPackWriterV2{ BulkPackWriter: &BulkPackWriter{ @@ -70,6 +72,7 @@ func NewBulkPackWriterV2(metaCache metacache.MetaCache, schema *schemapb.Collect multiPartUploadSize: multiPartUploadSize, storageConfig: storageConfig, columnGroups: columnGroups, + manifestPath: curManifestPath, } } @@ -179,6 +182,27 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m return logs, manifestPath, nil } +func (bw *BulkPackWriterV2) GetManifestInfo(pack *SyncPack) (basePath string, version int64, err error) { + // empty info, shall be first write, + // initialize manifestPath with -1 version + if bw.manifestPath == "" { + k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID) + logicalPath := path.Join(bw.getRootPath(), common.SegmentInsertLogPath, k) + bucketName := bw.getBucketName() + // if storage config is not passed, use common config + storageType := paramtable.Get().CommonCfg.StorageType.GetValue() + if bw.storageConfig != nil { + storageType = bw.storageConfig.GetStorageType() + } + if storageType != "local" { + basePath = path.Join(bucketName, logicalPath) + } + return basePath, -1, nil + } + + return packed.UnmarshalManfestPath(bw.manifestPath) +} + func (bw *BulkPackWriterV2) writeInsertsIntoStorage(_ context.Context, pluginContextPtr *indexcgopb.StoragePluginContext, pack *SyncPack, @@ -203,10 +227,12 @@ func (bw *BulkPackWriterV2) writeInsertsIntoStorage(_ context.Context, } var manifestPath string - if paramtable.Get().CommonCfg.UseLoonFFI.GetAsBool() { - k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID) - basePath := path.Join(bw.getRootPath(), common.SegmentInsertLogPath, k) - w, err := storage.NewPackedRecordManifestWriter(bucketName, basePath, bw.schema, bw.bufferSize, bw.multiPartUploadSize, columnGroups, bw.storageConfig, pluginContextPtr) + if paramtable.Get().CommonCfg.UseLoonFFI.GetAsBool() || bw.manifestPath != "" { + basePath, version, err := bw.GetManifestInfo(pack) + if err != nil { + return nil, "", err + } + w, err := storage.NewPackedRecordManifestWriter(bucketName, basePath, version, bw.schema, bw.bufferSize, bw.multiPartUploadSize, columnGroups, bw.storageConfig, pluginContextPtr) if err != nil { return nil, "", err } diff --git a/internal/flushcommon/syncmgr/pack_writer_v2_test.go b/internal/flushcommon/syncmgr/pack_writer_v2_test.go index 62f5509350..78f82cc005 100644 --- a/internal/flushcommon/syncmgr/pack_writer_v2_test.go +++ b/internal/flushcommon/syncmgr/pack_writer_v2_test.go @@ -143,7 +143,7 @@ func (s *PackWriterV2Suite) TestPackWriterV2_Write() { pack := new(SyncPack).WithCollectionID(collectionID).WithPartitionID(partitionID).WithSegmentID(segmentID).WithChannelName(channelName).WithInsertData(genInsertData(rows, s.schema)).WithDeleteData(deletes) - bw := NewBulkPackWriterV2(mc, s.schema, s.cm, s.logIDAlloc, packed.DefaultWriteBufferSize, 0, nil, s.currentSplit) + bw := NewBulkPackWriterV2(mc, s.schema, s.cm, s.logIDAlloc, packed.DefaultWriteBufferSize, 0, nil, s.currentSplit, "") gotInserts, _, _, _, _, _, err := bw.Write(context.Background(), pack) s.NoError(err) @@ -162,7 +162,7 @@ func (s *PackWriterV2Suite) TestWriteEmptyInsertData() { mc.EXPECT().GetSchema(mock.Anything).Return(s.schema).Maybe() pack := new(SyncPack).WithCollectionID(collectionID).WithPartitionID(partitionID).WithSegmentID(segmentID).WithChannelName(channelName) - bw := NewBulkPackWriterV2(mc, s.schema, s.cm, s.logIDAlloc, packed.DefaultWriteBufferSize, 0, nil, s.currentSplit) + bw := NewBulkPackWriterV2(mc, s.schema, s.cm, s.logIDAlloc, packed.DefaultWriteBufferSize, 0, nil, s.currentSplit, "") _, _, _, _, _, _, err := bw.Write(context.Background(), pack) s.NoError(err) @@ -191,7 +191,7 @@ func (s *PackWriterV2Suite) TestNoPkField() { buf.Append(data) pack := new(SyncPack).WithCollectionID(collectionID).WithPartitionID(partitionID).WithSegmentID(segmentID).WithChannelName(channelName).WithInsertData([]*storage.InsertData{buf}) - bw := NewBulkPackWriterV2(mc, s.schema, s.cm, s.logIDAlloc, packed.DefaultWriteBufferSize, 0, nil, s.currentSplit) + bw := NewBulkPackWriterV2(mc, s.schema, s.cm, s.logIDAlloc, packed.DefaultWriteBufferSize, 0, nil, s.currentSplit, "") _, _, _, _, _, _, err := bw.Write(context.Background(), pack) s.Error(err) @@ -208,7 +208,7 @@ func (s *PackWriterV2Suite) TestAllocIDExhausedError() { mc.EXPECT().GetSchema(mock.Anything).Return(s.schema).Maybe() pack := new(SyncPack).WithCollectionID(collectionID).WithPartitionID(partitionID).WithSegmentID(segmentID).WithChannelName(channelName).WithInsertData(genInsertData(rows, s.schema)) - bw := NewBulkPackWriterV2(mc, s.schema, s.cm, s.logIDAlloc, packed.DefaultWriteBufferSize, 0, nil, s.currentSplit) + bw := NewBulkPackWriterV2(mc, s.schema, s.cm, s.logIDAlloc, packed.DefaultWriteBufferSize, 0, nil, s.currentSplit, "") _, _, _, _, _, _, err := bw.Write(context.Background(), pack) s.Error(err) @@ -229,7 +229,7 @@ func (s *PackWriterV2Suite) TestWriteInsertDataError() { buf.Append(data) pack := new(SyncPack).WithCollectionID(collectionID).WithPartitionID(partitionID).WithSegmentID(segmentID).WithChannelName(channelName).WithInsertData([]*storage.InsertData{buf}) - bw := NewBulkPackWriterV2(mc, s.schema, s.cm, s.logIDAlloc, packed.DefaultWriteBufferSize, 0, nil, s.currentSplit) + bw := NewBulkPackWriterV2(mc, s.schema, s.cm, s.logIDAlloc, packed.DefaultWriteBufferSize, 0, nil, s.currentSplit, "") _, _, _, _, _, _, err := bw.Write(context.Background(), pack) s.Error(err) diff --git a/internal/flushcommon/syncmgr/task.go b/internal/flushcommon/syncmgr/task.go index 3a579ea4e3..3dc1fa7fc3 100644 --- a/internal/flushcommon/syncmgr/task.go +++ b/internal/flushcommon/syncmgr/task.go @@ -136,10 +136,9 @@ func (t *SyncTask) Run(ctx context.Context) (err error) { switch segmentInfo.GetStorageVersion() { case storage.StorageV2: - // TODO change to return manifest after integrated // New sync task means needs to flush data immediately, so do not need to buffer data in writer again. writer := NewBulkPackWriterV2(t.metacache, t.schema, t.chunkManager, t.allocator, 0, - packed.DefaultMultiPartUploadSize, t.storageConfig, columnGroups, t.writeRetryOpts...) + packed.DefaultMultiPartUploadSize, t.storageConfig, columnGroups, segmentInfo.ManifestPath(), t.writeRetryOpts...) t.insertBinlogs, t.deltaBinlog, t.statsBinlogs, t.bm25Binlogs, t.manifestPath, t.flushedSize, err = writer.Write(ctx, t.pack) if err != nil { log.Warn("failed to write sync data with storage v2 format", zap.Error(err)) @@ -181,7 +180,7 @@ func (t *SyncTask) Run(ctx context.Context) (err error) { t.pack.ReleaseData() - actions := []metacache.SegmentAction{metacache.FinishSyncing(t.batchRows)} + actions := []metacache.SegmentAction{metacache.FinishSyncing(t.batchRows), metacache.UpdateManifestPath(t.manifestPath)} if columnGroups != nil { actions = append(actions, metacache.UpdateCurrentSplit(columnGroups)) } diff --git a/internal/storage/binlog_record_writer.go b/internal/storage/binlog_record_writer.go index 2bdafb6962..a35d01dbb9 100644 --- a/internal/storage/binlog_record_writer.go +++ b/internal/storage/binlog_record_writer.go @@ -361,7 +361,10 @@ func (pw *PackedManifestRecordWriter) initWriters(r Record) error { var err error k := metautil.JoinIDPath(pw.collectionID, pw.partitionID, pw.segmentID) basePath := path.Join(pw.storageConfig.GetRootPath(), common.SegmentInsertLogPath, k) - pw.writer, err = NewPackedRecordManifestWriter(pw.storageConfig.GetBucketName(), basePath, pw.schema, pw.bufferSize, pw.multiPartUploadSize, pw.columnGroups, pw.storageConfig, pw.storagePluginContext) + if pw.storageConfig.StorageType != "local" { + basePath = path.Join(pw.storageConfig.GetBucketName(), basePath) + } + pw.writer, err = NewPackedRecordManifestWriter(pw.storageConfig.GetBucketName(), basePath, -1, pw.schema, pw.bufferSize, pw.multiPartUploadSize, pw.columnGroups, pw.storageConfig, pw.storagePluginContext) if err != nil { return merr.WrapErrServiceInternal(fmt.Sprintf("can not new packed record writer %s", err.Error())) } diff --git a/internal/storage/record_writer.go b/internal/storage/record_writer.go index 1a5886899b..a6a630ed0d 100644 --- a/internal/storage/record_writer.go +++ b/internal/storage/record_writer.go @@ -296,6 +296,7 @@ func (pw *packedRecordManifestWriter) Close() error { func NewPackedRecordManifestWriter( bucketName string, basePath string, + baseVersion int64, schema *schemapb.CollectionSchema, bufferSize int64, multiPartUploadSize int64, @@ -314,16 +315,8 @@ func NewPackedRecordManifestWriter( return nil, merr.WrapErrServiceInternal( fmt.Sprintf("can not convert collection schema %s to arrow schema: %s", schema.Name, err.Error())) } - // if storage config is not passed, use common config - storageType := paramtable.Get().CommonCfg.StorageType.GetValue() - if storageConfig != nil { - storageType = storageConfig.GetStorageType() - } - ffiBasePath := basePath - if storageType != "local" { - ffiBasePath = path.Join(bucketName, basePath) - } - writer, err := packed.NewFFIPackedWriter(ffiBasePath, arrowSchema, columnGroups, storageConfig, storagePluginContext) + + writer, err := packed.NewFFIPackedWriter(basePath, baseVersion, arrowSchema, columnGroups, storageConfig, storagePluginContext) if err != nil { return nil, merr.WrapErrServiceInternal( fmt.Sprintf("can not new packed record writer %s", err.Error())) diff --git a/internal/storagev2/packed/packed_writer_ffi.go b/internal/storagev2/packed/packed_writer_ffi.go index 2ac06e2ac2..bf390dd03c 100644 --- a/internal/storagev2/packed/packed_writer_ffi.go +++ b/internal/storagev2/packed/packed_writer_ffi.go @@ -74,7 +74,7 @@ func createStorageConfig() *indexpb.StorageConfig { return storageConfig } -func NewFFIPackedWriter(basePath string, schema *arrow.Schema, columnGroups []storagecommon.ColumnGroup, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedWriter, error) { +func NewFFIPackedWriter(basePath string, baseVersion int64, schema *arrow.Schema, columnGroups []storagecommon.ColumnGroup, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedWriter, error) { cBasePath := C.CString(basePath) defer C.free(unsafe.Pointer(cBasePath)) @@ -143,6 +143,7 @@ func NewFFIPackedWriter(basePath string, schema *arrow.Schema, columnGroups []st return &FFIPackedWriter{ basePath: basePath, + baseVersion: baseVersion, cWriterHandle: writerHandle, cProperties: cProperties, }, nil @@ -178,7 +179,7 @@ func (pw *FFIPackedWriter) Close() (string, error) { // TODO pass version // use -1 as latest - result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle, C.int64_t(-1)) + result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle, C.int64_t(pw.baseVersion)) if err := HandleFFIResult(result); err != nil { return "", err } diff --git a/internal/storagev2/packed/type.go b/internal/storagev2/packed/type.go index 0c54b1a810..d33fb1a7fc 100644 --- a/internal/storagev2/packed/type.go +++ b/internal/storagev2/packed/type.go @@ -36,6 +36,7 @@ type PackedWriter struct { type FFIPackedWriter struct { basePath string + baseVersion int64 cWriterHandle C.WriterHandle cProperties *C.Properties }