From 9585819154ba4c1ccc66ecb69db2d5ae0ea52fb4 Mon Sep 17 00:00:00 2001 From: neza2017 Date: Thu, 10 Dec 2020 15:50:09 +0800 Subject: [PATCH] Add binlog unittest Signed-off-by: neza2017 --- internal/storage/binlog_writer.go | 227 +++++++++---------------- internal/storage/binlog_writer_test.go | 6 +- internal/storage/data_codec.go | 15 +- internal/storage/event_test.go | 32 ++-- internal/storage/event_writer.go | 23 ++- internal/storage/event_writer_test.go | 4 +- 6 files changed, 128 insertions(+), 179 deletions(-) diff --git a/internal/storage/binlog_writer.go b/internal/storage/binlog_writer.go index 913465d766..0c0adb0ecb 100644 --- a/internal/storage/binlog_writer.go +++ b/internal/storage/binlog_writer.go @@ -3,8 +3,8 @@ package storage import ( "bytes" "encoding/binary" - "errors" + "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) @@ -29,60 +29,35 @@ const ( type baseBinlogWriter struct { descriptorEvent - magicNumber int32 - binlogType BinlogType - eventWriters []EventWriter - currentEventWriter EventWriter - buffer *bytes.Buffer - numEvents int32 - numRows int32 - isClose bool - offset int32 + magicNumber int32 + binlogType BinlogType + eventWriters []EventWriter + buffer *bytes.Buffer + length int32 } -func (writer *baseBinlogWriter) checkClose() error { - if writer.isClose { - return errors.New("insert binlog writer is already closed") - } - return nil -} - -func (writer *baseBinlogWriter) appendEventWriter() error { - if writer.currentEventWriter != nil { - if err := writer.currentEventWriter.Finish(); err != nil { - return err - } - - writer.eventWriters = append(writer.eventWriters, writer.currentEventWriter) - length, err := writer.currentEventWriter.GetMemoryUsageInBytes() - if err != nil { - return err - } - writer.offset += length - writer.numEvents++ - nums, err := writer.currentEventWriter.GetPayloadLengthFromWriter() - if err != nil { - return err - } - writer.numRows += int32(nums) - writer.currentEventWriter = nil - } - return nil +func (writer *baseBinlogWriter) isClosed() bool { + return writer.buffer != nil } func (writer *baseBinlogWriter) GetEventNums() int32 { - return writer.numEvents + return int32(len(writer.eventWriters)) } func (writer *baseBinlogWriter) GetRowNums() (int32, error) { - var res = writer.numRows - if writer.currentEventWriter != nil { - nums, err := writer.currentEventWriter.GetPayloadLengthFromWriter() - if err != nil { - } - res += int32(nums) + if writer.isClosed() { + return writer.length, nil } - return res, nil + + length := 0 + for _, e := range writer.eventWriters { + rows, err := e.GetPayloadLengthFromWriter() + if err != nil { + return 0, err + } + length += rows + } + return int32(length), nil } func (writer *baseBinlogWriter) GetBinlogType() BinlogType { @@ -90,22 +65,19 @@ func (writer *baseBinlogWriter) GetBinlogType() BinlogType { } // GetBuffer get binlog buffer. Return nil if binlog is not finished yet. -func (writer *baseBinlogWriter) GetBuffer() []byte { - if writer.buffer != nil { - return writer.buffer.Bytes() +func (writer *baseBinlogWriter) GetBuffer() ([]byte, error) { + if writer.buffer == nil { + return nil, errors.New("please close binlog before get buffer") } - return nil + return writer.buffer.Bytes(), nil } // Close allocate buffer and release resource func (writer *baseBinlogWriter) Close() error { - if writer.isClose { + if writer.buffer != nil { return nil } - writer.isClose = true - if err := writer.appendEventWriter(); err != nil { - return err - } + var offset int32 writer.buffer = new(bytes.Buffer) if err := binary.Write(writer.buffer, binary.LittleEndian, int32(MagicNumber)); err != nil { return err @@ -113,15 +85,27 @@ func (writer *baseBinlogWriter) Close() error { if err := writer.descriptorEvent.Write(writer.buffer); err != nil { return err } + offset = writer.descriptorEvent.GetMemoryUsageInBytes() + writer.length = 0 for _, w := range writer.eventWriters { + w.SetOffset(offset) + if err := w.Finish(); err != nil { + return err + } if err := w.Write(writer.buffer); err != nil { return err } - } - - // close all writers - for _, e := range writer.eventWriters { - if err := e.Close(); err != nil { + length, err := w.GetMemoryUsageInBytes() + if err != nil { + return err + } + offset += length + rows, err := w.GetPayloadLengthFromWriter() + if err != nil { + return err + } + writer.length += int32(rows) + if err := w.ReleasePayloadWriter(); err != nil { return err } } @@ -133,19 +117,14 @@ type InsertBinlogWriter struct { } func (writer *InsertBinlogWriter) NextInsertEventWriter() (*insertEventWriter, error) { - if err := writer.checkClose(); err != nil { - return nil, err + if writer.isClosed() { + return nil, errors.New("binlog has closed") } - if err := writer.appendEventWriter(); err != nil { - return nil, err - } - - event, err := newInsertEventWriter(writer.PayloadDataType, writer.offset) + event, err := newInsertEventWriter(writer.PayloadDataType) if err != nil { return nil, err } - writer.currentEventWriter = event - + writer.eventWriters = append(writer.eventWriters, event) return event, nil } @@ -154,19 +133,14 @@ type DeleteBinlogWriter struct { } func (writer *DeleteBinlogWriter) NextDeleteEventWriter() (*deleteEventWriter, error) { - if err := writer.checkClose(); err != nil { - return nil, err + if writer.isClosed() { + return nil, errors.New("binlog has closed") } - if err := writer.appendEventWriter(); err != nil { - return nil, err - } - - event, err := newDeleteEventWriter(writer.PayloadDataType, writer.offset) + event, err := newDeleteEventWriter(writer.PayloadDataType) if err != nil { return nil, err } - writer.currentEventWriter = event - + writer.eventWriters = append(writer.eventWriters, event) return event, nil } @@ -175,70 +149,50 @@ type DDLBinlogWriter struct { } func (writer *DDLBinlogWriter) NextCreateCollectionEventWriter() (*createCollectionEventWriter, error) { - if err := writer.checkClose(); err != nil { - return nil, err + if writer.isClosed() { + return nil, errors.New("binlog has closed") } - if err := writer.appendEventWriter(); err != nil { - return nil, err - } - - event, err := newCreateCollectionEventWriter(writer.PayloadDataType, writer.offset) + event, err := newCreateCollectionEventWriter(writer.PayloadDataType) if err != nil { return nil, err } - writer.currentEventWriter = event - + writer.eventWriters = append(writer.eventWriters, event) return event, nil } func (writer *DDLBinlogWriter) NextDropCollectionEventWriter() (*dropCollectionEventWriter, error) { - if err := writer.checkClose(); err != nil { - return nil, err + if writer.isClosed() { + return nil, errors.New("binlog has closed") } - if err := writer.appendEventWriter(); err != nil { - return nil, err - } - - event, err := newDropCollectionEventWriter(writer.PayloadDataType, writer.offset) + event, err := newDropCollectionEventWriter(writer.PayloadDataType) if err != nil { return nil, err } - writer.currentEventWriter = event - + writer.eventWriters = append(writer.eventWriters, event) return event, nil } func (writer *DDLBinlogWriter) NextCreatePartitionEventWriter() (*createPartitionEventWriter, error) { - if err := writer.checkClose(); err != nil { - return nil, err + if writer.isClosed() { + return nil, errors.New("binlog has closed") } - if err := writer.appendEventWriter(); err != nil { - return nil, err - } - - event, err := newCreatePartitionEventWriter(writer.PayloadDataType, writer.offset) + event, err := newCreatePartitionEventWriter(writer.PayloadDataType) if err != nil { return nil, err } - writer.currentEventWriter = event - + writer.eventWriters = append(writer.eventWriters, event) return event, nil } func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEventWriter, error) { - if err := writer.checkClose(); err != nil { - return nil, err + if writer.isClosed() { + return nil, errors.New("binlog has closed") } - if err := writer.appendEventWriter(); err != nil { - return nil, err - } - - event, err := newDropPartitionEventWriter(writer.PayloadDataType, writer.offset) + event, err := newDropPartitionEventWriter(writer.PayloadDataType) if err != nil { return nil, err } - writer.currentEventWriter = event - + writer.eventWriters = append(writer.eventWriters, event) return event, nil } @@ -250,16 +204,11 @@ func NewInsertBinlogWriter(dataType schemapb.DataType) (*InsertBinlogWriter, err descriptorEvent.PayloadDataType = dataType return &InsertBinlogWriter{ baseBinlogWriter: baseBinlogWriter{ - descriptorEvent: *descriptorEvent, - magicNumber: MagicNumber, - binlogType: InsertBinlog, - eventWriters: make([]EventWriter, 0), - currentEventWriter: nil, - buffer: nil, - numEvents: 0, - numRows: 0, - isClose: false, - offset: 4 + descriptorEvent.GetMemoryUsageInBytes(), + descriptorEvent: *descriptorEvent, + magicNumber: MagicNumber, + binlogType: InsertBinlog, + eventWriters: make([]EventWriter, 0), + buffer: nil, }, }, nil } @@ -271,16 +220,11 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType) (*DeleteBinlogWriter, err descriptorEvent.PayloadDataType = dataType return &DeleteBinlogWriter{ baseBinlogWriter: baseBinlogWriter{ - descriptorEvent: *descriptorEvent, - magicNumber: MagicNumber, - binlogType: DeleteBinlog, - eventWriters: make([]EventWriter, 0), - currentEventWriter: nil, - buffer: nil, - numEvents: 0, - numRows: 0, - isClose: false, - offset: 4 + descriptorEvent.GetMemoryUsageInBytes(), + descriptorEvent: *descriptorEvent, + magicNumber: MagicNumber, + binlogType: DeleteBinlog, + eventWriters: make([]EventWriter, 0), + buffer: nil, }, }, nil } @@ -292,16 +236,11 @@ func NewDDLBinlogWriter(dataType schemapb.DataType) (*DDLBinlogWriter, error) { descriptorEvent.PayloadDataType = dataType return &DDLBinlogWriter{ baseBinlogWriter: baseBinlogWriter{ - descriptorEvent: *descriptorEvent, - magicNumber: MagicNumber, - binlogType: DDLBinlog, - eventWriters: make([]EventWriter, 0), - currentEventWriter: nil, - buffer: nil, - numEvents: 0, - numRows: 0, - isClose: false, - offset: 4 + descriptorEvent.GetMemoryUsageInBytes(), + descriptorEvent: *descriptorEvent, + magicNumber: MagicNumber, + binlogType: DDLBinlog, + eventWriters: make([]EventWriter, 0), + buffer: nil, }, }, nil } diff --git a/internal/storage/binlog_writer_test.go b/internal/storage/binlog_writer_test.go index 820854e334..f2c9729b64 100644 --- a/internal/storage/binlog_writer_test.go +++ b/internal/storage/binlog_writer_test.go @@ -17,7 +17,8 @@ func TestBinlogWriterReader(t *testing.T) { assert.Nil(t, err) err = eventWriter.AddInt32ToPayload([]int32{1, 2, 3}) assert.Nil(t, err) - assert.Nil(t, nil, binlogWriter.GetBuffer()) + _, err = binlogWriter.GetBuffer() + assert.NotNil(t, err) err = binlogWriter.Close() assert.Nil(t, err) assert.EqualValues(t, 1, binlogWriter.GetEventNums()) @@ -30,7 +31,8 @@ func TestBinlogWriterReader(t *testing.T) { assert.Nil(t, err) assert.EqualValues(t, 3, nums) - buffer := binlogWriter.GetBuffer() + buffer, err := binlogWriter.GetBuffer() + assert.Nil(t, err) fmt.Println("reader offset : " + strconv.Itoa(len(buffer))) binlogReader, err := NewBinlogReader(buffer) diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index ef1ee58d44..42c0f58b61 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -274,7 +274,10 @@ func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segm return nil, err } - buffer := writer.GetBuffer() + buffer, err := writer.GetBuffer() + if err != nil { + return nil, err + } blobKey := fmt.Sprintf("%d/insert_log/%d/%d/%d/%d/%d", insertCodec.TenantID, insertCodec.Schema.ID, partitionID, segmentID, fieldID, logIdx) blobs = append(blobs, &Blob{ @@ -566,7 +569,10 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(logIdx int, ts []Times if err != nil { return nil, err } - buffer := writer.GetBuffer() + buffer, err := writer.GetBuffer() + if err != nil { + return nil, err + } blobKey := fmt.Sprintf("%d/data_definition_log/%d/%d/%d", dataDefinitionCodec.TenantID, dataDefinitionCodec.Schema.ID, RequestField, logIdx) blobs = append(blobs, &Blob{ @@ -595,7 +601,10 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(logIdx int, ts []Times if err != nil { return nil, err } - buffer = writer.GetBuffer() + buffer, err = writer.GetBuffer() + if err != nil { + return nil, err + } blobKey = fmt.Sprintf("%d/data_definition_log/%d/%d/%d", dataDefinitionCodec.TenantID, dataDefinitionCodec.Schema.ID, TsField, logIdx) blobs = append(blobs, &Blob{ diff --git a/internal/storage/event_test.go b/internal/storage/event_test.go index 9bb0dfa229..60dcc5d603 100644 --- a/internal/storage/event_test.go +++ b/internal/storage/event_test.go @@ -152,7 +152,7 @@ func TestInsertEvent(t *testing.T) { iw func(w *insertEventWriter) error, ev interface{}, ) { - w, err := newInsertEventWriter(dt, 0) + w, err := newInsertEventWriter(dt) assert.Nil(t, err) w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) @@ -325,7 +325,7 @@ func TestInsertEvent(t *testing.T) { }) t.Run("insert_string", func(t *testing.T) { - w, err := newInsertEventWriter(schemapb.DataType_STRING, 0) + w, err := newInsertEventWriter(schemapb.DataType_STRING) assert.Nil(t, err) w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) @@ -401,7 +401,7 @@ func TestDeleteEvent(t *testing.T) { iw func(w *deleteEventWriter) error, ev interface{}, ) { - w, err := newDeleteEventWriter(dt, 0) + w, err := newDeleteEventWriter(dt) assert.Nil(t, err) w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) @@ -574,7 +574,7 @@ func TestDeleteEvent(t *testing.T) { }) t.Run("delete_string", func(t *testing.T) { - w, err := newDeleteEventWriter(schemapb.DataType_STRING, 0) + w, err := newDeleteEventWriter(schemapb.DataType_STRING) assert.Nil(t, err) w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) @@ -644,13 +644,13 @@ func TestDeleteEvent(t *testing.T) { func TestCreateCollectionEvent(t *testing.T) { t.Run("create_event", func(t *testing.T) { - w, err := newCreateCollectionEventWriter(schemapb.DataType_FLOAT, 0) + w, err := newCreateCollectionEventWriter(schemapb.DataType_FLOAT) assert.NotNil(t, err) assert.Nil(t, w) }) t.Run("create_collection_timestamp", func(t *testing.T) { - w, err := newCreateCollectionEventWriter(schemapb.DataType_INT64, 0) + w, err := newCreateCollectionEventWriter(schemapb.DataType_INT64) assert.Nil(t, err) w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) @@ -697,7 +697,7 @@ func TestCreateCollectionEvent(t *testing.T) { }) t.Run("create_collection_string", func(t *testing.T) { - w, err := newCreateCollectionEventWriter(schemapb.DataType_STRING, 0) + w, err := newCreateCollectionEventWriter(schemapb.DataType_STRING) assert.Nil(t, err) w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) @@ -767,13 +767,13 @@ func TestCreateCollectionEvent(t *testing.T) { func TestDropCollectionEvent(t *testing.T) { t.Run("drop_event", func(t *testing.T) { - w, err := newDropCollectionEventWriter(schemapb.DataType_FLOAT, 0) + w, err := newDropCollectionEventWriter(schemapb.DataType_FLOAT) assert.NotNil(t, err) assert.Nil(t, w) }) t.Run("drop_collection_timestamp", func(t *testing.T) { - w, err := newDropCollectionEventWriter(schemapb.DataType_INT64, 0) + w, err := newDropCollectionEventWriter(schemapb.DataType_INT64) assert.Nil(t, err) w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) @@ -820,7 +820,7 @@ func TestDropCollectionEvent(t *testing.T) { }) t.Run("drop_collection_string", func(t *testing.T) { - w, err := newDropCollectionEventWriter(schemapb.DataType_STRING, 0) + w, err := newDropCollectionEventWriter(schemapb.DataType_STRING) assert.Nil(t, err) w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) @@ -890,13 +890,13 @@ func TestDropCollectionEvent(t *testing.T) { func TestCreatePartitionEvent(t *testing.T) { t.Run("create_event", func(t *testing.T) { - w, err := newCreatePartitionEventWriter(schemapb.DataType_FLOAT, 0) + w, err := newCreatePartitionEventWriter(schemapb.DataType_FLOAT) assert.NotNil(t, err) assert.Nil(t, w) }) t.Run("create_partition_timestamp", func(t *testing.T) { - w, err := newCreatePartitionEventWriter(schemapb.DataType_INT64, 0) + w, err := newCreatePartitionEventWriter(schemapb.DataType_INT64) assert.Nil(t, err) w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) @@ -943,7 +943,7 @@ func TestCreatePartitionEvent(t *testing.T) { }) t.Run("create_partition_string", func(t *testing.T) { - w, err := newCreatePartitionEventWriter(schemapb.DataType_STRING, 0) + w, err := newCreatePartitionEventWriter(schemapb.DataType_STRING) assert.Nil(t, err) w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) @@ -1013,13 +1013,13 @@ func TestCreatePartitionEvent(t *testing.T) { func TestDropPartitionEvent(t *testing.T) { t.Run("drop_event", func(t *testing.T) { - w, err := newDropPartitionEventWriter(schemapb.DataType_FLOAT, 0) + w, err := newDropPartitionEventWriter(schemapb.DataType_FLOAT) assert.NotNil(t, err) assert.Nil(t, w) }) t.Run("drop_partition_timestamp", func(t *testing.T) { - w, err := newDropPartitionEventWriter(schemapb.DataType_INT64, 0) + w, err := newDropPartitionEventWriter(schemapb.DataType_INT64) assert.Nil(t, err) w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) @@ -1066,7 +1066,7 @@ func TestDropPartitionEvent(t *testing.T) { }) t.Run("drop_partition_string", func(t *testing.T) { - w, err := newDropPartitionEventWriter(schemapb.DataType_STRING, 0) + w, err := newDropPartitionEventWriter(schemapb.DataType_STRING) assert.Nil(t, err) w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) diff --git a/internal/storage/event_writer.go b/internal/storage/event_writer.go index f39bf9effa..d28c84c4bf 100644 --- a/internal/storage/event_writer.go +++ b/internal/storage/event_writer.go @@ -74,6 +74,7 @@ type EventWriter interface { // Write serialize to buffer, should call Finish first Write(buffer *bytes.Buffer) error GetMemoryUsageInBytes() (int32, error) + SetOffset(offset int32) } type baseEventWriter struct { @@ -141,6 +142,10 @@ func (writer *baseEventWriter) Close() error { return nil } +func (writer *baseEventWriter) SetOffset(offset int32) { + writer.offset = offset +} + type insertEventWriter struct { baseEventWriter insertEventData @@ -189,7 +194,7 @@ func newDescriptorEvent() (*descriptorEvent, error) { }, err } -func newInsertEventWriter(dataType schemapb.DataType, offset int32) (*insertEventWriter, error) { +func newInsertEventWriter(dataType schemapb.DataType) (*insertEventWriter, error) { payloadWriter, err := NewPayloadWriter(dataType) if err != nil { return nil, err @@ -209,7 +214,6 @@ func newInsertEventWriter(dataType schemapb.DataType, offset int32) (*insertEven PayloadWriterInterface: payloadWriter, isClosed: false, isFinish: false, - offset: offset, }, insertEventData: *data, } @@ -218,7 +222,7 @@ func newInsertEventWriter(dataType schemapb.DataType, offset int32) (*insertEven return writer, nil } -func newDeleteEventWriter(dataType schemapb.DataType, offset int32) (*deleteEventWriter, error) { +func newDeleteEventWriter(dataType schemapb.DataType) (*deleteEventWriter, error) { payloadWriter, err := NewPayloadWriter(dataType) if err != nil { return nil, err @@ -237,7 +241,6 @@ func newDeleteEventWriter(dataType schemapb.DataType, offset int32) (*deleteEven PayloadWriterInterface: payloadWriter, isClosed: false, isFinish: false, - offset: offset, }, deleteEventData: *data, } @@ -245,7 +248,7 @@ func newDeleteEventWriter(dataType schemapb.DataType, offset int32) (*deleteEven writer.baseEventWriter.writeEventData = writer.deleteEventData.WriteEventData return writer, nil } -func newCreateCollectionEventWriter(dataType schemapb.DataType, offset int32) (*createCollectionEventWriter, error) { +func newCreateCollectionEventWriter(dataType schemapb.DataType) (*createCollectionEventWriter, error) { if dataType != schemapb.DataType_STRING && dataType != schemapb.DataType_INT64 { return nil, errors.New("incorrect data type") } @@ -269,7 +272,6 @@ func newCreateCollectionEventWriter(dataType schemapb.DataType, offset int32) (* PayloadWriterInterface: payloadWriter, isClosed: false, isFinish: false, - offset: offset, }, createCollectionEventData: *data, } @@ -277,7 +279,7 @@ func newCreateCollectionEventWriter(dataType schemapb.DataType, offset int32) (* writer.baseEventWriter.writeEventData = writer.createCollectionEventData.WriteEventData return writer, nil } -func newDropCollectionEventWriter(dataType schemapb.DataType, offset int32) (*dropCollectionEventWriter, error) { +func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEventWriter, error) { if dataType != schemapb.DataType_STRING && dataType != schemapb.DataType_INT64 { return nil, errors.New("incorrect data type") } @@ -300,7 +302,6 @@ func newDropCollectionEventWriter(dataType schemapb.DataType, offset int32) (*dr PayloadWriterInterface: payloadWriter, isClosed: false, isFinish: false, - offset: offset, }, dropCollectionEventData: *data, } @@ -308,7 +309,7 @@ func newDropCollectionEventWriter(dataType schemapb.DataType, offset int32) (*dr writer.baseEventWriter.writeEventData = writer.dropCollectionEventData.WriteEventData return writer, nil } -func newCreatePartitionEventWriter(dataType schemapb.DataType, offset int32) (*createPartitionEventWriter, error) { +func newCreatePartitionEventWriter(dataType schemapb.DataType) (*createPartitionEventWriter, error) { if dataType != schemapb.DataType_STRING && dataType != schemapb.DataType_INT64 { return nil, errors.New("incorrect data type") } @@ -332,7 +333,6 @@ func newCreatePartitionEventWriter(dataType schemapb.DataType, offset int32) (*c PayloadWriterInterface: payloadWriter, isClosed: false, isFinish: false, - offset: offset, }, createPartitionEventData: *data, } @@ -340,7 +340,7 @@ func newCreatePartitionEventWriter(dataType schemapb.DataType, offset int32) (*c writer.baseEventWriter.writeEventData = writer.createPartitionEventData.WriteEventData return writer, nil } -func newDropPartitionEventWriter(dataType schemapb.DataType, offset int32) (*dropPartitionEventWriter, error) { +func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEventWriter, error) { if dataType != schemapb.DataType_STRING && dataType != schemapb.DataType_INT64 { return nil, errors.New("incorrect data type") } @@ -363,7 +363,6 @@ func newDropPartitionEventWriter(dataType schemapb.DataType, offset int32) (*dro PayloadWriterInterface: payloadWriter, isClosed: false, isFinish: false, - offset: offset, }, dropPartitionEventData: *data, } diff --git a/internal/storage/event_writer_test.go b/internal/storage/event_writer_test.go index 7f874591e8..ceaf696df8 100644 --- a/internal/storage/event_writer_test.go +++ b/internal/storage/event_writer_test.go @@ -31,12 +31,12 @@ func TestSizeofStruct(t *testing.T) { } func TestEventWriter(t *testing.T) { - insertEvent, err := newInsertEventWriter(schemapb.DataType_INT32, 0) + insertEvent, err := newInsertEventWriter(schemapb.DataType_INT32) assert.Nil(t, err) err = insertEvent.Close() assert.Nil(t, err) - insertEvent, err = newInsertEventWriter(schemapb.DataType_INT32, 0) + insertEvent, err = newInsertEventWriter(schemapb.DataType_INT32) assert.Nil(t, err) defer insertEvent.Close()