mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Add binlog event
Signed-off-by: neza2017 <yefu.chen@zilliz.com>
This commit is contained in:
parent
968ddc6016
commit
de12fa5a10
@ -243,12 +243,15 @@ func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEve
|
||||
return event, nil
|
||||
}
|
||||
|
||||
func NewInsertBinlogWriter(dataType schemapb.DataType) *InsertBinlogWriter {
|
||||
descriptorEvent := newDescriptorEvent()
|
||||
func NewInsertBinlogWriter(dataType schemapb.DataType) (*InsertBinlogWriter, error) {
|
||||
descriptorEvent, err := newDescriptorEvent()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
descriptorEvent.PayloadDataType = dataType
|
||||
return &InsertBinlogWriter{
|
||||
baseBinlogWriter: baseBinlogWriter{
|
||||
descriptorEvent: descriptorEvent,
|
||||
descriptorEvent: *descriptorEvent,
|
||||
magicNumber: MagicNumber,
|
||||
binlogType: InsertBinlog,
|
||||
eventWriters: make([]EventWriter, 0),
|
||||
@ -259,14 +262,17 @@ func NewInsertBinlogWriter(dataType schemapb.DataType) *InsertBinlogWriter {
|
||||
isClose: false,
|
||||
offset: 4 + descriptorEvent.GetMemoryUsageInBytes(),
|
||||
},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
func NewDeleteBinlogWriter(dataType schemapb.DataType) *DeleteBinlogWriter {
|
||||
descriptorEvent := newDescriptorEvent()
|
||||
func NewDeleteBinlogWriter(dataType schemapb.DataType) (*DeleteBinlogWriter, error) {
|
||||
descriptorEvent, err := newDescriptorEvent()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
descriptorEvent.PayloadDataType = dataType
|
||||
return &DeleteBinlogWriter{
|
||||
baseBinlogWriter: baseBinlogWriter{
|
||||
descriptorEvent: descriptorEvent,
|
||||
descriptorEvent: *descriptorEvent,
|
||||
magicNumber: MagicNumber,
|
||||
binlogType: DeleteBinlog,
|
||||
eventWriters: make([]EventWriter, 0),
|
||||
@ -277,14 +283,17 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType) *DeleteBinlogWriter {
|
||||
isClose: false,
|
||||
offset: 4 + descriptorEvent.GetMemoryUsageInBytes(),
|
||||
},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
func NewDDLBinlogWriter(dataType schemapb.DataType) *DDLBinlogWriter {
|
||||
descriptorEvent := newDescriptorEvent()
|
||||
func NewDDLBinlogWriter(dataType schemapb.DataType) (*DDLBinlogWriter, error) {
|
||||
descriptorEvent, err := newDescriptorEvent()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
descriptorEvent.PayloadDataType = dataType
|
||||
return &DDLBinlogWriter{
|
||||
baseBinlogWriter: baseBinlogWriter{
|
||||
descriptorEvent: descriptorEvent,
|
||||
descriptorEvent: *descriptorEvent,
|
||||
magicNumber: MagicNumber,
|
||||
binlogType: DDLBinlog,
|
||||
eventWriters: make([]EventWriter, 0),
|
||||
@ -295,5 +304,5 @@ func NewDDLBinlogWriter(dataType schemapb.DataType) *DDLBinlogWriter {
|
||||
isClose: false,
|
||||
offset: 4 + descriptorEvent.GetMemoryUsageInBytes(),
|
||||
},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -10,8 +10,9 @@ import (
|
||||
)
|
||||
|
||||
func TestBinlogWriterReader(t *testing.T) {
|
||||
binlogWriter := NewInsertBinlogWriter(schemapb.DataType_INT32)
|
||||
binlogWriter, err := NewInsertBinlogWriter(schemapb.DataType_INT32)
|
||||
defer binlogWriter.Close()
|
||||
assert.Nil(t, err)
|
||||
eventWriter, err := binlogWriter.NextInsertEventWriter()
|
||||
assert.Nil(t, err)
|
||||
err = eventWriter.AddInt32ToPayload([]int32{1, 2, 3})
|
||||
|
||||
@ -37,8 +37,6 @@ macro( build_arrow )
|
||||
"-DARROW_BUILD_UTILITIES=OFF"
|
||||
"-DARROW_PARQUET=ON"
|
||||
"-DPARQUET_BUILD_SHARED=OFF"
|
||||
"-DThrift_SOURCE=BUNDLED"
|
||||
"-Dutf8proc_SOURCE=BUNDLED"
|
||||
"-DARROW_S3=OFF"
|
||||
"-DCMAKE_VERBOSE_MAKEFILE=ON"
|
||||
"-DCMAKE_INSTALL_PREFIX=${CMAKE_CURRENT_BINARY_DIR}"
|
||||
|
||||
@ -1,12 +1,11 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"io"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||
)
|
||||
|
||||
@ -37,9 +36,7 @@ func (data *descriptorEventData) SetEndTimeStamp(ts typeutil.Timestamp) {
|
||||
}
|
||||
|
||||
func (data *descriptorEventData) GetMemoryUsageInBytes() int32 {
|
||||
buf := new(bytes.Buffer)
|
||||
_ = data.Write(buf)
|
||||
return int32(buf.Len())
|
||||
return int32(binary.Size(data.DescriptorEventDataFixPart) + binary.Size(data.PostHeaderLengths))
|
||||
}
|
||||
|
||||
func (data *descriptorEventData) Write(buffer io.Writer) error {
|
||||
@ -54,7 +51,10 @@ func (data *descriptorEventData) Write(buffer io.Writer) error {
|
||||
}
|
||||
|
||||
func readDescriptorEventData(buffer io.Reader) (*descriptorEventData, error) {
|
||||
event := newDescriptorEventData()
|
||||
event, err := newDescriptorEventData()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := binary.Read(buffer, binary.LittleEndian, &event.DescriptorEventDataFixPart); err != nil {
|
||||
return nil, err
|
||||
@ -64,11 +64,11 @@ func readDescriptorEventData(buffer io.Reader) (*descriptorEventData, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &event, nil
|
||||
return event, nil
|
||||
}
|
||||
|
||||
type eventData interface {
|
||||
GetEventDataSize() int32
|
||||
GetEventDataFixPartSize() int32
|
||||
WriteEventData(buffer io.Writer) error
|
||||
}
|
||||
|
||||
@ -87,10 +87,8 @@ func (data *insertEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
|
||||
data.EndTimestamp = timestamp
|
||||
}
|
||||
|
||||
func (data *insertEventData) GetEventDataSize() int32 {
|
||||
buf := new(bytes.Buffer)
|
||||
_ = binary.Write(buf, binary.LittleEndian, data)
|
||||
return int32(buf.Len())
|
||||
func (data *insertEventData) GetEventDataFixPartSize() int32 {
|
||||
return int32(binary.Size(data))
|
||||
}
|
||||
|
||||
func (data *insertEventData) WriteEventData(buffer io.Writer) error {
|
||||
@ -110,20 +108,12 @@ func (data *deleteEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
|
||||
data.EndTimestamp = timestamp
|
||||
}
|
||||
|
||||
func (data *deleteEventData) GetEventDataSize() int32 {
|
||||
buf := new(bytes.Buffer)
|
||||
_ = binary.Write(buf, binary.LittleEndian, data)
|
||||
return int32(buf.Len())
|
||||
func (data *deleteEventData) GetEventDataFixPartSize() int32 {
|
||||
return int32(binary.Size(data))
|
||||
}
|
||||
|
||||
func (data *deleteEventData) WriteEventData(buffer io.Writer) error {
|
||||
if err := binary.Write(buffer, binary.LittleEndian, data.StartTimestamp); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(buffer, binary.LittleEndian, data.EndTimestamp); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return binary.Write(buffer, binary.LittleEndian, data)
|
||||
}
|
||||
|
||||
type createCollectionEventData struct {
|
||||
@ -139,20 +129,12 @@ func (data *createCollectionEventData) SetEndTimestamp(timestamp typeutil.Timest
|
||||
data.EndTimestamp = timestamp
|
||||
}
|
||||
|
||||
func (data *createCollectionEventData) GetEventDataSize() int32 {
|
||||
buf := new(bytes.Buffer)
|
||||
_ = binary.Write(buf, binary.LittleEndian, data)
|
||||
return int32(buf.Len())
|
||||
func (data *createCollectionEventData) GetEventDataFixPartSize() int32 {
|
||||
return int32(binary.Size(data))
|
||||
}
|
||||
|
||||
func (data *createCollectionEventData) WriteEventData(buffer io.Writer) error {
|
||||
if err := binary.Write(buffer, binary.LittleEndian, data.StartTimestamp); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(buffer, binary.LittleEndian, data.EndTimestamp); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return binary.Write(buffer, binary.LittleEndian, data)
|
||||
}
|
||||
|
||||
type dropCollectionEventData struct {
|
||||
@ -168,20 +150,12 @@ func (data *dropCollectionEventData) SetEndTimestamp(timestamp typeutil.Timestam
|
||||
data.EndTimestamp = timestamp
|
||||
}
|
||||
|
||||
func (data *dropCollectionEventData) GetEventDataSize() int32 {
|
||||
buf := new(bytes.Buffer)
|
||||
_ = binary.Write(buf, binary.LittleEndian, data)
|
||||
return int32(buf.Len())
|
||||
func (data *dropCollectionEventData) GetEventDataFixPartSize() int32 {
|
||||
return int32(binary.Size(data))
|
||||
}
|
||||
|
||||
func (data *dropCollectionEventData) WriteEventData(buffer io.Writer) error {
|
||||
if err := binary.Write(buffer, binary.LittleEndian, data.StartTimestamp); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(buffer, binary.LittleEndian, data.EndTimestamp); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return binary.Write(buffer, binary.LittleEndian, data)
|
||||
}
|
||||
|
||||
type createPartitionEventData struct {
|
||||
@ -197,20 +171,12 @@ func (data *createPartitionEventData) SetEndTimestamp(timestamp typeutil.Timesta
|
||||
data.EndTimestamp = timestamp
|
||||
}
|
||||
|
||||
func (data *createPartitionEventData) GetEventDataSize() int32 {
|
||||
buf := new(bytes.Buffer)
|
||||
_ = binary.Write(buf, binary.LittleEndian, data)
|
||||
return int32(buf.Len())
|
||||
func (data *createPartitionEventData) GetEventDataFixPartSize() int32 {
|
||||
return int32(binary.Size(data))
|
||||
}
|
||||
|
||||
func (data *createPartitionEventData) WriteEventData(buffer io.Writer) error {
|
||||
if err := binary.Write(buffer, binary.LittleEndian, data.StartTimestamp); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(buffer, binary.LittleEndian, data.EndTimestamp); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return binary.Write(buffer, binary.LittleEndian, data)
|
||||
}
|
||||
|
||||
type dropPartitionEventData struct {
|
||||
@ -226,23 +192,36 @@ func (data *dropPartitionEventData) SetEndTimestamp(timestamp typeutil.Timestamp
|
||||
data.EndTimestamp = timestamp
|
||||
}
|
||||
|
||||
func (data *dropPartitionEventData) GetEventDataSize() int32 {
|
||||
buf := new(bytes.Buffer)
|
||||
_ = binary.Write(buf, binary.LittleEndian, data)
|
||||
return int32(buf.Len())
|
||||
func (data *dropPartitionEventData) GetEventDataFixPartSize() int32 {
|
||||
return int32(binary.Size(data))
|
||||
}
|
||||
|
||||
func (data *dropPartitionEventData) WriteEventData(buffer io.Writer) error {
|
||||
if err := binary.Write(buffer, binary.LittleEndian, data.StartTimestamp); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(buffer, binary.LittleEndian, data.EndTimestamp); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return binary.Write(buffer, binary.LittleEndian, data)
|
||||
}
|
||||
|
||||
func newDescriptorEventData() descriptorEventData {
|
||||
func getEventFixPartSize(code EventTypeCode) int32 {
|
||||
switch code {
|
||||
case DescriptorEventType:
|
||||
return int32(binary.Size(descriptorEventData{}.DescriptorEventDataFixPart))
|
||||
case InsertEventType:
|
||||
return (&insertEventData{}).GetEventDataFixPartSize()
|
||||
case DeleteEventType:
|
||||
return (&deleteEventData{}).GetEventDataFixPartSize()
|
||||
case CreateCollectionEventType:
|
||||
return (&createCollectionEventData{}).GetEventDataFixPartSize()
|
||||
case DropCollectionEventType:
|
||||
return (&dropCollectionEventData{}).GetEventDataFixPartSize()
|
||||
case CreatePartitionEventType:
|
||||
return (&createCollectionEventData{}).GetEventDataFixPartSize()
|
||||
case DropPartitionEventType:
|
||||
return (&dropPartitionEventData{}).GetEventDataFixPartSize()
|
||||
default:
|
||||
return -1
|
||||
}
|
||||
}
|
||||
|
||||
func newDescriptorEventData() (*descriptorEventData, error) {
|
||||
data := descriptorEventData{
|
||||
DescriptorEventDataFixPart: DescriptorEventDataFixPart{
|
||||
BinlogVersion: BinlogVersion,
|
||||
@ -255,50 +234,57 @@ func newDescriptorEventData() descriptorEventData {
|
||||
EndTimestamp: 0,
|
||||
PayloadDataType: -1,
|
||||
},
|
||||
PostHeaderLengths: []uint8{16, 16, 16, 16, 16, 16},
|
||||
PostHeaderLengths: []uint8{},
|
||||
}
|
||||
for i := DescriptorEventType; i < EventTypeEnd; i++ {
|
||||
size := getEventFixPartSize(i)
|
||||
if size == -1 {
|
||||
return nil, errors.Errorf("undefined event type %d", i)
|
||||
}
|
||||
data.PostHeaderLengths = append(data.PostHeaderLengths, uint8(size))
|
||||
}
|
||||
data.HeaderLength = int8(data.GetMemoryUsageInBytes())
|
||||
return data
|
||||
return &data, nil
|
||||
}
|
||||
|
||||
func newInsertEventData() insertEventData {
|
||||
return insertEventData{
|
||||
func newInsertEventData() (*insertEventData, error) {
|
||||
return &insertEventData{
|
||||
StartTimestamp: 0,
|
||||
EndTimestamp: 0,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
func newDeleteEventData() deleteEventData {
|
||||
return deleteEventData{
|
||||
func newDeleteEventData() (*deleteEventData, error) {
|
||||
return &deleteEventData{
|
||||
StartTimestamp: 0,
|
||||
EndTimestamp: 0,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
func newCreateCollectionEventData() createCollectionEventData {
|
||||
return createCollectionEventData{
|
||||
func newCreateCollectionEventData() (*createCollectionEventData, error) {
|
||||
return &createCollectionEventData{
|
||||
StartTimestamp: 0,
|
||||
EndTimestamp: 0,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
func newDropCollectionEventData() dropCollectionEventData {
|
||||
return dropCollectionEventData{
|
||||
func newDropCollectionEventData() (*dropCollectionEventData, error) {
|
||||
return &dropCollectionEventData{
|
||||
StartTimestamp: 0,
|
||||
EndTimestamp: 0,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
func newCreatePartitionEventData() createPartitionEventData {
|
||||
return createPartitionEventData{
|
||||
func newCreatePartitionEventData() (*createPartitionEventData, error) {
|
||||
return &createPartitionEventData{
|
||||
StartTimestamp: 0,
|
||||
EndTimestamp: 0,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
func newDropPartitionEventData() dropPartitionEventData {
|
||||
return dropPartitionEventData{
|
||||
func newDropPartitionEventData() (*dropPartitionEventData, error) {
|
||||
return &dropPartitionEventData{
|
||||
StartTimestamp: 0,
|
||||
EndTimestamp: 0,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
func readInsertEventData(buffer io.Reader) (*insertEventData, error) {
|
||||
func readInsertEventDataFixPart(buffer io.Reader) (*insertEventData, error) {
|
||||
data := &insertEventData{}
|
||||
if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
|
||||
return nil, err
|
||||
@ -306,7 +292,7 @@ func readInsertEventData(buffer io.Reader) (*insertEventData, error) {
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func readDeleteEventData(buffer io.Reader) (*deleteEventData, error) {
|
||||
func readDeleteEventDataFixPart(buffer io.Reader) (*deleteEventData, error) {
|
||||
data := &deleteEventData{}
|
||||
if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
|
||||
return nil, err
|
||||
@ -314,7 +300,7 @@ func readDeleteEventData(buffer io.Reader) (*deleteEventData, error) {
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func readCreateCollectionEventData(buffer io.Reader) (*createCollectionEventData, error) {
|
||||
func readCreateCollectionEventDataFixPart(buffer io.Reader) (*createCollectionEventData, error) {
|
||||
data := &createCollectionEventData{}
|
||||
if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
|
||||
return nil, err
|
||||
@ -322,7 +308,7 @@ func readCreateCollectionEventData(buffer io.Reader) (*createCollectionEventData
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func readDropCollectionEventData(buffer io.Reader) (*dropCollectionEventData, error) {
|
||||
func readDropCollectionEventDataFixPart(buffer io.Reader) (*dropCollectionEventData, error) {
|
||||
data := &dropCollectionEventData{}
|
||||
if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
|
||||
return nil, err
|
||||
@ -330,7 +316,7 @@ func readDropCollectionEventData(buffer io.Reader) (*dropCollectionEventData, er
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func readCreatePartitionEventData(buffer io.Reader) (*createPartitionEventData, error) {
|
||||
func readCreatePartitionEventDataFixPart(buffer io.Reader) (*createPartitionEventData, error) {
|
||||
data := &createPartitionEventData{}
|
||||
if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
|
||||
return nil, err
|
||||
@ -338,7 +324,7 @@ func readCreatePartitionEventData(buffer io.Reader) (*createPartitionEventData,
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func readDropPartitionEventData(buffer io.Reader) (*dropPartitionEventData, error) {
|
||||
func readDropPartitionEventDataFixPart(buffer io.Reader) (*dropPartitionEventData, error) {
|
||||
data := &dropPartitionEventData{}
|
||||
if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
|
||||
return nil, err
|
||||
|
||||
@ -1,13 +1,11 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||
)
|
||||
|
||||
@ -20,9 +18,7 @@ type baseEventHeader struct {
|
||||
}
|
||||
|
||||
func (header *baseEventHeader) GetMemoryUsageInBytes() int32 {
|
||||
buf := new(bytes.Buffer)
|
||||
binary.Write(buf, binary.LittleEndian, header)
|
||||
return int32(buf.Len())
|
||||
return int32(binary.Size(header))
|
||||
}
|
||||
|
||||
func (header *baseEventHeader) Write(buffer io.Writer) error {
|
||||
@ -52,7 +48,7 @@ func readDescriptorEventHeader(buffer io.Reader) (*descriptorEventHeader, error)
|
||||
return header, nil
|
||||
}
|
||||
|
||||
func newDescriptorEventHeader() descriptorEventHeader {
|
||||
func newDescriptorEventHeader() (*descriptorEventHeader, error) {
|
||||
header := descriptorEventHeader{
|
||||
Timestamp: tsoutil.ComposeTS(time.Now().UnixNano()/int64(time.Millisecond), 0),
|
||||
TypeCode: DescriptorEventType,
|
||||
@ -60,11 +56,11 @@ func newDescriptorEventHeader() descriptorEventHeader {
|
||||
}
|
||||
header.EventLength = header.GetMemoryUsageInBytes()
|
||||
header.NextPosition = header.EventLength + 4
|
||||
return header
|
||||
return &header, nil
|
||||
}
|
||||
|
||||
func newEventHeader(eventTypeCode EventTypeCode) eventHeader {
|
||||
return eventHeader{
|
||||
func newEventHeader(eventTypeCode EventTypeCode) (*eventHeader, error) {
|
||||
return &eventHeader{
|
||||
baseEventHeader: baseEventHeader{
|
||||
Timestamp: tsoutil.ComposeTS(time.Now().UnixNano()/int64(time.Millisecond), 0),
|
||||
TypeCode: eventTypeCode,
|
||||
@ -72,5 +68,5 @@ func newEventHeader(eventTypeCode EventTypeCode) eventHeader {
|
||||
EventLength: -1,
|
||||
NextPosition: -1,
|
||||
},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -43,17 +43,17 @@ func (reader *EventReader) readData() (eventData, error) {
|
||||
var err error
|
||||
switch reader.TypeCode {
|
||||
case InsertEventType:
|
||||
data, err = readInsertEventData(reader.buffer)
|
||||
data, err = readInsertEventDataFixPart(reader.buffer)
|
||||
case DeleteEventType:
|
||||
data, err = readDeleteEventData(reader.buffer)
|
||||
data, err = readDeleteEventDataFixPart(reader.buffer)
|
||||
case CreateCollectionEventType:
|
||||
data, err = readCreateCollectionEventData(reader.buffer)
|
||||
data, err = readCreateCollectionEventDataFixPart(reader.buffer)
|
||||
case DropCollectionEventType:
|
||||
data, err = readDropCollectionEventData(reader.buffer)
|
||||
data, err = readDropCollectionEventDataFixPart(reader.buffer)
|
||||
case CreatePartitionEventType:
|
||||
data, err = readCreatePartitionEventData(reader.buffer)
|
||||
data, err = readCreatePartitionEventDataFixPart(reader.buffer)
|
||||
case DropPartitionEventType:
|
||||
data, err = readDropPartitionEventData(reader.buffer)
|
||||
data, err = readDropPartitionEventDataFixPart(reader.buffer)
|
||||
default:
|
||||
return nil, errors.New("unknown header type code: " + strconv.Itoa(int(reader.TypeCode)))
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@ const (
|
||||
DropCollectionEventType
|
||||
CreatePartitionEventType
|
||||
DropPartitionEventType
|
||||
EventTypeEnd
|
||||
)
|
||||
|
||||
func (code EventTypeCode) String() string {
|
||||
@ -169,11 +170,19 @@ type dropPartitionEventWriter struct {
|
||||
dropPartitionEventData
|
||||
}
|
||||
|
||||
func newDescriptorEvent() descriptorEvent {
|
||||
return descriptorEvent{
|
||||
descriptorEventHeader: newDescriptorEventHeader(),
|
||||
descriptorEventData: newDescriptorEventData(),
|
||||
func newDescriptorEvent() (*descriptorEvent, error) {
|
||||
header, err := newDescriptorEventHeader()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
data, err := newDescriptorEventData()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &descriptorEvent{
|
||||
descriptorEventHeader: *header,
|
||||
descriptorEventData: *data,
|
||||
}, err
|
||||
}
|
||||
|
||||
func newInsertEventWriter(dataType schemapb.DataType, offset int32) (*insertEventWriter, error) {
|
||||
@ -181,17 +190,26 @@ func newInsertEventWriter(dataType schemapb.DataType, offset int32) (*insertEven
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
header, err := newEventHeader(InsertEventType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
data, err := newInsertEventData()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
writer := &insertEventWriter{
|
||||
baseEventWriter: baseEventWriter{
|
||||
eventHeader: newEventHeader(InsertEventType),
|
||||
eventHeader: *header,
|
||||
PayloadWriterInterface: payloadWriter,
|
||||
isClosed: false,
|
||||
isFinish: false,
|
||||
offset: offset,
|
||||
},
|
||||
insertEventData: newInsertEventData(),
|
||||
insertEventData: *data,
|
||||
}
|
||||
writer.baseEventWriter.getEventDataSize = writer.insertEventData.GetEventDataSize
|
||||
writer.baseEventWriter.getEventDataSize = writer.insertEventData.GetEventDataFixPartSize
|
||||
writer.baseEventWriter.writeEventData = writer.insertEventData.WriteEventData
|
||||
return writer, nil
|
||||
}
|
||||
@ -201,17 +219,25 @@ func newDeleteEventWriter(dataType schemapb.DataType, offset int32) (*deleteEven
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
header, err := newEventHeader(DeleteEventType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
data, err := newDeleteEventData()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
writer := &deleteEventWriter{
|
||||
baseEventWriter: baseEventWriter{
|
||||
eventHeader: newEventHeader(DeleteEventType),
|
||||
eventHeader: *header,
|
||||
PayloadWriterInterface: payloadWriter,
|
||||
isClosed: false,
|
||||
isFinish: false,
|
||||
offset: offset,
|
||||
},
|
||||
deleteEventData: newDeleteEventData(),
|
||||
deleteEventData: *data,
|
||||
}
|
||||
writer.baseEventWriter.getEventDataSize = writer.deleteEventData.GetEventDataSize
|
||||
writer.baseEventWriter.getEventDataSize = writer.deleteEventData.GetEventDataFixPartSize
|
||||
writer.baseEventWriter.writeEventData = writer.deleteEventData.WriteEventData
|
||||
return writer, nil
|
||||
}
|
||||
@ -220,17 +246,26 @@ func newCreateCollectionEventWriter(dataType schemapb.DataType, offset int32) (*
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
header, err := newEventHeader(CreateCollectionEventType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
data, err := newCreateCollectionEventData()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
writer := &createCollectionEventWriter{
|
||||
baseEventWriter: baseEventWriter{
|
||||
eventHeader: newEventHeader(CreateCollectionEventType),
|
||||
eventHeader: *header,
|
||||
PayloadWriterInterface: payloadWriter,
|
||||
isClosed: false,
|
||||
isFinish: false,
|
||||
offset: offset,
|
||||
},
|
||||
createCollectionEventData: newCreateCollectionEventData(),
|
||||
createCollectionEventData: *data,
|
||||
}
|
||||
writer.baseEventWriter.getEventDataSize = writer.createCollectionEventData.GetEventDataSize
|
||||
writer.baseEventWriter.getEventDataSize = writer.createCollectionEventData.GetEventDataFixPartSize
|
||||
writer.baseEventWriter.writeEventData = writer.createCollectionEventData.WriteEventData
|
||||
return writer, nil
|
||||
}
|
||||
@ -239,17 +274,25 @@ func newDropCollectionEventWriter(dataType schemapb.DataType, offset int32) (*dr
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
header, err := newEventHeader(DropCollectionEventType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
data, err := newDropCollectionEventData()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
writer := &dropCollectionEventWriter{
|
||||
baseEventWriter: baseEventWriter{
|
||||
eventHeader: newEventHeader(DropCollectionEventType),
|
||||
eventHeader: *header,
|
||||
PayloadWriterInterface: payloadWriter,
|
||||
isClosed: false,
|
||||
isFinish: false,
|
||||
offset: offset,
|
||||
},
|
||||
dropCollectionEventData: newDropCollectionEventData(),
|
||||
dropCollectionEventData: *data,
|
||||
}
|
||||
writer.baseEventWriter.getEventDataSize = writer.dropCollectionEventData.GetEventDataSize
|
||||
writer.baseEventWriter.getEventDataSize = writer.dropCollectionEventData.GetEventDataFixPartSize
|
||||
writer.baseEventWriter.writeEventData = writer.dropCollectionEventData.WriteEventData
|
||||
return writer, nil
|
||||
}
|
||||
@ -258,17 +301,26 @@ func newCreatePartitionEventWriter(dataType schemapb.DataType, offset int32) (*c
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
header, err := newEventHeader(CreatePartitionEventType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
data, err := newCreatePartitionEventData()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
writer := &createPartitionEventWriter{
|
||||
baseEventWriter: baseEventWriter{
|
||||
eventHeader: newEventHeader(CreatePartitionEventType),
|
||||
eventHeader: *header,
|
||||
PayloadWriterInterface: payloadWriter,
|
||||
isClosed: false,
|
||||
isFinish: false,
|
||||
offset: offset,
|
||||
},
|
||||
createPartitionEventData: newCreatePartitionEventData(),
|
||||
createPartitionEventData: *data,
|
||||
}
|
||||
writer.baseEventWriter.getEventDataSize = writer.createPartitionEventData.GetEventDataSize
|
||||
writer.baseEventWriter.getEventDataSize = writer.createPartitionEventData.GetEventDataFixPartSize
|
||||
writer.baseEventWriter.writeEventData = writer.createPartitionEventData.WriteEventData
|
||||
return writer, nil
|
||||
}
|
||||
@ -277,17 +329,25 @@ func newDropPartitionEventWriter(dataType schemapb.DataType, offset int32) (*dro
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
header, err := newEventHeader(DropPartitionEventType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
data, err := newDropPartitionEventData()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
writer := &dropPartitionEventWriter{
|
||||
baseEventWriter: baseEventWriter{
|
||||
eventHeader: newEventHeader(DropPartitionEventType),
|
||||
eventHeader: *header,
|
||||
PayloadWriterInterface: payloadWriter,
|
||||
isClosed: false,
|
||||
isFinish: false,
|
||||
offset: offset,
|
||||
},
|
||||
dropPartitionEventData: newDropPartitionEventData(),
|
||||
dropPartitionEventData: *data,
|
||||
}
|
||||
writer.baseEventWriter.getEventDataSize = writer.dropPartitionEventData.GetEventDataSize
|
||||
writer.baseEventWriter.getEventDataSize = writer.dropPartitionEventData.GetEventDataFixPartSize
|
||||
writer.baseEventWriter.writeEventData = writer.dropPartitionEventData.WriteEventData
|
||||
return writer, nil
|
||||
}
|
||||
|
||||
@ -2,6 +2,7 @@ package storage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"testing"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||
@ -9,6 +10,27 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestSizeofStruct(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
err := binary.Write(&buf, binary.LittleEndian, baseEventHeader{})
|
||||
assert.Nil(t, err)
|
||||
s1 := binary.Size(baseEventHeader{})
|
||||
s2 := binary.Size(&baseEventHeader{})
|
||||
assert.Equal(t, s1, s2)
|
||||
assert.Equal(t, s1, buf.Len())
|
||||
buf.Reset()
|
||||
assert.Equal(t, 0, buf.Len())
|
||||
|
||||
de := descriptorEventData{
|
||||
DescriptorEventDataFixPart: DescriptorEventDataFixPart{},
|
||||
PostHeaderLengths: []uint8{0, 1, 2, 3},
|
||||
}
|
||||
err = de.Write(&buf)
|
||||
assert.Nil(t, err)
|
||||
s3 := binary.Size(de.DescriptorEventDataFixPart) + binary.Size(de.PostHeaderLengths)
|
||||
assert.Equal(t, s3, buf.Len())
|
||||
}
|
||||
|
||||
func TestEventWriter(t *testing.T) {
|
||||
insertEvent, err := newInsertEventWriter(schemapb.DataType_INT32, 0)
|
||||
assert.Nil(t, err)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user