From b4682b73529221b27995e0dbbef61174e4e7369f Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 30 Dec 2025 14:05:21 +0800 Subject: [PATCH] fix: use LoadDeltaData instead of Delete for L0 growing forward (#46657) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Related to #46660 Replace segment.Delete() with segment.LoadDeltaData() when forwarding L0 deletions to growing segments. LoadDeltaData is the more appropriate API for bulk loading delta data compared to individual Delete calls. • Core invariant: forwarding L0 deletions to growing segments must use the bulk-delta API (storage.DeltaData + segment.LoadDeltaData) because LoadDeltaData preserves paired primary keys and timestamps as a single atomic delta payload; segment.Delete was intended for per-delete RPCs and not for loading L0 delta payloads. • Logic removed/simplified: addL0GrowingBF() no longer calls segment.Delete for buffered L0 keys. Instead the buffered callback builds a storage.DeltaData via storage.NewDeltaDataWithData(pks, tss) and calls segment.LoadDeltaData(ctx, dd). This eliminates the previous per-batch Delete call path and centralizes forwarding as a single delta-load operation. • Why this does not cause data loss or regression: the new path supplies identical PK+timestamp pairs to the segment via DeltaData; LoadDeltaData applies the same delete semantics but accepts batched delta payloads. The change is limited to the L0→growing Bloom-Filter forward path (addL0GrowingBF/addL0ForGrowingLoad), leaving sealed-segment deletes, streaming direct forwarding, and remote-load policies unchanged. Also, the prior code would fail on L0Segment.Delete (L0 segments prohibit Delete), so switching to LoadDeltaData prevents lost-forwarding caused by unsupported Delete calls. • Category: Enhancement / Refactor — replaces inappropriate per-delete calls with the correct bulk delta-load API, simplifying error handling around NewDeltaDataWithData and ensuring API contract correctness for L0→growing forwarding. --------- Signed-off-by: Congqi Xia --- internal/querynodev2/delegator/delta_forward.go | 6 +++++- internal/querynodev2/delegator/delta_forward_test.go | 8 ++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/querynodev2/delegator/delta_forward.go b/internal/querynodev2/delegator/delta_forward.go index 80fee34523..ed09ce978c 100644 --- a/internal/querynodev2/delegator/delta_forward.go +++ b/internal/querynodev2/delegator/delta_forward.go @@ -101,7 +101,11 @@ func (sd *shardDelegator) addL0ForGrowing(ctx context.Context, segment segments. func (sd *shardDelegator) addL0GrowingBF(ctx context.Context, segment segments.Segment) error { bufferedForwarder := NewBufferedForwarder(paramtable.Get().QueryNodeCfg.ForwardBatchSize.GetAsInt64(), func(pks storage.PrimaryKeys, tss []uint64) error { - return segment.Delete(ctx, pks, tss) + dd, err := storage.NewDeltaDataWithData(pks, tss) + if err != nil { + return err + } + return segment.LoadDeltaData(ctx, dd) }) if err := sd.rangeHitL0Deletions(segment.Partition(), segment, func(pk storage.PrimaryKey, ts uint64) error { diff --git a/internal/querynodev2/delegator/delta_forward_test.go b/internal/querynodev2/delegator/delta_forward_test.go index a92407c8ab..e1a59bdfeb 100644 --- a/internal/querynodev2/delegator/delta_forward_test.go +++ b/internal/querynodev2/delegator/delta_forward_test.go @@ -425,16 +425,16 @@ func (s *GrowingMergeL0Suite) TestAddL0ForGrowingBF() { seg.EXPECT().ID().Return(1) seg.EXPECT().Partition().Return(100) seg.EXPECT().BatchPkExist(mock.Anything).Return(lo.RepeatBy(n, func(i int) bool { return true })) - seg.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, pk storage.PrimaryKeys, u []uint64) error { - s.Equal(deltaData.DeletePks(), pk) - s.Equal(deltaData.DeleteTimestamps(), u) + seg.EXPECT().LoadDeltaData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, dd *storage.DeltaData) error { + s.Equal(deltaData.DeletePks(), dd.DeletePks()) + s.Equal(deltaData.DeleteTimestamps(), dd.DeleteTimestamps()) return nil }).Once() err = sd.addL0ForGrowing(context.Background(), seg) s.NoError(err) - seg.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, pk storage.PrimaryKeys, u []uint64) error { + seg.EXPECT().LoadDeltaData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, dd *storage.DeltaData) error { return errors.New("mocked") }).Once() err = sd.addL0ForGrowing(context.Background(), seg)