enhance: [StorageV2] Release record and close reader (#42983)

Related to #39173

This PR
- Close packed reader after sort
- Release arrow.Record preventing memory leakage
- Invoke `pack_reader->Close()` for CloseReader

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-06-27 14:46:43 +08:00 committed by GitHub
parent 238bd30f42
commit 9b06ecb72f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 9 additions and 0 deletions

View File

@ -138,6 +138,7 @@ CloseReader(CPackedReader c_packed_reader) {
auto packed_reader =
static_cast<milvus_storage::PackedRecordBatchReader*>(
c_packed_reader);
packed_reader->Close();
delete packed_reader;
return milvus::SuccessCStatus();
} catch (std::exception& e) {

View File

@ -242,6 +242,7 @@ func (st *statsTask) sort(ctx context.Context) ([]*datapb.FieldBinlog, error) {
log.Warn("error creating insert binlog reader", zap.Error(err))
return nil, err
}
defer rr.Close()
rrs := []storage.RecordReader{rr}
numValidRows, err := storage.Sort(st.req.GetBinlogMaxSize(), st.req.GetSchema(), rrs, srw, predicate)

View File

@ -68,6 +68,13 @@ func Sort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordReader
return 0, nil
}
// release cgo records
defer func() {
for _, rec := range records {
rec.Release()
}
}()
pkField, err := typeutil.GetPrimaryFieldSchema(schema)
if err != nil {
return 0, err