From 4369e08f2acf020b47301423da27b451c2017315 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 10 Jan 2022 17:13:35 +0800 Subject: [PATCH] Fix storage memory leak caused by runtime.SetFinalizer (#15100) Signed-off-by: Congqi Xia --- internal/storage/binlog_reader.go | 8 ------- internal/storage/binlog_writer.go | 22 ------------------ internal/storage/event_reader.go | 7 ------ internal/storage/event_writer.go | 37 ------------------------------- 4 files changed, 74 deletions(-) diff --git a/internal/storage/binlog_reader.go b/internal/storage/binlog_reader.go index 1df63f6e3c..6f3f395bef 100644 --- a/internal/storage/binlog_reader.go +++ b/internal/storage/binlog_reader.go @@ -19,9 +19,6 @@ package storage import ( "bytes" "errors" - "runtime" - - "github.com/milvus-io/milvus/internal/log" ) // BinlogReader is an object to read binlog file. Binlog file's format can be @@ -91,10 +88,5 @@ func NewBinlogReader(data []byte) (*BinlogReader, error) { if _, err := reader.readDescriptorEvent(); err != nil { return nil, err } - runtime.SetFinalizer(reader, func(reader *BinlogReader) { - if !reader.isClose { - log.Error("binlog reader is leaking.. please check") - } - }) return reader, nil } diff --git a/internal/storage/binlog_writer.go b/internal/storage/binlog_writer.go index 1ffa4cbeb4..58eb251a41 100644 --- a/internal/storage/binlog_writer.go +++ b/internal/storage/binlog_writer.go @@ -20,10 +20,8 @@ import ( "bytes" "encoding/binary" "fmt" - "runtime" "github.com/milvus-io/milvus/internal/common" - "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/schemapb" ) @@ -276,11 +274,6 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID }, } - runtime.SetFinalizer(w, func(writer *InsertBinlogWriter) { - if !w.isClosed() { - log.Error("insert binlog writer is leaking.. please check") - } - }) return w } @@ -300,11 +293,6 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID buffer: nil, }, } - runtime.SetFinalizer(w, func(writer *DeleteBinlogWriter) { - if !w.isClosed() { - log.Error("delete binlog writer is leaking.. please check") - } - }) return w } @@ -322,11 +310,6 @@ func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) *DDLBinl buffer: nil, }, } - runtime.SetFinalizer(w, func(writer *DDLBinlogWriter) { - if !w.isClosed() { - log.Error("ddl binlog writer is leaking.. please check") - } - }) return w } @@ -362,10 +345,5 @@ func NewIndexFileBinlogWriter( buffer: nil, }, } - runtime.SetFinalizer(w, func(writer *IndexFileBinlogWriter) { - if !w.isClosed() { - log.Error("index file binlog writer is leaking.. please check") - } - }) return w } diff --git a/internal/storage/event_reader.go b/internal/storage/event_reader.go index 282d4965c1..518bd4a684 100644 --- a/internal/storage/event_reader.go +++ b/internal/storage/event_reader.go @@ -19,9 +19,7 @@ package storage import ( "bytes" "fmt" - "runtime" - "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/schemapb" ) @@ -110,10 +108,5 @@ func newEventReader(datatype schemapb.DataType, buffer *bytes.Buffer) (*EventRea return nil, err } reader.PayloadReaderInterface = payloadReader - runtime.SetFinalizer(reader, func(reader *EventReader) { - if !reader.isClosed { - log.Error("event reader is leaking.. please check") - } - }) return reader, nil } diff --git a/internal/storage/event_writer.go b/internal/storage/event_writer.go index 745a6bd1d8..43255adf0d 100644 --- a/internal/storage/event_writer.go +++ b/internal/storage/event_writer.go @@ -22,10 +22,8 @@ import ( "errors" "fmt" "io" - "runtime" "github.com/milvus-io/milvus/internal/common" - "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/schemapb" ) @@ -259,11 +257,6 @@ func newInsertEventWriter(dataType schemapb.DataType) (*insertEventWriter, error } writer.baseEventWriter.getEventDataSize = writer.insertEventData.GetEventDataFixPartSize writer.baseEventWriter.writeEventData = writer.insertEventData.WriteEventData - runtime.SetFinalizer(writer, func(writer *insertEventWriter) { - if !writer.isClosed { - log.Error("insert event binlog writer is leaking.. please check") - } - }) return writer, nil } @@ -286,11 +279,6 @@ func newDeleteEventWriter(dataType schemapb.DataType) (*deleteEventWriter, error } writer.baseEventWriter.getEventDataSize = writer.deleteEventData.GetEventDataFixPartSize writer.baseEventWriter.writeEventData = writer.deleteEventData.WriteEventData - runtime.SetFinalizer(writer, func(writer *deleteEventWriter) { - if !writer.isClosed { - log.Error("delete event binlog writer is leaking.. please check") - } - }) return writer, nil } @@ -317,11 +305,6 @@ func newCreateCollectionEventWriter(dataType schemapb.DataType) (*createCollecti } writer.baseEventWriter.getEventDataSize = writer.createCollectionEventData.GetEventDataFixPartSize writer.baseEventWriter.writeEventData = writer.createCollectionEventData.WriteEventData - runtime.SetFinalizer(writer, func(writer *createCollectionEventWriter) { - if !writer.isClosed { - log.Error("create collection event binlog writer is leaking.. please check") - } - }) return writer, nil } @@ -348,11 +331,6 @@ func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEv } writer.baseEventWriter.getEventDataSize = writer.dropCollectionEventData.GetEventDataFixPartSize writer.baseEventWriter.writeEventData = writer.dropCollectionEventData.WriteEventData - runtime.SetFinalizer(writer, func(writer *dropCollectionEventWriter) { - if !writer.isClosed { - log.Error("drop collection event binlog writer is leaking.. please check") - } - }) return writer, nil } @@ -379,11 +357,6 @@ func newCreatePartitionEventWriter(dataType schemapb.DataType) (*createPartition } writer.baseEventWriter.getEventDataSize = writer.createPartitionEventData.GetEventDataFixPartSize writer.baseEventWriter.writeEventData = writer.createPartitionEventData.WriteEventData - runtime.SetFinalizer(writer, func(writer *createPartitionEventWriter) { - if !writer.isClosed { - log.Error("create partition binlog writer is leaking.. please check") - } - }) return writer, nil } @@ -410,11 +383,6 @@ func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEven } writer.baseEventWriter.getEventDataSize = writer.dropPartitionEventData.GetEventDataFixPartSize writer.baseEventWriter.writeEventData = writer.dropPartitionEventData.WriteEventData - runtime.SetFinalizer(writer, func(writer *dropPartitionEventWriter) { - if !writer.isClosed { - log.Error("drop partition event binlog writer is leaking.. please check") - } - }) return writer, nil } @@ -437,11 +405,6 @@ func newIndexFileEventWriter() (*indexFileEventWriter, error) { } writer.baseEventWriter.getEventDataSize = writer.indexFileEventData.GetEventDataFixPartSize writer.baseEventWriter.writeEventData = writer.indexFileEventData.WriteEventData - runtime.SetFinalizer(writer, func(writer *indexFileEventWriter) { - if !writer.isClosed { - log.Error("index file event binlog writer is leaking.. please check") - } - }) return writer, nil }