diff --git a/internal/storage/binlog_writer.go b/internal/storage/binlog_writer.go index 313d074e54..055d6cf995 100644 --- a/internal/storage/binlog_writer.go +++ b/internal/storage/binlog_writer.go @@ -243,12 +243,15 @@ func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEve return event, nil } -func NewInsertBinlogWriter(dataType schemapb.DataType) *InsertBinlogWriter { - descriptorEvent := newDescriptorEvent() +func NewInsertBinlogWriter(dataType schemapb.DataType) (*InsertBinlogWriter, error) { + descriptorEvent, err := newDescriptorEvent() + if err != nil { + return nil, err + } descriptorEvent.PayloadDataType = dataType return &InsertBinlogWriter{ baseBinlogWriter: baseBinlogWriter{ - descriptorEvent: descriptorEvent, + descriptorEvent: *descriptorEvent, magicNumber: MagicNumber, binlogType: InsertBinlog, eventWriters: make([]EventWriter, 0), @@ -259,14 +262,17 @@ func NewInsertBinlogWriter(dataType schemapb.DataType) *InsertBinlogWriter { isClose: false, offset: 4 + descriptorEvent.GetMemoryUsageInBytes(), }, - } + }, nil } -func NewDeleteBinlogWriter(dataType schemapb.DataType) *DeleteBinlogWriter { - descriptorEvent := newDescriptorEvent() +func NewDeleteBinlogWriter(dataType schemapb.DataType) (*DeleteBinlogWriter, error) { + descriptorEvent, err := newDescriptorEvent() + if err != nil { + return nil, err + } descriptorEvent.PayloadDataType = dataType return &DeleteBinlogWriter{ baseBinlogWriter: baseBinlogWriter{ - descriptorEvent: descriptorEvent, + descriptorEvent: *descriptorEvent, magicNumber: MagicNumber, binlogType: DeleteBinlog, eventWriters: make([]EventWriter, 0), @@ -277,14 +283,17 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType) *DeleteBinlogWriter { isClose: false, offset: 4 + descriptorEvent.GetMemoryUsageInBytes(), }, - } + }, nil } -func NewDDLBinlogWriter(dataType schemapb.DataType) *DDLBinlogWriter { - descriptorEvent := newDescriptorEvent() +func NewDDLBinlogWriter(dataType schemapb.DataType) (*DDLBinlogWriter, error) { + descriptorEvent, err := newDescriptorEvent() + if err != nil { + return nil, err + } descriptorEvent.PayloadDataType = dataType return &DDLBinlogWriter{ baseBinlogWriter: baseBinlogWriter{ - descriptorEvent: descriptorEvent, + descriptorEvent: *descriptorEvent, magicNumber: MagicNumber, binlogType: DDLBinlog, eventWriters: make([]EventWriter, 0), @@ -295,5 +304,5 @@ func NewDDLBinlogWriter(dataType schemapb.DataType) *DDLBinlogWriter { isClose: false, offset: 4 + descriptorEvent.GetMemoryUsageInBytes(), }, - } + }, nil } diff --git a/internal/storage/binlog_writer_test.go b/internal/storage/binlog_writer_test.go index c7172e64ff..820854e334 100644 --- a/internal/storage/binlog_writer_test.go +++ b/internal/storage/binlog_writer_test.go @@ -10,8 +10,9 @@ import ( ) func TestBinlogWriterReader(t *testing.T) { - binlogWriter := NewInsertBinlogWriter(schemapb.DataType_INT32) + binlogWriter, err := NewInsertBinlogWriter(schemapb.DataType_INT32) defer binlogWriter.Close() + assert.Nil(t, err) eventWriter, err := binlogWriter.NextInsertEventWriter() assert.Nil(t, err) err = eventWriter.AddInt32ToPayload([]int32{1, 2, 3}) diff --git a/internal/storage/cwrapper/CMakeLists.txt b/internal/storage/cwrapper/CMakeLists.txt index 6806f8bb7b..934e805405 100644 --- a/internal/storage/cwrapper/CMakeLists.txt +++ b/internal/storage/cwrapper/CMakeLists.txt @@ -37,8 +37,6 @@ macro( build_arrow ) "-DARROW_BUILD_UTILITIES=OFF" "-DARROW_PARQUET=ON" "-DPARQUET_BUILD_SHARED=OFF" - "-DThrift_SOURCE=BUNDLED" - "-Dutf8proc_SOURCE=BUNDLED" "-DARROW_S3=OFF" "-DCMAKE_VERBOSE_MAKEFILE=ON" "-DCMAKE_INSTALL_PREFIX=${CMAKE_CURRENT_BINARY_DIR}" diff --git a/internal/storage/event_data.go b/internal/storage/event_data.go index 7a15e4a87f..d107311118 100644 --- a/internal/storage/event_data.go +++ b/internal/storage/event_data.go @@ -1,12 +1,11 @@ package storage import ( - "bytes" "encoding/binary" "io" + "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) @@ -37,9 +36,7 @@ func (data *descriptorEventData) SetEndTimeStamp(ts typeutil.Timestamp) { } func (data *descriptorEventData) GetMemoryUsageInBytes() int32 { - buf := new(bytes.Buffer) - _ = data.Write(buf) - return int32(buf.Len()) + return int32(binary.Size(data.DescriptorEventDataFixPart) + binary.Size(data.PostHeaderLengths)) } func (data *descriptorEventData) Write(buffer io.Writer) error { @@ -54,7 +51,10 @@ func (data *descriptorEventData) Write(buffer io.Writer) error { } func readDescriptorEventData(buffer io.Reader) (*descriptorEventData, error) { - event := newDescriptorEventData() + event, err := newDescriptorEventData() + if err != nil { + return nil, err + } if err := binary.Read(buffer, binary.LittleEndian, &event.DescriptorEventDataFixPart); err != nil { return nil, err @@ -64,11 +64,11 @@ func readDescriptorEventData(buffer io.Reader) (*descriptorEventData, error) { return nil, err } - return &event, nil + return event, nil } type eventData interface { - GetEventDataSize() int32 + GetEventDataFixPartSize() int32 WriteEventData(buffer io.Writer) error } @@ -87,10 +87,8 @@ func (data *insertEventData) SetEndTimestamp(timestamp typeutil.Timestamp) { data.EndTimestamp = timestamp } -func (data *insertEventData) GetEventDataSize() int32 { - buf := new(bytes.Buffer) - _ = binary.Write(buf, binary.LittleEndian, data) - return int32(buf.Len()) +func (data *insertEventData) GetEventDataFixPartSize() int32 { + return int32(binary.Size(data)) } func (data *insertEventData) WriteEventData(buffer io.Writer) error { @@ -110,20 +108,12 @@ func (data *deleteEventData) SetEndTimestamp(timestamp typeutil.Timestamp) { data.EndTimestamp = timestamp } -func (data *deleteEventData) GetEventDataSize() int32 { - buf := new(bytes.Buffer) - _ = binary.Write(buf, binary.LittleEndian, data) - return int32(buf.Len()) +func (data *deleteEventData) GetEventDataFixPartSize() int32 { + return int32(binary.Size(data)) } func (data *deleteEventData) WriteEventData(buffer io.Writer) error { - if err := binary.Write(buffer, binary.LittleEndian, data.StartTimestamp); err != nil { - return err - } - if err := binary.Write(buffer, binary.LittleEndian, data.EndTimestamp); err != nil { - return err - } - return nil + return binary.Write(buffer, binary.LittleEndian, data) } type createCollectionEventData struct { @@ -139,20 +129,12 @@ func (data *createCollectionEventData) SetEndTimestamp(timestamp typeutil.Timest data.EndTimestamp = timestamp } -func (data *createCollectionEventData) GetEventDataSize() int32 { - buf := new(bytes.Buffer) - _ = binary.Write(buf, binary.LittleEndian, data) - return int32(buf.Len()) +func (data *createCollectionEventData) GetEventDataFixPartSize() int32 { + return int32(binary.Size(data)) } func (data *createCollectionEventData) WriteEventData(buffer io.Writer) error { - if err := binary.Write(buffer, binary.LittleEndian, data.StartTimestamp); err != nil { - return err - } - if err := binary.Write(buffer, binary.LittleEndian, data.EndTimestamp); err != nil { - return err - } - return nil + return binary.Write(buffer, binary.LittleEndian, data) } type dropCollectionEventData struct { @@ -168,20 +150,12 @@ func (data *dropCollectionEventData) SetEndTimestamp(timestamp typeutil.Timestam data.EndTimestamp = timestamp } -func (data *dropCollectionEventData) GetEventDataSize() int32 { - buf := new(bytes.Buffer) - _ = binary.Write(buf, binary.LittleEndian, data) - return int32(buf.Len()) +func (data *dropCollectionEventData) GetEventDataFixPartSize() int32 { + return int32(binary.Size(data)) } func (data *dropCollectionEventData) WriteEventData(buffer io.Writer) error { - if err := binary.Write(buffer, binary.LittleEndian, data.StartTimestamp); err != nil { - return err - } - if err := binary.Write(buffer, binary.LittleEndian, data.EndTimestamp); err != nil { - return err - } - return nil + return binary.Write(buffer, binary.LittleEndian, data) } type createPartitionEventData struct { @@ -197,20 +171,12 @@ func (data *createPartitionEventData) SetEndTimestamp(timestamp typeutil.Timesta data.EndTimestamp = timestamp } -func (data *createPartitionEventData) GetEventDataSize() int32 { - buf := new(bytes.Buffer) - _ = binary.Write(buf, binary.LittleEndian, data) - return int32(buf.Len()) +func (data *createPartitionEventData) GetEventDataFixPartSize() int32 { + return int32(binary.Size(data)) } func (data *createPartitionEventData) WriteEventData(buffer io.Writer) error { - if err := binary.Write(buffer, binary.LittleEndian, data.StartTimestamp); err != nil { - return err - } - if err := binary.Write(buffer, binary.LittleEndian, data.EndTimestamp); err != nil { - return err - } - return nil + return binary.Write(buffer, binary.LittleEndian, data) } type dropPartitionEventData struct { @@ -226,23 +192,36 @@ func (data *dropPartitionEventData) SetEndTimestamp(timestamp typeutil.Timestamp data.EndTimestamp = timestamp } -func (data *dropPartitionEventData) GetEventDataSize() int32 { - buf := new(bytes.Buffer) - _ = binary.Write(buf, binary.LittleEndian, data) - return int32(buf.Len()) +func (data *dropPartitionEventData) GetEventDataFixPartSize() int32 { + return int32(binary.Size(data)) } func (data *dropPartitionEventData) WriteEventData(buffer io.Writer) error { - if err := binary.Write(buffer, binary.LittleEndian, data.StartTimestamp); err != nil { - return err - } - if err := binary.Write(buffer, binary.LittleEndian, data.EndTimestamp); err != nil { - return err - } - return nil + return binary.Write(buffer, binary.LittleEndian, data) } -func newDescriptorEventData() descriptorEventData { +func getEventFixPartSize(code EventTypeCode) int32 { + switch code { + case DescriptorEventType: + return int32(binary.Size(descriptorEventData{}.DescriptorEventDataFixPart)) + case InsertEventType: + return (&insertEventData{}).GetEventDataFixPartSize() + case DeleteEventType: + return (&deleteEventData{}).GetEventDataFixPartSize() + case CreateCollectionEventType: + return (&createCollectionEventData{}).GetEventDataFixPartSize() + case DropCollectionEventType: + return (&dropCollectionEventData{}).GetEventDataFixPartSize() + case CreatePartitionEventType: + return (&createCollectionEventData{}).GetEventDataFixPartSize() + case DropPartitionEventType: + return (&dropPartitionEventData{}).GetEventDataFixPartSize() + default: + return -1 + } +} + +func newDescriptorEventData() (*descriptorEventData, error) { data := descriptorEventData{ DescriptorEventDataFixPart: DescriptorEventDataFixPart{ BinlogVersion: BinlogVersion, @@ -255,50 +234,57 @@ func newDescriptorEventData() descriptorEventData { EndTimestamp: 0, PayloadDataType: -1, }, - PostHeaderLengths: []uint8{16, 16, 16, 16, 16, 16}, + PostHeaderLengths: []uint8{}, + } + for i := DescriptorEventType; i < EventTypeEnd; i++ { + size := getEventFixPartSize(i) + if size == -1 { + return nil, errors.Errorf("undefined event type %d", i) + } + data.PostHeaderLengths = append(data.PostHeaderLengths, uint8(size)) } data.HeaderLength = int8(data.GetMemoryUsageInBytes()) - return data + return &data, nil } -func newInsertEventData() insertEventData { - return insertEventData{ +func newInsertEventData() (*insertEventData, error) { + return &insertEventData{ StartTimestamp: 0, EndTimestamp: 0, - } + }, nil } -func newDeleteEventData() deleteEventData { - return deleteEventData{ +func newDeleteEventData() (*deleteEventData, error) { + return &deleteEventData{ StartTimestamp: 0, EndTimestamp: 0, - } + }, nil } -func newCreateCollectionEventData() createCollectionEventData { - return createCollectionEventData{ +func newCreateCollectionEventData() (*createCollectionEventData, error) { + return &createCollectionEventData{ StartTimestamp: 0, EndTimestamp: 0, - } + }, nil } -func newDropCollectionEventData() dropCollectionEventData { - return dropCollectionEventData{ +func newDropCollectionEventData() (*dropCollectionEventData, error) { + return &dropCollectionEventData{ StartTimestamp: 0, EndTimestamp: 0, - } + }, nil } -func newCreatePartitionEventData() createPartitionEventData { - return createPartitionEventData{ +func newCreatePartitionEventData() (*createPartitionEventData, error) { + return &createPartitionEventData{ StartTimestamp: 0, EndTimestamp: 0, - } + }, nil } -func newDropPartitionEventData() dropPartitionEventData { - return dropPartitionEventData{ +func newDropPartitionEventData() (*dropPartitionEventData, error) { + return &dropPartitionEventData{ StartTimestamp: 0, EndTimestamp: 0, - } + }, nil } -func readInsertEventData(buffer io.Reader) (*insertEventData, error) { +func readInsertEventDataFixPart(buffer io.Reader) (*insertEventData, error) { data := &insertEventData{} if err := binary.Read(buffer, binary.LittleEndian, data); err != nil { return nil, err @@ -306,7 +292,7 @@ func readInsertEventData(buffer io.Reader) (*insertEventData, error) { return data, nil } -func readDeleteEventData(buffer io.Reader) (*deleteEventData, error) { +func readDeleteEventDataFixPart(buffer io.Reader) (*deleteEventData, error) { data := &deleteEventData{} if err := binary.Read(buffer, binary.LittleEndian, data); err != nil { return nil, err @@ -314,7 +300,7 @@ func readDeleteEventData(buffer io.Reader) (*deleteEventData, error) { return data, nil } -func readCreateCollectionEventData(buffer io.Reader) (*createCollectionEventData, error) { +func readCreateCollectionEventDataFixPart(buffer io.Reader) (*createCollectionEventData, error) { data := &createCollectionEventData{} if err := binary.Read(buffer, binary.LittleEndian, data); err != nil { return nil, err @@ -322,7 +308,7 @@ func readCreateCollectionEventData(buffer io.Reader) (*createCollectionEventData return data, nil } -func readDropCollectionEventData(buffer io.Reader) (*dropCollectionEventData, error) { +func readDropCollectionEventDataFixPart(buffer io.Reader) (*dropCollectionEventData, error) { data := &dropCollectionEventData{} if err := binary.Read(buffer, binary.LittleEndian, data); err != nil { return nil, err @@ -330,7 +316,7 @@ func readDropCollectionEventData(buffer io.Reader) (*dropCollectionEventData, er return data, nil } -func readCreatePartitionEventData(buffer io.Reader) (*createPartitionEventData, error) { +func readCreatePartitionEventDataFixPart(buffer io.Reader) (*createPartitionEventData, error) { data := &createPartitionEventData{} if err := binary.Read(buffer, binary.LittleEndian, data); err != nil { return nil, err @@ -338,7 +324,7 @@ func readCreatePartitionEventData(buffer io.Reader) (*createPartitionEventData, return data, nil } -func readDropPartitionEventData(buffer io.Reader) (*dropPartitionEventData, error) { +func readDropPartitionEventDataFixPart(buffer io.Reader) (*dropPartitionEventData, error) { data := &dropPartitionEventData{} if err := binary.Read(buffer, binary.LittleEndian, data); err != nil { return nil, err diff --git a/internal/storage/event_header.go b/internal/storage/event_header.go index 5532211df7..8d6593c18a 100644 --- a/internal/storage/event_header.go +++ b/internal/storage/event_header.go @@ -1,13 +1,11 @@ package storage import ( - "bytes" "encoding/binary" "io" "time" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) @@ -20,9 +18,7 @@ type baseEventHeader struct { } func (header *baseEventHeader) GetMemoryUsageInBytes() int32 { - buf := new(bytes.Buffer) - binary.Write(buf, binary.LittleEndian, header) - return int32(buf.Len()) + return int32(binary.Size(header)) } func (header *baseEventHeader) Write(buffer io.Writer) error { @@ -52,7 +48,7 @@ func readDescriptorEventHeader(buffer io.Reader) (*descriptorEventHeader, error) return header, nil } -func newDescriptorEventHeader() descriptorEventHeader { +func newDescriptorEventHeader() (*descriptorEventHeader, error) { header := descriptorEventHeader{ Timestamp: tsoutil.ComposeTS(time.Now().UnixNano()/int64(time.Millisecond), 0), TypeCode: DescriptorEventType, @@ -60,11 +56,11 @@ func newDescriptorEventHeader() descriptorEventHeader { } header.EventLength = header.GetMemoryUsageInBytes() header.NextPosition = header.EventLength + 4 - return header + return &header, nil } -func newEventHeader(eventTypeCode EventTypeCode) eventHeader { - return eventHeader{ +func newEventHeader(eventTypeCode EventTypeCode) (*eventHeader, error) { + return &eventHeader{ baseEventHeader: baseEventHeader{ Timestamp: tsoutil.ComposeTS(time.Now().UnixNano()/int64(time.Millisecond), 0), TypeCode: eventTypeCode, @@ -72,5 +68,5 @@ func newEventHeader(eventTypeCode EventTypeCode) eventHeader { EventLength: -1, NextPosition: -1, }, - } + }, nil } diff --git a/internal/storage/event_reader.go b/internal/storage/event_reader.go index f31cd2000b..7bc9b93588 100644 --- a/internal/storage/event_reader.go +++ b/internal/storage/event_reader.go @@ -43,17 +43,17 @@ func (reader *EventReader) readData() (eventData, error) { var err error switch reader.TypeCode { case InsertEventType: - data, err = readInsertEventData(reader.buffer) + data, err = readInsertEventDataFixPart(reader.buffer) case DeleteEventType: - data, err = readDeleteEventData(reader.buffer) + data, err = readDeleteEventDataFixPart(reader.buffer) case CreateCollectionEventType: - data, err = readCreateCollectionEventData(reader.buffer) + data, err = readCreateCollectionEventDataFixPart(reader.buffer) case DropCollectionEventType: - data, err = readDropCollectionEventData(reader.buffer) + data, err = readDropCollectionEventDataFixPart(reader.buffer) case CreatePartitionEventType: - data, err = readCreatePartitionEventData(reader.buffer) + data, err = readCreatePartitionEventDataFixPart(reader.buffer) case DropPartitionEventType: - data, err = readDropPartitionEventData(reader.buffer) + data, err = readDropPartitionEventDataFixPart(reader.buffer) default: return nil, errors.New("unknown header type code: " + strconv.Itoa(int(reader.TypeCode))) } diff --git a/internal/storage/event_writer.go b/internal/storage/event_writer.go index cf56d4c248..56523cf3cf 100644 --- a/internal/storage/event_writer.go +++ b/internal/storage/event_writer.go @@ -18,6 +18,7 @@ const ( DropCollectionEventType CreatePartitionEventType DropPartitionEventType + EventTypeEnd ) func (code EventTypeCode) String() string { @@ -169,11 +170,19 @@ type dropPartitionEventWriter struct { dropPartitionEventData } -func newDescriptorEvent() descriptorEvent { - return descriptorEvent{ - descriptorEventHeader: newDescriptorEventHeader(), - descriptorEventData: newDescriptorEventData(), +func newDescriptorEvent() (*descriptorEvent, error) { + header, err := newDescriptorEventHeader() + if err != nil { + return nil, err } + data, err := newDescriptorEventData() + if err != nil { + return nil, err + } + return &descriptorEvent{ + descriptorEventHeader: *header, + descriptorEventData: *data, + }, err } func newInsertEventWriter(dataType schemapb.DataType, offset int32) (*insertEventWriter, error) { @@ -181,17 +190,26 @@ func newInsertEventWriter(dataType schemapb.DataType, offset int32) (*insertEven if err != nil { return nil, err } + header, err := newEventHeader(InsertEventType) + if err != nil { + return nil, err + } + data, err := newInsertEventData() + if err != nil { + return nil, err + } + writer := &insertEventWriter{ baseEventWriter: baseEventWriter{ - eventHeader: newEventHeader(InsertEventType), + eventHeader: *header, PayloadWriterInterface: payloadWriter, isClosed: false, isFinish: false, offset: offset, }, - insertEventData: newInsertEventData(), + insertEventData: *data, } - writer.baseEventWriter.getEventDataSize = writer.insertEventData.GetEventDataSize + writer.baseEventWriter.getEventDataSize = writer.insertEventData.GetEventDataFixPartSize writer.baseEventWriter.writeEventData = writer.insertEventData.WriteEventData return writer, nil } @@ -201,17 +219,25 @@ func newDeleteEventWriter(dataType schemapb.DataType, offset int32) (*deleteEven if err != nil { return nil, err } + header, err := newEventHeader(DeleteEventType) + if err != nil { + return nil, err + } + data, err := newDeleteEventData() + if err != nil { + return nil, err + } writer := &deleteEventWriter{ baseEventWriter: baseEventWriter{ - eventHeader: newEventHeader(DeleteEventType), + eventHeader: *header, PayloadWriterInterface: payloadWriter, isClosed: false, isFinish: false, offset: offset, }, - deleteEventData: newDeleteEventData(), + deleteEventData: *data, } - writer.baseEventWriter.getEventDataSize = writer.deleteEventData.GetEventDataSize + writer.baseEventWriter.getEventDataSize = writer.deleteEventData.GetEventDataFixPartSize writer.baseEventWriter.writeEventData = writer.deleteEventData.WriteEventData return writer, nil } @@ -220,17 +246,26 @@ func newCreateCollectionEventWriter(dataType schemapb.DataType, offset int32) (* if err != nil { return nil, err } + header, err := newEventHeader(CreateCollectionEventType) + if err != nil { + return nil, err + } + data, err := newCreateCollectionEventData() + if err != nil { + return nil, err + } + writer := &createCollectionEventWriter{ baseEventWriter: baseEventWriter{ - eventHeader: newEventHeader(CreateCollectionEventType), + eventHeader: *header, PayloadWriterInterface: payloadWriter, isClosed: false, isFinish: false, offset: offset, }, - createCollectionEventData: newCreateCollectionEventData(), + createCollectionEventData: *data, } - writer.baseEventWriter.getEventDataSize = writer.createCollectionEventData.GetEventDataSize + writer.baseEventWriter.getEventDataSize = writer.createCollectionEventData.GetEventDataFixPartSize writer.baseEventWriter.writeEventData = writer.createCollectionEventData.WriteEventData return writer, nil } @@ -239,17 +274,25 @@ func newDropCollectionEventWriter(dataType schemapb.DataType, offset int32) (*dr if err != nil { return nil, err } + header, err := newEventHeader(DropCollectionEventType) + if err != nil { + return nil, err + } + data, err := newDropCollectionEventData() + if err != nil { + return nil, err + } writer := &dropCollectionEventWriter{ baseEventWriter: baseEventWriter{ - eventHeader: newEventHeader(DropCollectionEventType), + eventHeader: *header, PayloadWriterInterface: payloadWriter, isClosed: false, isFinish: false, offset: offset, }, - dropCollectionEventData: newDropCollectionEventData(), + dropCollectionEventData: *data, } - writer.baseEventWriter.getEventDataSize = writer.dropCollectionEventData.GetEventDataSize + writer.baseEventWriter.getEventDataSize = writer.dropCollectionEventData.GetEventDataFixPartSize writer.baseEventWriter.writeEventData = writer.dropCollectionEventData.WriteEventData return writer, nil } @@ -258,17 +301,26 @@ func newCreatePartitionEventWriter(dataType schemapb.DataType, offset int32) (*c if err != nil { return nil, err } + header, err := newEventHeader(CreatePartitionEventType) + if err != nil { + return nil, err + } + data, err := newCreatePartitionEventData() + if err != nil { + return nil, err + } + writer := &createPartitionEventWriter{ baseEventWriter: baseEventWriter{ - eventHeader: newEventHeader(CreatePartitionEventType), + eventHeader: *header, PayloadWriterInterface: payloadWriter, isClosed: false, isFinish: false, offset: offset, }, - createPartitionEventData: newCreatePartitionEventData(), + createPartitionEventData: *data, } - writer.baseEventWriter.getEventDataSize = writer.createPartitionEventData.GetEventDataSize + writer.baseEventWriter.getEventDataSize = writer.createPartitionEventData.GetEventDataFixPartSize writer.baseEventWriter.writeEventData = writer.createPartitionEventData.WriteEventData return writer, nil } @@ -277,17 +329,25 @@ func newDropPartitionEventWriter(dataType schemapb.DataType, offset int32) (*dro if err != nil { return nil, err } + header, err := newEventHeader(DropPartitionEventType) + if err != nil { + return nil, err + } + data, err := newDropPartitionEventData() + if err != nil { + return nil, err + } writer := &dropPartitionEventWriter{ baseEventWriter: baseEventWriter{ - eventHeader: newEventHeader(DropPartitionEventType), + eventHeader: *header, PayloadWriterInterface: payloadWriter, isClosed: false, isFinish: false, offset: offset, }, - dropPartitionEventData: newDropPartitionEventData(), + dropPartitionEventData: *data, } - writer.baseEventWriter.getEventDataSize = writer.dropPartitionEventData.GetEventDataSize + writer.baseEventWriter.getEventDataSize = writer.dropPartitionEventData.GetEventDataFixPartSize writer.baseEventWriter.writeEventData = writer.dropPartitionEventData.WriteEventData return writer, nil } diff --git a/internal/storage/event_writer_test.go b/internal/storage/event_writer_test.go index 3c4a2beb68..74c1f27af0 100644 --- a/internal/storage/event_writer_test.go +++ b/internal/storage/event_writer_test.go @@ -2,6 +2,7 @@ package storage import ( "bytes" + "encoding/binary" "testing" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" @@ -9,6 +10,27 @@ import ( "github.com/stretchr/testify/assert" ) +func TestSizeofStruct(t *testing.T) { + var buf bytes.Buffer + err := binary.Write(&buf, binary.LittleEndian, baseEventHeader{}) + assert.Nil(t, err) + s1 := binary.Size(baseEventHeader{}) + s2 := binary.Size(&baseEventHeader{}) + assert.Equal(t, s1, s2) + assert.Equal(t, s1, buf.Len()) + buf.Reset() + assert.Equal(t, 0, buf.Len()) + + de := descriptorEventData{ + DescriptorEventDataFixPart: DescriptorEventDataFixPart{}, + PostHeaderLengths: []uint8{0, 1, 2, 3}, + } + err = de.Write(&buf) + assert.Nil(t, err) + s3 := binary.Size(de.DescriptorEventDataFixPart) + binary.Size(de.PostHeaderLengths) + assert.Equal(t, s3, buf.Len()) +} + func TestEventWriter(t *testing.T) { insertEvent, err := newInsertEventWriter(schemapb.DataType_INT32, 0) assert.Nil(t, err)