mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
Add description event extras. (#8264)
Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
parent
f34e5205f7
commit
10c07a3041
@ -50,6 +50,8 @@ func TestInsertBinlog(t *testing.T) {
|
||||
|
||||
w.SetEventTimeStamp(1000, 2000)
|
||||
|
||||
w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra")
|
||||
|
||||
_, err = w.GetBuffer()
|
||||
assert.NotNil(t, err)
|
||||
err = w.Close()
|
||||
@ -77,11 +79,6 @@ func TestInsertBinlog(t *testing.T) {
|
||||
assert.Equal(t, EventTypeCode(tc), DescriptorEventType)
|
||||
pos += int(unsafe.Sizeof(tc))
|
||||
|
||||
//descriptor header, server id
|
||||
svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(svrID))
|
||||
|
||||
//descriptor header, event length
|
||||
descEventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(descEventLen))
|
||||
@ -91,26 +88,6 @@ func TestInsertBinlog(t *testing.T) {
|
||||
assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos)
|
||||
pos += int(unsafe.Sizeof(descNxtPos))
|
||||
|
||||
//descriptor data fix, binlog version
|
||||
binLogVer := UnsafeReadInt16(buf, pos)
|
||||
assert.Equal(t, binLogVer, int16(BinlogVersion))
|
||||
pos += int(unsafe.Sizeof(binLogVer))
|
||||
|
||||
//descriptor data fix, server version
|
||||
svrVer := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, svrVer, int64(ServerVersion))
|
||||
pos += int(unsafe.Sizeof(svrVer))
|
||||
|
||||
//descriptor data fix, commit id
|
||||
cmitID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, cmitID, int64(CommitID))
|
||||
pos += int(unsafe.Sizeof(cmitID))
|
||||
|
||||
//descriptor data fix, header length
|
||||
headLen := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, headLen, int8(binary.Size(eventHeader{})))
|
||||
pos += int(unsafe.Sizeof(headLen))
|
||||
|
||||
//descriptor data fix, collection id
|
||||
collID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, collID, int64(10))
|
||||
@ -153,6 +130,19 @@ func TestInsertBinlog(t *testing.T) {
|
||||
pos++
|
||||
}
|
||||
|
||||
//descriptor data, extra length
|
||||
extraLength := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, extraLength, w.baseBinlogWriter.descriptorEventData.ExtraLength)
|
||||
pos += int(unsafe.Sizeof(extraLength))
|
||||
|
||||
multiBytes := make([]byte, extraLength)
|
||||
for i := 0; i < int(extraLength); i++ {
|
||||
singleByte := UnsafeReadByte(buf, pos)
|
||||
multiBytes[i] = singleByte
|
||||
pos++
|
||||
}
|
||||
assert.Equal(t, string(multiBytes), "{\"test\":\"testExtra\"}")
|
||||
|
||||
//start of e1
|
||||
assert.Equal(t, pos, int(descNxtPos))
|
||||
|
||||
@ -167,11 +157,6 @@ func TestInsertBinlog(t *testing.T) {
|
||||
assert.Equal(t, EventTypeCode(e1tc), InsertEventType)
|
||||
pos += int(unsafe.Sizeof(e1tc))
|
||||
|
||||
//insert e1 header, Server id
|
||||
e1svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e1svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(e1svrID))
|
||||
|
||||
//insert e1 header, event length
|
||||
e1EventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(e1EventLen))
|
||||
@ -215,11 +200,6 @@ func TestInsertBinlog(t *testing.T) {
|
||||
assert.Equal(t, EventTypeCode(e2tc), InsertEventType)
|
||||
pos += int(unsafe.Sizeof(e2tc))
|
||||
|
||||
//insert e2 header, Server id
|
||||
e2svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e2svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(e2svrID))
|
||||
|
||||
//insert e2 header, event length
|
||||
e2EventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(e2EventLen))
|
||||
@ -307,6 +287,8 @@ func TestDeleteBinlog(t *testing.T) {
|
||||
|
||||
w.SetEventTimeStamp(1000, 2000)
|
||||
|
||||
w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra")
|
||||
|
||||
_, err = w.GetBuffer()
|
||||
assert.NotNil(t, err)
|
||||
err = w.Close()
|
||||
@ -334,11 +316,6 @@ func TestDeleteBinlog(t *testing.T) {
|
||||
assert.Equal(t, EventTypeCode(tc), DescriptorEventType)
|
||||
pos += int(unsafe.Sizeof(tc))
|
||||
|
||||
//descriptor header, server id
|
||||
svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(svrID))
|
||||
|
||||
//descriptor header, event length
|
||||
descEventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(descEventLen))
|
||||
@ -348,26 +325,6 @@ func TestDeleteBinlog(t *testing.T) {
|
||||
assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos)
|
||||
pos += int(unsafe.Sizeof(descNxtPos))
|
||||
|
||||
//descriptor data fix, binlog version
|
||||
binLogVer := UnsafeReadInt16(buf, pos)
|
||||
assert.Equal(t, binLogVer, int16(BinlogVersion))
|
||||
pos += int(unsafe.Sizeof(binLogVer))
|
||||
|
||||
//descriptor data fix, server version
|
||||
svrVer := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, svrVer, int64(ServerVersion))
|
||||
pos += int(unsafe.Sizeof(svrVer))
|
||||
|
||||
//descriptor data fix, commit id
|
||||
cmitID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, cmitID, int64(CommitID))
|
||||
pos += int(unsafe.Sizeof(cmitID))
|
||||
|
||||
//descriptor data fix, header length
|
||||
headLen := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, headLen, int8(binary.Size(eventHeader{})))
|
||||
pos += int(unsafe.Sizeof(headLen))
|
||||
|
||||
//descriptor data fix, collection id
|
||||
collID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, collID, int64(50))
|
||||
@ -410,6 +367,19 @@ func TestDeleteBinlog(t *testing.T) {
|
||||
pos++
|
||||
}
|
||||
|
||||
//descriptor data, extra length
|
||||
extraLength := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, extraLength, w.baseBinlogWriter.descriptorEventData.ExtraLength)
|
||||
pos += int(unsafe.Sizeof(extraLength))
|
||||
|
||||
multiBytes := make([]byte, extraLength)
|
||||
for i := 0; i < int(extraLength); i++ {
|
||||
singleByte := UnsafeReadByte(buf, pos)
|
||||
multiBytes[i] = singleByte
|
||||
pos++
|
||||
}
|
||||
assert.Equal(t, string(multiBytes), "{\"test\":\"testExtra\"}")
|
||||
|
||||
//start of e1
|
||||
assert.Equal(t, pos, int(descNxtPos))
|
||||
|
||||
@ -424,11 +394,6 @@ func TestDeleteBinlog(t *testing.T) {
|
||||
assert.Equal(t, EventTypeCode(e1tc), DeleteEventType)
|
||||
pos += int(unsafe.Sizeof(e1tc))
|
||||
|
||||
//insert e1 header, Server id
|
||||
e1svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e1svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(e1svrID))
|
||||
|
||||
//insert e1 header, event length
|
||||
e1EventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(e1EventLen))
|
||||
@ -472,11 +437,6 @@ func TestDeleteBinlog(t *testing.T) {
|
||||
assert.Equal(t, EventTypeCode(e2tc), DeleteEventType)
|
||||
pos += int(unsafe.Sizeof(e2tc))
|
||||
|
||||
//insert e2 header, Server id
|
||||
e2svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e2svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(e2svrID))
|
||||
|
||||
//insert e2 header, event length
|
||||
e2EventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(e2EventLen))
|
||||
@ -564,6 +524,8 @@ func TestDDLBinlog1(t *testing.T) {
|
||||
|
||||
w.SetEventTimeStamp(1000, 2000)
|
||||
|
||||
w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra")
|
||||
|
||||
_, err = w.GetBuffer()
|
||||
assert.NotNil(t, err)
|
||||
err = w.Close()
|
||||
@ -591,11 +553,6 @@ func TestDDLBinlog1(t *testing.T) {
|
||||
assert.Equal(t, EventTypeCode(tc), DescriptorEventType)
|
||||
pos += int(unsafe.Sizeof(tc))
|
||||
|
||||
//descriptor header, server id
|
||||
svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(svrID))
|
||||
|
||||
//descriptor header, event length
|
||||
descEventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(descEventLen))
|
||||
@ -605,26 +562,6 @@ func TestDDLBinlog1(t *testing.T) {
|
||||
assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos)
|
||||
pos += int(unsafe.Sizeof(descNxtPos))
|
||||
|
||||
//descriptor data fix, binlog version
|
||||
binLogVer := UnsafeReadInt16(buf, pos)
|
||||
assert.Equal(t, binLogVer, int16(BinlogVersion))
|
||||
pos += int(unsafe.Sizeof(binLogVer))
|
||||
|
||||
//descriptor data fix, server version
|
||||
svrVer := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, svrVer, int64(ServerVersion))
|
||||
pos += int(unsafe.Sizeof(svrVer))
|
||||
|
||||
//descriptor data fix, commit id
|
||||
cmitID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, cmitID, int64(CommitID))
|
||||
pos += int(unsafe.Sizeof(cmitID))
|
||||
|
||||
//descriptor data fix, header length
|
||||
headLen := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, headLen, int8(binary.Size(eventHeader{})))
|
||||
pos += int(unsafe.Sizeof(headLen))
|
||||
|
||||
//descriptor data fix, collection id
|
||||
collID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, collID, int64(50))
|
||||
@ -667,6 +604,19 @@ func TestDDLBinlog1(t *testing.T) {
|
||||
pos++
|
||||
}
|
||||
|
||||
//descriptor data, extra length
|
||||
extraLength := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, extraLength, w.baseBinlogWriter.descriptorEventData.ExtraLength)
|
||||
pos += int(unsafe.Sizeof(extraLength))
|
||||
|
||||
multiBytes := make([]byte, extraLength)
|
||||
for i := 0; i < int(extraLength); i++ {
|
||||
singleByte := UnsafeReadByte(buf, pos)
|
||||
multiBytes[i] = singleByte
|
||||
pos++
|
||||
}
|
||||
assert.Equal(t, string(multiBytes), "{\"test\":\"testExtra\"}")
|
||||
|
||||
//start of e1
|
||||
assert.Equal(t, pos, int(descNxtPos))
|
||||
|
||||
@ -681,11 +631,6 @@ func TestDDLBinlog1(t *testing.T) {
|
||||
assert.Equal(t, EventTypeCode(e1tc), CreateCollectionEventType)
|
||||
pos += int(unsafe.Sizeof(e1tc))
|
||||
|
||||
//insert e1 header, Server id
|
||||
e1svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e1svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(e1svrID))
|
||||
|
||||
//insert e1 header, event length
|
||||
e1EventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(e1EventLen))
|
||||
@ -729,11 +674,6 @@ func TestDDLBinlog1(t *testing.T) {
|
||||
assert.Equal(t, EventTypeCode(e2tc), DropCollectionEventType)
|
||||
pos += int(unsafe.Sizeof(e2tc))
|
||||
|
||||
//insert e2 header, Server id
|
||||
e2svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e2svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(e2svrID))
|
||||
|
||||
//insert e2 header, event length
|
||||
e2EventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(e2EventLen))
|
||||
@ -821,6 +761,8 @@ func TestDDLBinlog2(t *testing.T) {
|
||||
|
||||
w.SetEventTimeStamp(1000, 2000)
|
||||
|
||||
w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra")
|
||||
|
||||
_, err = w.GetBuffer()
|
||||
assert.NotNil(t, err)
|
||||
err = w.Close()
|
||||
@ -848,11 +790,6 @@ func TestDDLBinlog2(t *testing.T) {
|
||||
assert.Equal(t, EventTypeCode(tc), DescriptorEventType)
|
||||
pos += int(unsafe.Sizeof(tc))
|
||||
|
||||
//descriptor header, server id
|
||||
svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(svrID))
|
||||
|
||||
//descriptor header, event length
|
||||
descEventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(descEventLen))
|
||||
@ -862,26 +799,6 @@ func TestDDLBinlog2(t *testing.T) {
|
||||
assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos)
|
||||
pos += int(unsafe.Sizeof(descNxtPos))
|
||||
|
||||
//descriptor data fix, binlog version
|
||||
binLogVer := UnsafeReadInt16(buf, pos)
|
||||
assert.Equal(t, binLogVer, int16(BinlogVersion))
|
||||
pos += int(unsafe.Sizeof(binLogVer))
|
||||
|
||||
//descriptor data fix, server version
|
||||
svrVer := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, svrVer, int64(ServerVersion))
|
||||
pos += int(unsafe.Sizeof(svrVer))
|
||||
|
||||
//descriptor data fix, commit id
|
||||
cmitID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, cmitID, int64(CommitID))
|
||||
pos += int(unsafe.Sizeof(cmitID))
|
||||
|
||||
//descriptor data fix, header length
|
||||
headLen := UnsafeReadInt8(buf, pos)
|
||||
assert.Equal(t, headLen, int8(binary.Size(eventHeader{})))
|
||||
pos += int(unsafe.Sizeof(headLen))
|
||||
|
||||
//descriptor data fix, collection id
|
||||
collID := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, collID, int64(50))
|
||||
@ -924,6 +841,19 @@ func TestDDLBinlog2(t *testing.T) {
|
||||
pos++
|
||||
}
|
||||
|
||||
//descriptor data, extra length
|
||||
extraLength := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, extraLength, w.baseBinlogWriter.descriptorEventData.ExtraLength)
|
||||
pos += int(unsafe.Sizeof(extraLength))
|
||||
|
||||
multiBytes := make([]byte, extraLength)
|
||||
for i := 0; i < int(extraLength); i++ {
|
||||
singleByte := UnsafeReadByte(buf, pos)
|
||||
multiBytes[i] = singleByte
|
||||
pos++
|
||||
}
|
||||
assert.Equal(t, string(multiBytes), "{\"test\":\"testExtra\"}")
|
||||
|
||||
//start of e1
|
||||
assert.Equal(t, pos, int(descNxtPos))
|
||||
|
||||
@ -938,11 +868,6 @@ func TestDDLBinlog2(t *testing.T) {
|
||||
assert.Equal(t, EventTypeCode(e1tc), CreatePartitionEventType)
|
||||
pos += int(unsafe.Sizeof(e1tc))
|
||||
|
||||
//insert e1 header, Server id
|
||||
e1svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e1svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(e1svrID))
|
||||
|
||||
//insert e1 header, event length
|
||||
e1EventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(e1EventLen))
|
||||
@ -986,11 +911,6 @@ func TestDDLBinlog2(t *testing.T) {
|
||||
assert.Equal(t, EventTypeCode(e2tc), DropPartitionEventType)
|
||||
pos += int(unsafe.Sizeof(e2tc))
|
||||
|
||||
//insert e2 header, Server id
|
||||
e2svrID := UnsafeReadInt32(buf, pos)
|
||||
assert.Equal(t, e2svrID, int32(ServerID))
|
||||
pos += int(unsafe.Sizeof(e2svrID))
|
||||
|
||||
//insert e2 header, event length
|
||||
e2EventLen := UnsafeReadInt32(buf, pos)
|
||||
pos += int(unsafe.Sizeof(e2EventLen))
|
||||
|
||||
@ -19,14 +19,6 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
)
|
||||
|
||||
const (
|
||||
// todo : put to param table
|
||||
ServerID = 1
|
||||
BinlogVersion = 1
|
||||
CommitID = 1
|
||||
ServerVersion = 1
|
||||
)
|
||||
|
||||
// BinlogType is to distinguish different files saving different data.
|
||||
type BinlogType int32
|
||||
|
||||
|
||||
@ -13,6 +13,7 @@ package storage
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
|
||||
@ -23,14 +24,13 @@ import (
|
||||
type descriptorEventData struct {
|
||||
DescriptorEventDataFixPart
|
||||
PostHeaderLengths []uint8
|
||||
ExtraLength int32
|
||||
ExtraBytes []byte
|
||||
Extras map[string]interface{}
|
||||
}
|
||||
|
||||
// DescriptorEventDataFixPart is a memorty struct saves events' DescriptorEventData.
|
||||
type DescriptorEventDataFixPart struct {
|
||||
BinlogVersion int16
|
||||
ServerVersion int64
|
||||
CommitID int64
|
||||
HeaderLength int8
|
||||
CollectionID int64
|
||||
PartitionID int64
|
||||
SegmentID int64
|
||||
@ -46,14 +46,32 @@ func (data *descriptorEventData) SetEventTimeStamp(start typeutil.Timestamp, end
|
||||
data.EndTimestamp = end
|
||||
}
|
||||
|
||||
// SetEventTimeStamp returns the memory size of DescriptorEventDataFixPart.
|
||||
// GetEventDataFixPartSize returns the memory size of DescriptorEventDataFixPart.
|
||||
func (data *descriptorEventData) GetEventDataFixPartSize() int32 {
|
||||
return int32(binary.Size(data.DescriptorEventDataFixPart))
|
||||
}
|
||||
|
||||
// SetEventTimeStamp returns the memory size of DescriptorEventDataFixPart.
|
||||
// GetMemoryUsageInBytes returns the memory size of DescriptorEventDataFixPart.
|
||||
func (data *descriptorEventData) GetMemoryUsageInBytes() int32 {
|
||||
return data.GetEventDataFixPartSize() + int32(binary.Size(data.PostHeaderLengths))
|
||||
return data.GetEventDataFixPartSize() + int32(binary.Size(data.PostHeaderLengths)) + int32(binary.Size(data.ExtraLength)) + data.ExtraLength
|
||||
|
||||
}
|
||||
|
||||
// AddExtra add extra params to description event.
|
||||
func (data *descriptorEventData) AddExtra(k string, v interface{}) {
|
||||
data.Extras[k] = v
|
||||
}
|
||||
|
||||
// FinishExtra marshal extras to json format.
|
||||
// Call before GetMemoryUsageInBytes to get a accurate length of description event.
|
||||
func (data *descriptorEventData) FinishExtra() error {
|
||||
var err error
|
||||
data.ExtraBytes, err = json.Marshal(data.Extras)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data.ExtraLength = int32(len(data.ExtraBytes))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write transfer DescriptorEventDataFixPart to binary buffer.
|
||||
@ -64,6 +82,13 @@ func (data *descriptorEventData) Write(buffer io.Writer) error {
|
||||
if err := binary.Write(buffer, binary.LittleEndian, data.PostHeaderLengths); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(buffer, binary.LittleEndian, data.ExtraLength); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(buffer, binary.LittleEndian, data.ExtraBytes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -75,6 +100,18 @@ func readDescriptorEventData(buffer io.Reader) (*descriptorEventData, error) {
|
||||
if err := binary.Read(buffer, binary.LittleEndian, &event.PostHeaderLengths); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := binary.Read(buffer, binary.LittleEndian, &event.ExtraLength); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
event.ExtraBytes = make([]byte, event.ExtraLength)
|
||||
if err := binary.Read(buffer, binary.LittleEndian, &event.ExtraBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := json.Unmarshal(event.ExtraBytes, &event.Extras); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return event, nil
|
||||
}
|
||||
|
||||
@ -253,9 +290,6 @@ func getEventFixPartSize(code EventTypeCode) int32 {
|
||||
func newDescriptorEventData() *descriptorEventData {
|
||||
data := descriptorEventData{
|
||||
DescriptorEventDataFixPart: DescriptorEventDataFixPart{
|
||||
BinlogVersion: BinlogVersion,
|
||||
ServerVersion: ServerVersion,
|
||||
CommitID: CommitID,
|
||||
CollectionID: -1,
|
||||
PartitionID: -1,
|
||||
SegmentID: -1,
|
||||
@ -265,6 +299,7 @@ func newDescriptorEventData() *descriptorEventData {
|
||||
PayloadDataType: -1,
|
||||
},
|
||||
PostHeaderLengths: []uint8{},
|
||||
Extras: make(map[string]interface{}),
|
||||
}
|
||||
for i := DescriptorEventType; i < EventTypeEnd; i++ {
|
||||
size := getEventFixPartSize(i)
|
||||
|
||||
@ -23,7 +23,6 @@ import (
|
||||
type baseEventHeader struct {
|
||||
Timestamp typeutil.Timestamp
|
||||
TypeCode EventTypeCode
|
||||
ServerID int32
|
||||
EventLength int32
|
||||
NextPosition int32
|
||||
}
|
||||
@ -63,7 +62,6 @@ func newDescriptorEventHeader() *descriptorEventHeader {
|
||||
header := descriptorEventHeader{
|
||||
Timestamp: tsoutil.ComposeTS(time.Now().UnixNano()/int64(time.Millisecond), 0),
|
||||
TypeCode: DescriptorEventType,
|
||||
ServerID: ServerID,
|
||||
}
|
||||
return &header
|
||||
}
|
||||
@ -73,7 +71,6 @@ func newEventHeader(eventTypeCode EventTypeCode) *eventHeader {
|
||||
baseEventHeader: baseEventHeader{
|
||||
Timestamp: tsoutil.ComposeTS(time.Now().UnixNano()/int64(time.Millisecond), 0),
|
||||
TypeCode: eventTypeCode,
|
||||
ServerID: ServerID,
|
||||
EventLength: -1,
|
||||
NextPosition: -1,
|
||||
},
|
||||
|
||||
@ -64,71 +64,33 @@ func TestDescriptorEvent(t *testing.T) {
|
||||
|
||||
utc := UnsafeReadInt8(buffer, int(unsafe.Sizeof(ts)))
|
||||
assert.Equal(t, EventTypeCode(utc), DescriptorEventType)
|
||||
usID := UnsafeReadInt32(buffer, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)))
|
||||
assert.Equal(t, usID, int32(ServerID))
|
||||
elen := UnsafeReadInt32(buffer, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)+unsafe.Sizeof(usID)))
|
||||
elen := UnsafeReadInt32(buffer, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)))
|
||||
assert.Equal(t, elen, int32(len(buffer)))
|
||||
nPos := UnsafeReadInt32(buffer, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)+unsafe.Sizeof(usID)+unsafe.Sizeof(elen)))
|
||||
nPos := UnsafeReadInt32(buffer, int(unsafe.Sizeof(ts)+unsafe.Sizeof(utc)+unsafe.Sizeof(elen)))
|
||||
assert.GreaterOrEqual(t, nPos, int32(binary.Size(MagicNumber)+len(buffer)))
|
||||
t.Logf("next position = %d", nPos)
|
||||
|
||||
binVersion := UnsafeReadInt16(buffer, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, binVersion, int16(BinlogVersion))
|
||||
svrVersion := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+int(unsafe.Sizeof(binVersion)))
|
||||
assert.Equal(t, svrVersion, int64(ServerVersion))
|
||||
commitID := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+int(unsafe.Sizeof(binVersion))+int(unsafe.Sizeof(svrVersion)))
|
||||
assert.Equal(t, commitID, int64(CommitID))
|
||||
headLen := UnsafeReadInt8(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(binVersion))+
|
||||
int(unsafe.Sizeof(svrVersion))+
|
||||
int(unsafe.Sizeof(commitID)))
|
||||
assert.Equal(t, headLen, int8(binary.Size(eventHeader{})))
|
||||
t.Logf("head len = %d", headLen)
|
||||
collID := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(binVersion))+
|
||||
int(unsafe.Sizeof(svrVersion))+
|
||||
int(unsafe.Sizeof(commitID))+
|
||||
int(unsafe.Sizeof(headLen)))
|
||||
collID := UnsafeReadInt64(buffer, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, collID, int64(-1))
|
||||
partID := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(binVersion))+
|
||||
int(unsafe.Sizeof(svrVersion))+
|
||||
int(unsafe.Sizeof(commitID))+
|
||||
int(unsafe.Sizeof(headLen))+
|
||||
int(unsafe.Sizeof(collID)))
|
||||
assert.Equal(t, partID, int64(-1))
|
||||
segID := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(binVersion))+
|
||||
int(unsafe.Sizeof(svrVersion))+
|
||||
int(unsafe.Sizeof(commitID))+
|
||||
int(unsafe.Sizeof(headLen))+
|
||||
int(unsafe.Sizeof(collID))+
|
||||
int(unsafe.Sizeof(partID)))
|
||||
assert.Equal(t, segID, int64(-1))
|
||||
fieldID := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(binVersion))+
|
||||
int(unsafe.Sizeof(svrVersion))+
|
||||
int(unsafe.Sizeof(commitID))+
|
||||
int(unsafe.Sizeof(headLen))+
|
||||
int(unsafe.Sizeof(collID))+
|
||||
int(unsafe.Sizeof(partID))+
|
||||
int(unsafe.Sizeof(segID)))
|
||||
assert.Equal(t, fieldID, int64(-1))
|
||||
startTs := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(binVersion))+
|
||||
int(unsafe.Sizeof(svrVersion))+
|
||||
int(unsafe.Sizeof(commitID))+
|
||||
int(unsafe.Sizeof(headLen))+
|
||||
int(unsafe.Sizeof(collID))+
|
||||
int(unsafe.Sizeof(partID))+
|
||||
int(unsafe.Sizeof(segID))+
|
||||
int(unsafe.Sizeof(fieldID)))
|
||||
assert.Equal(t, startTs, int64(0))
|
||||
endTs := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(binVersion))+
|
||||
int(unsafe.Sizeof(svrVersion))+
|
||||
int(unsafe.Sizeof(commitID))+
|
||||
int(unsafe.Sizeof(headLen))+
|
||||
int(unsafe.Sizeof(collID))+
|
||||
int(unsafe.Sizeof(partID))+
|
||||
int(unsafe.Sizeof(segID))+
|
||||
@ -136,10 +98,6 @@ func TestDescriptorEvent(t *testing.T) {
|
||||
int(unsafe.Sizeof(startTs)))
|
||||
assert.Equal(t, endTs, int64(0))
|
||||
colType := UnsafeReadInt32(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(binVersion))+
|
||||
int(unsafe.Sizeof(svrVersion))+
|
||||
int(unsafe.Sizeof(commitID))+
|
||||
int(unsafe.Sizeof(headLen))+
|
||||
int(unsafe.Sizeof(collID))+
|
||||
int(unsafe.Sizeof(partID))+
|
||||
int(unsafe.Sizeof(segID))+
|
||||
@ -149,10 +107,6 @@ func TestDescriptorEvent(t *testing.T) {
|
||||
assert.Equal(t, colType, int32(-1))
|
||||
|
||||
postHeadOffset := binary.Size(eventHeader{}) +
|
||||
int(unsafe.Sizeof(binVersion)) +
|
||||
int(unsafe.Sizeof(svrVersion)) +
|
||||
int(unsafe.Sizeof(commitID)) +
|
||||
int(unsafe.Sizeof(headLen)) +
|
||||
int(unsafe.Sizeof(collID)) +
|
||||
int(unsafe.Sizeof(partID)) +
|
||||
int(unsafe.Sizeof(segID)) +
|
||||
@ -197,7 +151,6 @@ func TestInsertEvent(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, InsertEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
@ -371,7 +324,6 @@ func TestInsertEvent(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, InsertEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
@ -445,7 +397,6 @@ func TestDeleteEvent(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, DeleteEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
@ -619,7 +570,6 @@ func TestDeleteEvent(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, DeleteEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
@ -693,7 +643,6 @@ func TestCreateCollectionEvent(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, CreateCollectionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
@ -741,7 +690,6 @@ func TestCreateCollectionEvent(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, CreateCollectionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
@ -815,7 +763,6 @@ func TestDropCollectionEvent(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, DropCollectionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
@ -863,7 +810,6 @@ func TestDropCollectionEvent(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, DropCollectionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
@ -937,7 +883,6 @@ func TestCreatePartitionEvent(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, CreatePartitionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
@ -985,7 +930,6 @@ func TestCreatePartitionEvent(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, CreatePartitionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
@ -1059,7 +1003,6 @@ func TestDropPartitionEvent(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, DropPartitionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
@ -1107,7 +1050,6 @@ func TestDropPartitionEvent(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
|
||||
wBuf := buf.Bytes()
|
||||
checkEventHeader(t, wBuf, DropPartitionEventType, ServerID, int32(len(wBuf)))
|
||||
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
|
||||
assert.Equal(t, Timestamp(st), tsoutil.ComposeTS(10, 0))
|
||||
et := UnsafeReadInt64(wBuf, binary.Size(eventHeader{})+int(unsafe.Sizeof(st)))
|
||||
|
||||
@ -59,6 +59,13 @@ func (event *descriptorEvent) GetMemoryUsageInBytes() int32 {
|
||||
}
|
||||
|
||||
func (event *descriptorEvent) Write(buffer io.Writer) error {
|
||||
err := event.descriptorEventData.FinishExtra()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
event.descriptorEventHeader.EventLength = event.descriptorEventHeader.GetMemoryUsageInBytes() + event.descriptorEventData.GetMemoryUsageInBytes()
|
||||
event.descriptorEventHeader.NextPosition = int32(binary.Size(MagicNumber)) + event.descriptorEventHeader.EventLength
|
||||
|
||||
if err := event.descriptorEventHeader.Write(buffer); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -195,9 +202,6 @@ type dropPartitionEventWriter struct {
|
||||
func newDescriptorEvent() *descriptorEvent {
|
||||
header := newDescriptorEventHeader()
|
||||
data := newDescriptorEventData()
|
||||
header.EventLength = header.GetMemoryUsageInBytes() + data.GetMemoryUsageInBytes()
|
||||
header.NextPosition = int32(binary.Size(MagicNumber)) + header.EventLength
|
||||
data.HeaderLength = int8(binary.Size(eventHeader{}))
|
||||
return &descriptorEvent{
|
||||
descriptorEventHeader: *header,
|
||||
descriptorEventData: *data,
|
||||
|
||||
@ -47,7 +47,7 @@ func TestSizeofStruct(t *testing.T) {
|
||||
}
|
||||
err = de.Write(&buf)
|
||||
assert.Nil(t, err)
|
||||
s3 := binary.Size(de.DescriptorEventDataFixPart) + binary.Size(de.PostHeaderLengths)
|
||||
s3 := binary.Size(de.DescriptorEventDataFixPart) + binary.Size(de.PostHeaderLengths) + binary.Size(de.ExtraLength) + int(de.ExtraLength)
|
||||
assert.Equal(t, s3, buf.Len())
|
||||
}
|
||||
|
||||
|
||||
@ -64,14 +64,9 @@ func printBinlogFile(filename string) error {
|
||||
physical, _ := tsoutil.ParseTS(r.descriptorEvent.descriptorEventHeader.Timestamp)
|
||||
fmt.Printf("\tTimestamp: %v\n", physical)
|
||||
fmt.Printf("\tTypeCode: %s\n", r.descriptorEvent.descriptorEventHeader.TypeCode.String())
|
||||
fmt.Printf("\tServerID: %d\n", r.descriptorEvent.descriptorEventHeader.ServerID)
|
||||
fmt.Printf("\tEventLength: %d\n", r.descriptorEvent.descriptorEventHeader.EventLength)
|
||||
fmt.Printf("\tNextPosition :%d\n", r.descriptorEvent.descriptorEventHeader.NextPosition)
|
||||
fmt.Println("descriptor event data:")
|
||||
fmt.Printf("\tBinlogVersion: %d\n", r.descriptorEvent.descriptorEventData.BinlogVersion)
|
||||
fmt.Printf("\tServerVersion: %d\n", r.descriptorEvent.descriptorEventData.ServerVersion)
|
||||
fmt.Printf("\tCommitID: %d\n", r.descriptorEvent.descriptorEventData.CommitID)
|
||||
fmt.Printf("\tHeaderLength: %d\n", r.descriptorEvent.descriptorEventData.HeaderLength)
|
||||
fmt.Printf("\tCollectionID: %d\n", r.descriptorEvent.descriptorEventData.CollectionID)
|
||||
fmt.Printf("\tPartitionID: %d\n", r.descriptorEvent.descriptorEventData.PartitionID)
|
||||
fmt.Printf("\tSegmentID: %d\n", r.descriptorEvent.descriptorEventData.SegmentID)
|
||||
@ -99,7 +94,6 @@ func printBinlogFile(filename string) error {
|
||||
physical, _ = tsoutil.ParseTS(event.eventHeader.Timestamp)
|
||||
fmt.Printf("\tTimestamp: %v\n", physical)
|
||||
fmt.Printf("\tTypeCode: %s\n", event.eventHeader.TypeCode.String())
|
||||
fmt.Printf("\tServerID: %d\n", event.eventHeader.ServerID)
|
||||
fmt.Printf("\tEventLength: %d\n", event.eventHeader.EventLength)
|
||||
fmt.Printf("\tNextPosition: %d\n", event.eventHeader.NextPosition)
|
||||
switch event.eventHeader.TypeCode {
|
||||
|
||||
@ -13,6 +13,12 @@ package storage
|
||||
|
||||
import "unsafe"
|
||||
|
||||
/* #nosec G103 */
|
||||
func UnsafeReadByte(buf []byte, idx int) byte {
|
||||
ptr := unsafe.Pointer(&(buf[idx]))
|
||||
return *((*byte)(ptr))
|
||||
}
|
||||
|
||||
/* #nosec G103 */
|
||||
func UnsafeReadInt8(buf []byte, idx int) int8 {
|
||||
ptr := unsafe.Pointer(&(buf[idx]))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user