enhance: [StorageV2] add manifest path support for FFI integration (#44991)

Related to #44956

Add manifest_path field throughout the data path to support LOON Storage
V2 manifest tracking. The manifest stores metadata for segment data
files and enables the unified Storage V2 FFI interface.

Changes include:
- Add manifest_path field to SegmentInfo and SaveBinlogPathsRequest
proto messages
- Add UpdateManifest operator to datacoord meta operations
- Update metacache, sync manager, and meta writer to propagate manifest
paths
- Include manifest_path in segment load info for query coordinator

This is part of the Storage V2 FFI interface integration.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-10-27 19:24:10 +08:00 committed by GitHub
parent fd0ef09e97
commit 569a5b40d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 3350 additions and 3268 deletions

View File

@ -1177,6 +1177,23 @@ func UpdateCheckPointOperator(segmentID int64, checkpoints []*datapb.CheckPoint,
}
}
func UpdateManifest(segmentID int64, manifestPath string) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.Get(segmentID)
if segment == nil {
log.Ctx(context.TODO()).Warn("meta update: update manifest failed - segment not found",
zap.Int64("segmentID", segmentID))
return false
}
// skip empty manifest update and same manifest
if manifestPath == "" || segment.ManifestPath == manifestPath {
return false
}
segment.ManifestPath = manifestPath
return true
}
}
func UpdateImportedRows(segmentID int64, rows int64) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.Get(segmentID)

View File

@ -954,6 +954,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
),
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10, Position: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}, Timestamp: 100}}}, true),
UpdateManifest(1, "files/binlogs/1/2/1000/manifest_0"),
)
assert.NoError(t, err)
@ -969,6 +970,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
assert.Equal(t, len(updated.Bm25Statslogs[0].Binlogs), 1)
assert.Equal(t, updated.State, commonpb.SegmentState_Growing)
assert.Equal(t, updated.NumOfRows, int64(10))
assert.Equal(t, updated.ManifestPath, "files/binlogs/1/2/1000/manifest_0")
err = meta.UpdateSegmentsInfo(
context.TODO(),
@ -991,6 +993,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
UpdateStatusOperator(1, commonpb.SegmentState_Flushed),
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 12, Position: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}, Timestamp: 101}}}, true),
UpdateManifest(1, "files/binlogs/1/2/1000/manifest_2"),
)
assert.NoError(t, err)
@ -1002,6 +1005,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
assert.Equal(t, len(updated.Deltalogs), 0)
assert.Equal(t, len(updated.Bm25Statslogs), 0)
assert.Equal(t, updated.State, commonpb.SegmentState_Flushed)
assert.Equal(t, updated.ManifestPath, "files/binlogs/1/2/1000/manifest_2")
err = meta.UpdateSegmentsInfo(
context.TODO(),
@ -1124,6 +1128,12 @@ func TestUpdateSegmentsInfo(t *testing.T) {
)
assert.NoError(t, err)
err = meta.UpdateSegmentsInfo(
context.TODO(),
UpdateManifest(1, "files/binlogs/1/2/1000/manifest_0"),
)
assert.NoError(t, err)
err = meta.UpdateSegmentsInfo(context.TODO(), UpdateAsDroppedIfEmptyWhenFlushing(1))
assert.NoError(t, err)
})

View File

@ -687,8 +687,9 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
UpdateCheckPointOperator(req.GetSegmentID(), req.GetCheckPoints()))
}
// save binlogs, start positions and checkpoints
// save manifest, start positions and checkpoints
operators = append(operators,
UpdateManifest(req.GetSegmentID(), req.GetManifestPath()),
UpdateStartPosition(req.GetStartPositions()),
UpdateAsDroppedIfEmptyWhenFlushing(req.GetSegmentID()),
)

View File

@ -48,6 +48,7 @@ type SegmentInfo struct {
deltalogs []*datapb.FieldBinlog
bm25logs []*datapb.FieldBinlog
currentSplit []storagecommon.ColumnGroup
manifestPath string
}
func (s *SegmentInfo) SegmentID() int64 {
@ -129,6 +130,10 @@ func (s *SegmentInfo) Bm25logs() []*datapb.FieldBinlog {
return s.bm25logs
}
func (s *SegmentInfo) ManifestPath() string {
return s.manifestPath
}
func (s *SegmentInfo) Clone() *SegmentInfo {
return &SegmentInfo{
segmentID: s.segmentID,
@ -150,6 +155,7 @@ func (s *SegmentInfo) Clone() *SegmentInfo {
deltalogs: s.deltalogs,
bm25logs: s.bm25logs,
currentSplit: s.currentSplit,
manifestPath: s.manifestPath,
}
}
@ -188,5 +194,6 @@ func NewSegmentInfo(info *datapb.SegmentInfo, bfs pkoracle.PkStat, bm25Stats *Se
deltalogs: info.GetDeltalogs(),
bm25logs: info.GetBm25Statslogs(),
currentSplit: currentSplit,
manifestPath: info.GetManifestPath(),
}
}

View File

@ -92,6 +92,7 @@ func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error
zap.Int("statslogNum", lo.SumBy(statsFieldBinlogs, getBinlogNum)),
zap.Int("deltalogNum", lo.SumBy(deltaFieldBinlogs, getBinlogNum)),
zap.Int("bm25logNum", lo.SumBy(deltaBm25StatsBinlogs, getBinlogNum)),
zap.String("manifestPath", pack.manifestPath),
zap.String("vChannelName", pack.channelName),
)
@ -118,6 +119,7 @@ func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error
SegLevel: pack.level,
StorageVersion: segment.GetStorageVersion(),
WithFullBinlogs: true,
ManifestPath: pack.manifestPath,
}
err := retry.Handle(ctx, func() (bool, error) {
err := b.broker.SaveBinlogPaths(ctx, req)

View File

@ -75,6 +75,8 @@ type SyncTask struct {
bm25Binlogs map[int64]*datapb.FieldBinlog
deltaBinlog *datapb.FieldBinlog
manifestPath string
writeRetryOpts []retry.Option
failureCallback func(err error)
@ -134,6 +136,7 @@ func (t *SyncTask) Run(ctx context.Context) (err error) {
switch segmentInfo.GetStorageVersion() {
case storage.StorageV2:
// TODO change to return manifest after integrated
// New sync task means needs to flush data immediately, so do not need to buffer data in writer again.
writer := NewBulkPackWriterV2(t.metacache, t.schema, t.chunkManager, t.allocator, 0,
packed.DefaultMultiPartUploadSize, t.storageConfig, columnGroups, t.writeRetryOpts...)

View File

@ -90,6 +90,7 @@ func PackSegmentLoadInfo(segment *datapb.SegmentInfo, channelCheckpoint *msgpb.M
IsSorted: segment.GetIsSorted(),
TextStatsLogs: segment.GetTextStatsLogs(),
JsonKeyStatsLogs: segment.GetJsonKeyStats(),
ManifestPath: segment.GetManifestPath(),
}
return loadInfo
}

View File

@ -418,6 +418,11 @@ message SegmentInfo {
// After the growing segment is full managed by streamingnode, the true value can never be seen at coordinator.
bool is_created_by_streaming = 30;
bool is_partition_key_sorted = 31;
// manifest_path stores the fullpath of LOON manifest file of segemnt data files.
// we could keep the fullpath since one segment shall only have one active manifest
// and we could keep the possiblity that manifest stores out side of collection/partition/segment path
string manifest_path = 32;
}
message SegmentStartPosition {
@ -443,6 +448,7 @@ message SaveBinlogPathsRequest {
int64 storageVersion = 15;
repeated FieldBinlog field2Bm25logPaths = 16;
bool with_full_binlogs = 17; // report with full data for verification.
string manifest_path = 18; //
}
message CheckPoint {

File diff suppressed because it is too large Load Diff

View File

@ -382,6 +382,7 @@ message SegmentLoadInfo {
repeated data.FieldBinlog bm25logs = 21;
map<int64, data.JsonKeyStats> jsonKeyStatsLogs = 22;
common.LoadPriority priority = 23;
string manifest_path = 24;
}
message FieldIndexInfo {

File diff suppressed because it is too large Load Diff