From 8720feeb79749cfc7929134b17587d3dac4c8fb6 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Tue, 8 Jul 2025 12:20:46 +0800 Subject: [PATCH] fix: Fix enqueuing when current batch is fully deleted (#43174) issue: #43045 --------- Signed-off-by: Cai Zhang --- internal/storage/sort.go | 34 +++++++++++++++++++++++++++------- internal/storage/sort_test.go | 10 +++++----- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/internal/storage/sort.go b/internal/storage/sort.go index 2d571c6e8c..36dfd99cb1 100644 --- a/internal/storage/sort.go +++ b/internal/storage/sort.go @@ -172,6 +172,11 @@ func NewPriorityQueue[T any](less func(x, y *T) bool) *PriorityQueue[T] { func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordReader, rw RecordWriter, predicate func(r Record, ri, i int) bool, ) (numRows int, err error) { + // Fast path: no readers provided + if len(rr) == 0 { + return 0, nil + } + type index struct { ri int i int @@ -181,10 +186,7 @@ func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordR advanceRecord := func(i int) error { rec, err := rr[i].Next() recs[i] = rec // assign nil if err - if err != nil { - return err - } - return nil + return err } for i := range rr { @@ -235,8 +237,10 @@ func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordR }) } - enqueueAll := func(ri int) { + var enqueueAll func(ri int) error + enqueueAll = func(ri int) error { r := recs[ri] + hasValid := false for j := 0; j < r.Len(); j++ { if predicate(r, ri, j) { pq.Enqueue(&index{ @@ -244,13 +248,27 @@ func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordR i: j, }) numRows++ + hasValid = true } } + if !hasValid { + err := advanceRecord(ri) + if err == io.EOF { + return nil + } + if err != nil { + return err + } + return enqueueAll(ri) + } + return nil } for i, v := range recs { if v != nil { - enqueueAll(i) + if err := enqueueAll(i); err != nil { + return 0, err + } } } @@ -285,7 +303,9 @@ func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordR if err != nil { return 0, err } - enqueueAll(idx.ri) + if err := enqueueAll(idx.ri); err != nil { + return 0, err + } } } diff --git a/internal/storage/sort_test.go b/internal/storage/sort_test.go index 77689a82ed..d524bcb2eb 100644 --- a/internal/storage/sort_test.go +++ b/internal/storage/sort_test.go @@ -80,11 +80,11 @@ func TestSort(t *testing.T) { func TestMergeSort(t *testing.T) { getReaders := func() []RecordReader { - blobs, err := generateTestDataWithSeed(10, 3) + blobs, err := generateTestDataWithSeed(10000, 3000) assert.NoError(t, err) reader10, err := newCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs)) assert.NoError(t, err) - blobs, err = generateTestDataWithSeed(20, 3) + blobs, err = generateTestDataWithSeed(20000, 3000) assert.NoError(t, err) reader20, err := newCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs)) assert.NoError(t, err) @@ -114,7 +114,7 @@ func TestMergeSort(t *testing.T) { return true }) assert.NoError(t, err) - assert.Equal(t, 6, gotNumRows) + assert.Equal(t, 6000, gotNumRows) err = rw.Close() assert.NoError(t, err) }) @@ -122,10 +122,10 @@ func TestMergeSort(t *testing.T) { t.Run("merge sort with predicate", func(t *testing.T) { gotNumRows, err := MergeSort(batchSize, generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool { pk := r.Column(common.RowIDField).(*array.Int64).Value(i) - return pk >= 20 + return pk >= 12000 }) assert.NoError(t, err) - assert.Equal(t, 3, gotNumRows) + assert.Equal(t, 4000, gotNumRows) err = rw.Close() assert.NoError(t, err) })