mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
fix: [Storagev2] Close segment readers in mergeSort (#43116)
Related to #43062 --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
0017fa8acc
commit
d09764508a
@ -87,6 +87,12 @@ func mergeSortMultipleSegments(ctx context.Context,
|
||||
segmentFilters[i] = compaction.NewEntityFilter(delta, collectionTtl, currentTime)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
for _, r := range segmentReaders {
|
||||
r.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
var predicate func(r storage.Record, ri, i int) bool
|
||||
switch pkField.DataType {
|
||||
case schemapb.DataType_Int64:
|
||||
|
||||
@ -286,10 +286,20 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
|
||||
rb.Append(r, sliceStart, r.Len())
|
||||
}
|
||||
if rb.GetRowNum() > 0 {
|
||||
mWriter.Write(rb.Build())
|
||||
err := func() error {
|
||||
rec := rb.Build()
|
||||
defer rec.Release()
|
||||
return mWriter.Write(rec)
|
||||
}()
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
mWriter.Write(r)
|
||||
err := mWriter.Write(r)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -38,9 +38,10 @@ func Sort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordReader
|
||||
}
|
||||
indices := make([]*index, 0)
|
||||
|
||||
// release cgo records
|
||||
defer func() {
|
||||
for _, r := range records {
|
||||
r.Release()
|
||||
for _, rec := range records {
|
||||
rec.Release()
|
||||
}
|
||||
}()
|
||||
|
||||
@ -68,13 +69,6 @@ func Sort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordReader
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// release cgo records
|
||||
defer func() {
|
||||
for _, rec := range records {
|
||||
rec.Release()
|
||||
}
|
||||
}()
|
||||
|
||||
pkField, err := typeutil.GetPrimaryFieldSchema(schema)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@ -261,6 +255,14 @@ func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordR
|
||||
}
|
||||
|
||||
rb := NewRecordBuilder(schema)
|
||||
writeRecord := func() error {
|
||||
rec := rb.Build()
|
||||
defer rec.Release()
|
||||
if rec.Len() > 0 {
|
||||
return rw.Write(rec)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for pq.Len() > 0 {
|
||||
idx := pq.Dequeue()
|
||||
@ -269,7 +271,7 @@ func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordR
|
||||
// small batch size will cause write performance degradation. To work around this issue, we accumulate
|
||||
// records and write them in batches. This requires additional memory copy.
|
||||
if rb.GetSize() >= batchSize {
|
||||
if err := rw.Write(rb.Build()); err != nil {
|
||||
if err := writeRecord(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
@ -289,7 +291,7 @@ func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordR
|
||||
|
||||
// write the last batch
|
||||
if rb.GetRowNum() > 0 {
|
||||
if err := rw.Write(rb.Build()); err != nil {
|
||||
if err := writeRecord(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
@ -98,6 +98,10 @@ func NewPackedReader(filePaths []string, schema *arrow.Schema, bufferSize int64,
|
||||
}
|
||||
|
||||
func (pr *PackedReader) ReadNext() (arrow.Record, error) {
|
||||
if pr.currentBatch != nil {
|
||||
pr.currentBatch.Release()
|
||||
pr.currentBatch = nil
|
||||
}
|
||||
var cArr C.CArrowArray
|
||||
var cSchema C.CArrowSchema
|
||||
status := C.ReadNext(pr.cPackedReader, &cArr, &cSchema)
|
||||
@ -120,6 +124,7 @@ func (pr *PackedReader) ReadNext() (arrow.Record, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to convert ArrowArray to Record: %w", err)
|
||||
}
|
||||
pr.currentBatch = recordBatch
|
||||
|
||||
// Return the RecordBatch as an arrow.Record
|
||||
return recordBatch, nil
|
||||
@ -129,6 +134,9 @@ func (pr *PackedReader) Close() error {
|
||||
if pr.cPackedReader == nil {
|
||||
return nil
|
||||
}
|
||||
if pr.currentBatch != nil {
|
||||
pr.currentBatch.Release()
|
||||
}
|
||||
status := C.CloseReader(pr.cPackedReader)
|
||||
if err := ConsumeCStatusIntoError(&status); err != nil {
|
||||
return err
|
||||
|
||||
@ -36,6 +36,7 @@ type PackedReader struct {
|
||||
cPackedReader C.CPackedReader
|
||||
arr *cdata.CArrowArray
|
||||
schema *arrow.Schema
|
||||
currentBatch arrow.Record
|
||||
}
|
||||
|
||||
type (
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user