mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: Fix enqueuing when current batch is fully deleted (#43174)
issue: #43045 --------- Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
ed9aa1d4db
commit
8720feeb79
@ -172,6 +172,11 @@ func NewPriorityQueue[T any](less func(x, y *T) bool) *PriorityQueue[T] {
|
||||
func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordReader,
|
||||
rw RecordWriter, predicate func(r Record, ri, i int) bool,
|
||||
) (numRows int, err error) {
|
||||
// Fast path: no readers provided
|
||||
if len(rr) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
type index struct {
|
||||
ri int
|
||||
i int
|
||||
@ -181,10 +186,7 @@ func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordR
|
||||
advanceRecord := func(i int) error {
|
||||
rec, err := rr[i].Next()
|
||||
recs[i] = rec // assign nil if err
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
for i := range rr {
|
||||
@ -235,8 +237,10 @@ func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordR
|
||||
})
|
||||
}
|
||||
|
||||
enqueueAll := func(ri int) {
|
||||
var enqueueAll func(ri int) error
|
||||
enqueueAll = func(ri int) error {
|
||||
r := recs[ri]
|
||||
hasValid := false
|
||||
for j := 0; j < r.Len(); j++ {
|
||||
if predicate(r, ri, j) {
|
||||
pq.Enqueue(&index{
|
||||
@ -244,13 +248,27 @@ func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordR
|
||||
i: j,
|
||||
})
|
||||
numRows++
|
||||
hasValid = true
|
||||
}
|
||||
}
|
||||
if !hasValid {
|
||||
err := advanceRecord(ri)
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return enqueueAll(ri)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for i, v := range recs {
|
||||
if v != nil {
|
||||
enqueueAll(i)
|
||||
if err := enqueueAll(i); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -285,7 +303,9 @@ func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordR
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
enqueueAll(idx.ri)
|
||||
if err := enqueueAll(idx.ri); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -80,11 +80,11 @@ func TestSort(t *testing.T) {
|
||||
|
||||
func TestMergeSort(t *testing.T) {
|
||||
getReaders := func() []RecordReader {
|
||||
blobs, err := generateTestDataWithSeed(10, 3)
|
||||
blobs, err := generateTestDataWithSeed(10000, 3000)
|
||||
assert.NoError(t, err)
|
||||
reader10, err := newCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs))
|
||||
assert.NoError(t, err)
|
||||
blobs, err = generateTestDataWithSeed(20, 3)
|
||||
blobs, err = generateTestDataWithSeed(20000, 3000)
|
||||
assert.NoError(t, err)
|
||||
reader20, err := newCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs))
|
||||
assert.NoError(t, err)
|
||||
@ -114,7 +114,7 @@ func TestMergeSort(t *testing.T) {
|
||||
return true
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 6, gotNumRows)
|
||||
assert.Equal(t, 6000, gotNumRows)
|
||||
err = rw.Close()
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
@ -122,10 +122,10 @@ func TestMergeSort(t *testing.T) {
|
||||
t.Run("merge sort with predicate", func(t *testing.T) {
|
||||
gotNumRows, err := MergeSort(batchSize, generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool {
|
||||
pk := r.Column(common.RowIDField).(*array.Int64).Value(i)
|
||||
return pk >= 20
|
||||
return pk >= 12000
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 3, gotNumRows)
|
||||
assert.Equal(t, 4000, gotNumRows)
|
||||
err = rw.Close()
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user