diff --git a/internal/indexnode/indexnode_test.go b/internal/indexnode/indexnode_test.go index 2d4908f3ad..b9355ba541 100644 --- a/internal/indexnode/indexnode_test.go +++ b/internal/indexnode/indexnode_test.go @@ -81,7 +81,6 @@ func TestIndexNode(t *testing.T) { t.Run("CreateIndex FloatVector", func(t *testing.T) { var insertCodec storage.InsertCodec - defer insertCodec.Close() insertCodec.Schema = &etcdpb.CollectionMeta{ ID: collectionID, @@ -197,7 +196,6 @@ func TestIndexNode(t *testing.T) { }) t.Run("CreateIndex BinaryVector", func(t *testing.T) { var insertCodec storage.InsertCodec - defer insertCodec.Close() insertCodec.Schema = &etcdpb.CollectionMeta{ ID: collectionID, @@ -310,7 +308,6 @@ func TestIndexNode(t *testing.T) { t.Run("Create Deleted_Index", func(t *testing.T) { var insertCodec storage.InsertCodec - defer insertCodec.Close() insertCodec.Schema = &etcdpb.CollectionMeta{ ID: collectionID, @@ -489,7 +486,6 @@ func TestCreateIndexFailed(t *testing.T) { t.Run("CreateIndex error", func(t *testing.T) { var insertCodec storage.InsertCodec - defer insertCodec.Close() insertCodec.Schema = &etcdpb.CollectionMeta{ ID: collectionID, @@ -608,7 +604,6 @@ func TestCreateIndexFailed(t *testing.T) { t.Run("Invalid Param", func(t *testing.T) { var insertCodec storage.InsertCodec - defer insertCodec.Close() insertCodec.Schema = &etcdpb.CollectionMeta{ ID: collectionID, diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index ef260b20b1..148c8cbb5c 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -348,7 +348,6 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { storageBlobs := getStorageBlobs(blobs) var insertCodec storage.InsertCodec - defer insertCodec.Close() collectionID, partitionID, segmentID, insertData, err2 := insertCodec.DeserializeAll(storageBlobs) if err2 != nil { return err2 @@ -407,7 +406,6 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { if err != nil { return err } - _ = 
codec.Close() tr.Record("serialize index codec done") getSavePathByKey := func(key string) string { diff --git a/internal/querynode/index_loader.go b/internal/querynode/index_loader.go index 0256813eda..3c52d71b60 100644 --- a/internal/querynode/index_loader.go +++ b/internal/querynode/index_loader.go @@ -108,7 +108,6 @@ func (loader *indexLoader) getIndexBinlog(indexPath []string) ([][]byte, indexPa var indexParams indexParam var indexName string indexCodec := storage.NewIndexFileBinlogCodec() - defer indexCodec.Close() for _, p := range indexPath { log.Debug("", zap.String("load path", fmt.Sprintln(indexPath))) indexPiece, err := loader.kv.Load(p) diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 17f262c1c0..e1ddb06c26 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -225,12 +225,6 @@ func (loader *segmentLoader) filterFieldBinlogs(fieldBinlogs []*datapb.FieldBinl func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlogs []*datapb.FieldBinlog, segmentType segmentType) error { iCodec := storage.InsertCodec{} - defer func() { - err := iCodec.Close() - if err != nil { - log.Warn(err.Error()) - } - }() blobs := make([]*storage.Blob, 0) for _, fb := range fieldBinlogs { log.Debug("load segment fields data", diff --git a/internal/storage/binlog_iterator.go b/internal/storage/binlog_iterator.go index c35c3a4b4e..d0612d2d0c 100644 --- a/internal/storage/binlog_iterator.go +++ b/internal/storage/binlog_iterator.go @@ -58,7 +58,6 @@ func NewInsertBinlogIterator(blobs []*Blob, PKfieldID UniqueID) (*InsertBinlogIt reader := NewInsertCodec(nil) _, _, serData, err := reader.Deserialize(blobs) - defer reader.Close() if err != nil { return nil, err diff --git a/internal/storage/binlog_iterator_test.go b/internal/storage/binlog_iterator_test.go index c760847b96..0774bd2b36 100644 --- a/internal/storage/binlog_iterator_test.go +++ 
b/internal/storage/binlog_iterator_test.go @@ -28,7 +28,6 @@ func generateTestData(t *testing.T, num int) []*Blob { {FieldID: 101, Name: "int32", DataType: schemapb.DataType_Int32}, }} insertCodec := NewInsertCodec(&etcdpb.CollectionMeta{ID: 1, Schema: schema}) - defer insertCodec.Close() data := &InsertData{Data: map[FieldID]FieldData{rootcoord.TimeStampField: &Int64FieldData{Data: []int64{}}, rootcoord.RowIDField: &Int64FieldData{Data: []int64{}}, 101: &Int32FieldData{Data: []int32{}}}} for i := 1; i <= num; i++ { diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 96e7680104..e59473f7a3 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -268,8 +268,7 @@ type InsertData struct { // Blob key example: // ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx} type InsertCodec struct { - Schema *etcdpb.CollectionMeta - readerCloseFunc []func() error + Schema *etcdpb.CollectionMeta } func NewInsertCodec(schema *etcdpb.CollectionMeta) *InsertCodec { @@ -397,9 +396,6 @@ func (insertCodec *InsertCodec) DeserializeAll(blobs []*Blob) ( if len(blobs) == 0 { return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty") } - readerClose := func(reader *BinlogReader) func() error { - return func() error { return reader.Close() } - } var blobList BlobList = blobs sort.Sort(blobList) @@ -607,6 +603,10 @@ func (insertCodec *InsertCodec) DeserializeAll(blobs []*Blob) ( default: return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("undefined data type %d", dataType) } + err = eventReader.Close() + if err != nil { + log.Warn("event reader close failed", zap.Error(err)) + } } if fieldID == rootcoord.TimeStampField { blobInfo := BlobInfo{ @@ -614,7 +614,10 @@ func (insertCodec *InsertCodec) DeserializeAll(blobs []*Blob) ( } resultData.Infos = append(resultData.Infos, blobInfo) } - insertCodec.readerCloseFunc = 
append(insertCodec.readerCloseFunc, readerClose(binlogReader)) + err = binlogReader.Close() + if err != nil { + log.Warn("binlog reader close failed", zap.Error(err)) + } } return cID, pID, sID, resultData, nil @@ -629,16 +632,6 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return partitionID, segmentID, data, err } -func (insertCodec *InsertCodec) Close() error { - for _, closeFunc := range insertCodec.readerCloseFunc { - err := closeFunc() - if err != nil { - return err - } - } - return nil -} - // DeleteData saves each entity delete message represented as map. // timestamp represents the time when this instance was deleted type DeleteData struct { @@ -656,7 +649,6 @@ func (data *DeleteData) Append(pk UniqueID, ts Timestamp) { // DeleteCodec serializes and deserializes the delete data type DeleteCodec struct { - readerCloseFunc []func() error } // NewDeleteCodec returns a DeleteCodec @@ -724,9 +716,6 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID if len(blobs) == 0 { return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty") } - readerClose := func(reader *BinlogReader) func() error { - return func() error { return reader.Close() } - } var pid, sid UniqueID result := &DeleteData{} @@ -771,8 +760,14 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID result.Pks = append(result.Pks, pk) result.Tss = append(result.Tss, ts) } - - deleteCodec.readerCloseFunc = append(deleteCodec.readerCloseFunc, readerClose(binlogReader)) + err = eventReader.Close() + if err != nil { + log.Warn("event reader close failed", zap.Error(err)) + } + err = binlogReader.Close() + if err != nil { + log.Warn("binlog reader close failed", zap.Error(err)) + } } result.RowCount = int64(len(result.Pks)) @@ -784,8 +779,7 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID // ${tenant}/data_definition_log/${collection_id}/ts/${log_idx} // 
${tenant}/data_definition_log/${collection_id}/ddl/${log_idx} type DataDefinitionCodec struct { - collectionID int64 - readerCloseFunc []func() error + collectionID int64 } func NewDataDefinitionCodec(collectionID int64) *DataDefinitionCodec { @@ -912,9 +906,6 @@ func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts [ if len(blobs) == 0 { return nil, nil, fmt.Errorf("blobs is empty") } - readerClose := func(reader *BinlogReader) func() error { - return func() error { return reader.Close() } - } var requestsStrings []string var resultTs []Timestamp @@ -958,32 +949,26 @@ func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts [ requestsStrings = append(requestsStrings, singleString) } } + err = eventReader.Close() + if err != nil { + log.Warn("event reader close failed", zap.Error(err)) + } + } + err = binlogReader.Close() + if err != nil { + log.Warn("binlog reader close failed", zap.Error(err)) } - dataDefinitionCodec.readerCloseFunc = append(dataDefinitionCodec.readerCloseFunc, readerClose(binlogReader)) } return resultTs, requestsStrings, nil } -func (dataDefinitionCodec *DataDefinitionCodec) Close() error { - for _, closeFunc := range dataDefinitionCodec.readerCloseFunc { - err := closeFunc() - if err != nil { - return err - } - } - return nil -} - type IndexFileBinlogCodec struct { - readerCloseFuncs []func() error } func NewIndexFileBinlogCodec() *IndexFileBinlogCodec { - return &IndexFileBinlogCodec{ - readerCloseFuncs: make([]func() error, 0), - } + return &IndexFileBinlogCodec{} } func (codec *IndexFileBinlogCodec) Serialize( @@ -1118,10 +1103,6 @@ func (codec *IndexFileBinlogCodec) DeserializeImpl(blobs []*Blob) ( if len(blobs) == 0 { return 0, 0, 0, 0, 0, 0, nil, "", 0, nil, errors.New("blobs is empty") } - readerClose := func(reader *BinlogReader) func() error { - return func() error { return reader.Close() } - } - indexParams = make(map[string]string) datas = make([]*Blob, 0) @@ -1205,9 +1186,16 @@ func 
(codec *IndexFileBinlogCodec) DeserializeImpl(blobs []*Blob) ( }) } } + err = eventReader.Close() + if err != nil { + log.Warn("event reader close failed", zap.Error(err)) + } + } + err = binlogReader.Close() + if err != nil { + log.Warn("binlog reader close failed", zap.Error(err)) } - codec.readerCloseFuncs = append(codec.readerCloseFuncs, readerClose(binlogReader)) } return indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexParams, indexName, indexID, datas, nil @@ -1224,16 +1212,6 @@ func (codec *IndexFileBinlogCodec) Deserialize(blobs []*Blob) ( return datas, indexParams, indexName, indexID, err } -func (codec *IndexFileBinlogCodec) Close() error { - for _, closeFunc := range codec.readerCloseFuncs { - err := closeFunc() - if err != nil { - return err - } - } - return nil -} - type IndexCodec struct { } diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index 957a201fe9..52523e0d2e 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -293,7 +293,6 @@ func TestInsertCodec(t *testing.T) { assert.Equal(t, []string{"1", "2", "3", "4"}, resultData.Data[StringField].(*StringFieldData).Data) assert.Equal(t, []byte{0, 255, 0, 255}, resultData.Data[BinaryVectorField].(*BinaryVectorFieldData).Data) assert.Equal(t, []float32{0, 1, 2, 3, 0, 1, 2, 3, 4, 5, 6, 7, 4, 5, 6, 7}, resultData.Data[FloatVectorField].(*FloatVectorFieldData).Data) - assert.Nil(t, insertCodec.Close()) log.Debug("Data", zap.Any("Data", resultData.Data)) log.Debug("Infos", zap.Any("Infos", resultData.Infos)) @@ -353,7 +352,6 @@ func TestDDCodec(t *testing.T) { assert.Nil(t, err) assert.Equal(t, resultTs, ts) assert.Equal(t, resultRequests, ddRequests) - assert.Nil(t, dataDefinitionCodec.Close()) blobs = []*Blob{} _, _, err = dataDefinitionCodec.Deserialize(blobs) @@ -416,9 +414,6 @@ func TestIndexFileBinlogCodec(t *testing.T) { assert.Equal(t, indexName, idxName) assert.Equal(t, indexID, idxID) - err = 
codec.Close() - assert.Nil(t, err) - // empty _, _, _, _, _, _, _, _, _, _, err = codec.DeserializeImpl(nil) assert.NotNil(t, err) diff --git a/internal/storage/print_binlog_test.go b/internal/storage/print_binlog_test.go index 5ee08d5aa4..955cf19f53 100644 --- a/internal/storage/print_binlog_test.go +++ b/internal/storage/print_binlog_test.go @@ -425,7 +425,6 @@ func TestPrintDDFiles(t *testing.T) { assert.Nil(t, err) assert.Equal(t, resultTs, ts) assert.Equal(t, resultRequests, ddRequests) - assert.Nil(t, dataDefinitionCodec.Close()) PrintBinlogFiles(binlogFiles) } diff --git a/internal/storage/utils_test.go b/internal/storage/utils_test.go index 7f6d6fd310..3ebf67dfca 100644 --- a/internal/storage/utils_test.go +++ b/internal/storage/utils_test.go @@ -119,7 +119,6 @@ func TestGetBinlogSize(t *testing.T) { } codec := NewIndexFileBinlogCodec() - defer codec.Close() serializedBlobs, err := codec.Serialize(indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexParams, indexName, indexID, datas) assert.Nil(t, err) @@ -194,7 +193,6 @@ func TestEstimateMemorySize(t *testing.T) { } codec := NewIndexFileBinlogCodec() - defer codec.Close() serializedBlobs, err := codec.Serialize(indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexParams, indexName, indexID, datas) assert.Nil(t, err) diff --git a/internal/storage/vector_chunk_manager.go b/internal/storage/vector_chunk_manager.go index f13789d3b2..98352644ab 100644 --- a/internal/storage/vector_chunk_manager.go +++ b/internal/storage/vector_chunk_manager.go @@ -63,7 +63,6 @@ func (vcm *VectorChunkManager) downloadVectorFile(key string) ([]byte, error) { if err != nil { return nil, err } - defer insertCodec.Close() var results []byte for _, singleData := range data.Data {