Mirror of https://gitee.com/milvus-io/milvus.git, synced 2025-12-06 17:18:35 +08:00
enhance: remove unused DDLCodec (#41485)
See also: #39242
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in: parent 01c0356ed3, commit dab39c610b
@@ -538,507 +538,6 @@ func TestDeleteBinlog(t *testing.T) {
    assert.Equal(t, ed2.EndTimestamp, Timestamp(400))
}

/* #nosec G103 */
func TestDDLBinlog1(t *testing.T) {
    w := NewDDLBinlogWriter(schemapb.DataType_Int64, 50)

    e1, err := w.NextCreateCollectionEventWriter()
    assert.NoError(t, err)
    err = e1.AddDataToPayload([]int64{1, 2, 3}, nil)
    assert.NoError(t, err)
    err = e1.AddDataToPayload([]int32{4, 5, 6}, nil)
    assert.Error(t, err)
    err = e1.AddDataToPayload([]int64{4, 5, 6}, nil)
    assert.NoError(t, err)
    e1.SetEventTimestamp(100, 200)

    e2, err := w.NextDropCollectionEventWriter()
    assert.NoError(t, err)
    err = e2.AddDataToPayload([]int64{7, 8, 9}, nil)
    assert.NoError(t, err)
    err = e2.AddDataToPayload([]bool{true, false, true}, nil)
    assert.Error(t, err)
    err = e2.AddDataToPayload([]int64{10, 11, 12}, nil)
    assert.NoError(t, err)
    e2.SetEventTimestamp(300, 400)

    w.SetEventTimeStamp(1000, 2000)

    w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra")
    sizeTotal := 2000000
    w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))

    _, err = w.GetBuffer()
    assert.Error(t, err)
    err = w.Finish()
    assert.NoError(t, err)
    buf, err := w.GetBuffer()
    assert.NoError(t, err)

    w.Close()

    // magic number
    magicNum := UnsafeReadInt32(buf, 0)
    assert.Equal(t, magicNum, MagicNumber)
    pos := int(unsafe.Sizeof(MagicNumber))

    // descriptor header, timestamp
    ts := UnsafeReadInt64(buf, pos)
    assert.Greater(t, ts, int64(0))
    curts := time.Now().UnixNano() / int64(time.Millisecond)
    curts = int64(tsoutil.ComposeTS(curts, 0))
    diffts := curts - ts
    maxdiff := int64(tsoutil.ComposeTS(1000, 0))
    assert.LessOrEqual(t, diffts, maxdiff)
    pos += int(unsafe.Sizeof(ts))

    // descriptor header, type code
    tc := UnsafeReadInt8(buf, pos)
    assert.Equal(t, EventTypeCode(tc), DescriptorEventType)
    pos += int(unsafe.Sizeof(tc))

    // descriptor header, event length
    descEventLen := UnsafeReadInt32(buf, pos)
    pos += int(unsafe.Sizeof(descEventLen))

    // descriptor header, next position
    descNxtPos := UnsafeReadInt32(buf, pos)
    assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos)
    pos += int(unsafe.Sizeof(descNxtPos))

    // descriptor data fix, collection id
    collID := UnsafeReadInt64(buf, pos)
    assert.Equal(t, collID, int64(50))
    pos += int(unsafe.Sizeof(collID))

    // descriptor data fix, partition id
    partID := UnsafeReadInt64(buf, pos)
    assert.Equal(t, partID, int64(-1))
    pos += int(unsafe.Sizeof(partID))

    // descriptor data fix, segment id
    segID := UnsafeReadInt64(buf, pos)
    assert.Equal(t, segID, int64(-1))
    pos += int(unsafe.Sizeof(segID))

    // descriptor data fix, field id
    fieldID := UnsafeReadInt64(buf, pos)
    assert.Equal(t, fieldID, int64(-1))
    pos += int(unsafe.Sizeof(fieldID))

    // descriptor data fix, start time stamp
    startts := UnsafeReadInt64(buf, pos)
    assert.Equal(t, startts, int64(1000))
    pos += int(unsafe.Sizeof(startts))

    // descriptor data fix, end time stamp
    endts := UnsafeReadInt64(buf, pos)
    assert.Equal(t, endts, int64(2000))
    pos += int(unsafe.Sizeof(endts))

    // descriptor data fix, payload type
    colType := UnsafeReadInt32(buf, pos)
    assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_Int64)
    pos += int(unsafe.Sizeof(colType))

    // descriptor data, post header lengths
    for i := DescriptorEventType; i < EventTypeEnd; i++ {
        size := getEventFixPartSize(i)
        assert.Equal(t, uint8(size), buf[pos])
        pos++
    }

    // descriptor data, extra length
    extraLength := UnsafeReadInt32(buf, pos)
    assert.Equal(t, extraLength, w.baseBinlogWriter.descriptorEventData.ExtraLength)
    pos += int(unsafe.Sizeof(extraLength))

    multiBytes := make([]byte, extraLength)
    for i := 0; i < int(extraLength); i++ {
        singleByte := UnsafeReadByte(buf, pos)
        multiBytes[i] = singleByte
        pos++
    }
    var extra map[string]interface{}
    err = json.Unmarshal(multiBytes, &extra)
    assert.NoError(t, err)
    testExtra, ok := extra["test"]
    assert.True(t, ok)
    assert.Equal(t, "testExtra", fmt.Sprintf("%v", testExtra))
    size, ok := extra[originalSizeKey]
    assert.True(t, ok)
    assert.Equal(t, fmt.Sprintf("%v", sizeTotal), fmt.Sprintf("%v", size))

    // start of e1
    assert.Equal(t, pos, int(descNxtPos))

    // insert e1 header, Timestamp
    e1ts := UnsafeReadInt64(buf, pos)
    diffts = curts - e1ts
    assert.LessOrEqual(t, diffts, maxdiff)
    pos += int(unsafe.Sizeof(e1ts))

    // insert e1 header, type code
    e1tc := UnsafeReadInt8(buf, pos)
    assert.Equal(t, EventTypeCode(e1tc), CreateCollectionEventType)
    pos += int(unsafe.Sizeof(e1tc))

    // insert e1 header, event length
    e1EventLen := UnsafeReadInt32(buf, pos)
    pos += int(unsafe.Sizeof(e1EventLen))

    // insert e1 header, next position
    e1NxtPos := UnsafeReadInt32(buf, pos)
    assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos)
    pos += int(unsafe.Sizeof(descNxtPos))

    // insert e1 data, start time stamp
    e1st := UnsafeReadInt64(buf, pos)
    assert.Equal(t, e1st, int64(100))
    pos += int(unsafe.Sizeof(e1st))

    // insert e1 data, end time stamp
    e1et := UnsafeReadInt64(buf, pos)
    assert.Equal(t, e1et, int64(200))
    pos += int(unsafe.Sizeof(e1et))

    // insert e1, payload
    e1Payload := buf[pos:e1NxtPos]
    e1r, err := NewPayloadReader(schemapb.DataType_Int64, e1Payload, false)
    assert.NoError(t, err)
    e1a, valids, err := e1r.GetInt64FromPayload()
    assert.Nil(t, valids)
    assert.NoError(t, err)
    assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6})
    e1r.Close()

    // start of e2
    pos = int(e1NxtPos)

    // insert e2 header, Timestamp
    e2ts := UnsafeReadInt64(buf, pos)
    diffts = curts - e2ts
    assert.LessOrEqual(t, diffts, maxdiff)
    pos += int(unsafe.Sizeof(e2ts))

    // insert e2 header, type code
    e2tc := UnsafeReadInt8(buf, pos)
    assert.Equal(t, EventTypeCode(e2tc), DropCollectionEventType)
    pos += int(unsafe.Sizeof(e2tc))

    // insert e2 header, event length
    e2EventLen := UnsafeReadInt32(buf, pos)
    pos += int(unsafe.Sizeof(e2EventLen))

    // insert e2 header, next position
    e2NxtPos := UnsafeReadInt32(buf, pos)
    assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos)
    pos += int(unsafe.Sizeof(descNxtPos))

    // insert e2 data, start time stamp
    e2st := UnsafeReadInt64(buf, pos)
    assert.Equal(t, e2st, int64(300))
    pos += int(unsafe.Sizeof(e2st))

    // insert e2 data, end time stamp
    e2et := UnsafeReadInt64(buf, pos)
    assert.Equal(t, e2et, int64(400))
    pos += int(unsafe.Sizeof(e2et))

    // insert e2, payload
    e2Payload := buf[pos:]
    e2r, err := NewPayloadReader(schemapb.DataType_Int64, e2Payload, false)
    assert.NoError(t, err)
    e2a, valids, err := e2r.GetInt64FromPayload()
    assert.Nil(t, valids)
    assert.NoError(t, err)
    assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12})
    e2r.Close()

    assert.Equal(t, int(e2NxtPos), len(buf))

    // read binlog
    r, err := NewBinlogReader(buf)
    assert.NoError(t, err)
    event1, err := r.NextEventReader()
    assert.NoError(t, err)
    assert.NotNil(t, event1)
    p1, valids, err := event1.GetInt64FromPayload()
    assert.Nil(t, valids)
    assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6})
    assert.NoError(t, err)
    assert.Equal(t, event1.TypeCode, CreateCollectionEventType)
    ed1, ok := (event1.eventData).(*createCollectionEventData)
    assert.True(t, ok)
    assert.Equal(t, ed1.StartTimestamp, Timestamp(100))
    assert.Equal(t, ed1.EndTimestamp, Timestamp(200))

    event2, err := r.NextEventReader()
    assert.NoError(t, err)
    assert.NotNil(t, event2)
    p2, valids, err := event2.GetInt64FromPayload()
    assert.Nil(t, valids)
    assert.NoError(t, err)
    assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12})
    assert.Equal(t, event2.TypeCode, DropCollectionEventType)
    ed2, ok := (event2.eventData).(*dropCollectionEventData)
    assert.True(t, ok)
    _, ok = (event2.eventData).(*insertEventData)
    assert.False(t, ok)
    assert.Equal(t, ed2.StartTimestamp, Timestamp(300))
    assert.Equal(t, ed2.EndTimestamp, Timestamp(400))
}

/* #nosec G103 */
func TestDDLBinlog2(t *testing.T) {
    w := NewDDLBinlogWriter(schemapb.DataType_Int64, 50)

    e1, err := w.NextCreatePartitionEventWriter()
    assert.NoError(t, err)
    err = e1.AddDataToPayload([]int64{1, 2, 3}, nil)
    assert.NoError(t, err)
    err = e1.AddDataToPayload([]int32{4, 5, 6}, nil)
    assert.Error(t, err)
    err = e1.AddDataToPayload([]int64{4, 5, 6}, nil)
    assert.NoError(t, err)
    e1.SetEventTimestamp(100, 200)

    e2, err := w.NextDropPartitionEventWriter()
    assert.NoError(t, err)
    err = e2.AddDataToPayload([]int64{7, 8, 9}, nil)
    assert.NoError(t, err)
    err = e2.AddDataToPayload([]bool{true, false, true}, nil)
    assert.Error(t, err)
    err = e2.AddDataToPayload([]int64{10, 11, 12}, nil)
    assert.NoError(t, err)
    e2.SetEventTimestamp(300, 400)

    w.SetEventTimeStamp(1000, 2000)

    w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra")
    sizeTotal := 2000000
    w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))

    _, err = w.GetBuffer()
    assert.Error(t, err)
    err = w.Finish()
    assert.NoError(t, err)
    buf, err := w.GetBuffer()
    assert.NoError(t, err)
    w.Close()

    // magic number
    magicNum := UnsafeReadInt32(buf, 0)
    assert.Equal(t, magicNum, MagicNumber)
    pos := int(unsafe.Sizeof(MagicNumber))

    // descriptor header, timestamp
    ts := UnsafeReadInt64(buf, pos)
    assert.Greater(t, ts, int64(0))
    curts := time.Now().UnixNano() / int64(time.Millisecond)
    curts = int64(tsoutil.ComposeTS(curts, 0))
    diffts := curts - ts
    maxdiff := int64(tsoutil.ComposeTS(1000, 0))
    assert.LessOrEqual(t, diffts, maxdiff)
    pos += int(unsafe.Sizeof(ts))

    // descriptor header, type code
    tc := UnsafeReadInt8(buf, pos)
    assert.Equal(t, EventTypeCode(tc), DescriptorEventType)
    pos += int(unsafe.Sizeof(tc))

    // descriptor header, event length
    descEventLen := UnsafeReadInt32(buf, pos)
    pos += int(unsafe.Sizeof(descEventLen))

    // descriptor header, next position
    descNxtPos := UnsafeReadInt32(buf, pos)
    assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos)
    pos += int(unsafe.Sizeof(descNxtPos))

    // descriptor data fix, collection id
    collID := UnsafeReadInt64(buf, pos)
    assert.Equal(t, collID, int64(50))
    pos += int(unsafe.Sizeof(collID))

    // descriptor data fix, partition id
    partID := UnsafeReadInt64(buf, pos)
    assert.Equal(t, partID, int64(-1))
    pos += int(unsafe.Sizeof(partID))

    // descriptor data fix, segment id
    segID := UnsafeReadInt64(buf, pos)
    assert.Equal(t, segID, int64(-1))
    pos += int(unsafe.Sizeof(segID))

    // descriptor data fix, field id
    fieldID := UnsafeReadInt64(buf, pos)
    assert.Equal(t, fieldID, int64(-1))
    pos += int(unsafe.Sizeof(fieldID))

    // descriptor data fix, start time stamp
    startts := UnsafeReadInt64(buf, pos)
    assert.Equal(t, startts, int64(1000))
    pos += int(unsafe.Sizeof(startts))

    // descriptor data fix, end time stamp
    endts := UnsafeReadInt64(buf, pos)
    assert.Equal(t, endts, int64(2000))
    pos += int(unsafe.Sizeof(endts))

    // descriptor data fix, payload type
    colType := UnsafeReadInt32(buf, pos)
    assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_Int64)
    pos += int(unsafe.Sizeof(colType))

    // descriptor data, post header lengths
    for i := DescriptorEventType; i < EventTypeEnd; i++ {
        size := getEventFixPartSize(i)
        assert.Equal(t, uint8(size), buf[pos])
        pos++
    }

    // descriptor data, extra length
    extraLength := UnsafeReadInt32(buf, pos)
    assert.Equal(t, extraLength, w.baseBinlogWriter.descriptorEventData.ExtraLength)
    pos += int(unsafe.Sizeof(extraLength))

    multiBytes := make([]byte, extraLength)
    for i := 0; i < int(extraLength); i++ {
        singleByte := UnsafeReadByte(buf, pos)
        multiBytes[i] = singleByte
        pos++
    }
    var extra map[string]interface{}
    err = json.Unmarshal(multiBytes, &extra)
    assert.NoError(t, err)
    testExtra, ok := extra["test"]
    assert.True(t, ok)
    assert.Equal(t, "testExtra", fmt.Sprintf("%v", testExtra))
    size, ok := extra[originalSizeKey]
    assert.True(t, ok)
    assert.Equal(t, fmt.Sprintf("%v", sizeTotal), fmt.Sprintf("%v", size))

    // start of e1
    assert.Equal(t, pos, int(descNxtPos))

    // insert e1 header, Timestamp
    e1ts := UnsafeReadInt64(buf, pos)
    diffts = curts - e1ts
    assert.LessOrEqual(t, diffts, maxdiff)
    pos += int(unsafe.Sizeof(e1ts))

    // insert e1 header, type code
    e1tc := UnsafeReadInt8(buf, pos)
    assert.Equal(t, EventTypeCode(e1tc), CreatePartitionEventType)
    pos += int(unsafe.Sizeof(e1tc))

    // insert e1 header, event length
    e1EventLen := UnsafeReadInt32(buf, pos)
    pos += int(unsafe.Sizeof(e1EventLen))

    // insert e1 header, next position
    e1NxtPos := UnsafeReadInt32(buf, pos)
    assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos)
    pos += int(unsafe.Sizeof(descNxtPos))

    // insert e1 data, start time stamp
    e1st := UnsafeReadInt64(buf, pos)
    assert.Equal(t, e1st, int64(100))
    pos += int(unsafe.Sizeof(e1st))

    // insert e1 data, end time stamp
    e1et := UnsafeReadInt64(buf, pos)
    assert.Equal(t, e1et, int64(200))
    pos += int(unsafe.Sizeof(e1et))

    // insert e1, payload
    e1Payload := buf[pos:e1NxtPos]
    e1r, err := NewPayloadReader(schemapb.DataType_Int64, e1Payload, false)
    assert.NoError(t, err)
    e1a, valids, err := e1r.GetInt64FromPayload()
    assert.Nil(t, valids)
    assert.NoError(t, err)
    assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6})
    e1r.Close()

    // start of e2
    pos = int(e1NxtPos)

    // insert e2 header, Timestamp
    e2ts := UnsafeReadInt64(buf, pos)
    diffts = curts - e2ts
    assert.LessOrEqual(t, diffts, maxdiff)
    pos += int(unsafe.Sizeof(e2ts))

    // insert e2 header, type code
    e2tc := UnsafeReadInt8(buf, pos)
    assert.Equal(t, EventTypeCode(e2tc), DropPartitionEventType)
    pos += int(unsafe.Sizeof(e2tc))

    // insert e2 header, event length
    e2EventLen := UnsafeReadInt32(buf, pos)
    pos += int(unsafe.Sizeof(e2EventLen))

    // insert e2 header, next position
    e2NxtPos := UnsafeReadInt32(buf, pos)
    assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos)
    pos += int(unsafe.Sizeof(descNxtPos))

    // insert e2 data, start time stamp
    e2st := UnsafeReadInt64(buf, pos)
    assert.Equal(t, e2st, int64(300))
    pos += int(unsafe.Sizeof(e2st))

    // insert e2 data, end time stamp
    e2et := UnsafeReadInt64(buf, pos)
    assert.Equal(t, e2et, int64(400))
    pos += int(unsafe.Sizeof(e2et))

    // insert e2, payload
    e2Payload := buf[pos:]
    e2r, err := NewPayloadReader(schemapb.DataType_Int64, e2Payload, false)
    assert.NoError(t, err)
    e2a, valids, err := e2r.GetInt64FromPayload()
    assert.Nil(t, valids)
    assert.NoError(t, err)
    assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12})
    e2r.Close()

    assert.Equal(t, int(e2NxtPos), len(buf))

    // read binlog
    r, err := NewBinlogReader(buf)
    assert.NoError(t, err)
    event1, err := r.NextEventReader()
    assert.NoError(t, err)
    assert.NotNil(t, event1)
    p1, valids, err := event1.GetInt64FromPayload()
    assert.Nil(t, valids)
    assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6})
    assert.NoError(t, err)
    assert.Equal(t, event1.TypeCode, CreatePartitionEventType)
    ed1, ok := (event1.eventData).(*createPartitionEventData)
    assert.True(t, ok)
    assert.Equal(t, ed1.StartTimestamp, Timestamp(100))
    assert.Equal(t, ed1.EndTimestamp, Timestamp(200))

    event2, err := r.NextEventReader()
    assert.NoError(t, err)
    assert.NotNil(t, event2)
    p2, valids, err := event2.GetInt64FromPayload()
    assert.Nil(t, valids)
    assert.NoError(t, err)
    assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12})
    assert.Equal(t, event2.TypeCode, DropPartitionEventType)
    ed2, ok := (event2.eventData).(*dropPartitionEventData)
    assert.True(t, ok)
    _, ok = (event2.eventData).(*insertEventData)
    assert.False(t, ok)
    assert.Equal(t, ed2.StartTimestamp, Timestamp(300))
    assert.Equal(t, ed2.EndTimestamp, Timestamp(400))
}

/* #nosec G103 */
func TestIndexFileBinlog(t *testing.T) {
    indexBuildID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
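Review note: the two deleted tests above decode the DDL binlog byte by byte. For readers skimming this removal, here is the layout they assert, summarized as a comment sketch; the field widths and ordering are taken only from the UnsafeRead* calls above, not from a format spec:

// DDL binlog layout asserted by TestDDLBinlog1/2 (a sketch, not authoritative):
//
//   int32  magic number (MagicNumber)
//   descriptor event:
//     int64  timestamp
//     int8   type code      (DescriptorEventType)
//     int32  event length
//     int32  next position  (= event length + sizeof(magic number))
//     int64  collection id  (50 in the tests)
//     int64  partition id   (-1 for DDL binlogs)
//     int64  segment id     (-1 for DDL binlogs)
//     int64  field id       (-1 for DDL binlogs)
//     int64  start timestamp
//     int64  end timestamp
//     int32  payload data type
//     uint8  fixed-part size, one per event type in [DescriptorEventType, EventTypeEnd)
//     int32  extra length, then that many bytes of JSON-encoded extras
//   per DDL event (create/drop collection/partition):
//     int64  timestamp
//     int8   type code
//     int32  event length
//     int32  next position
//     int64  start timestamp
//     int64  end timestamp
//     bytes  payload, readable with NewPayloadReader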
@@ -1432,42 +931,6 @@ func TestDeleteBinlogWriteCloseError(t *testing.T) {
    deleteWriter.Close()
}

func TestDDBinlogWriteCloseError(t *testing.T) {
    ddBinlogWriter := NewDDLBinlogWriter(schemapb.DataType_Int64, 10)
    e1, err := ddBinlogWriter.NextCreateCollectionEventWriter()
    assert.NoError(t, err)

    sizeTotal := 2000000
    ddBinlogWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))

    err = e1.AddDataToPayload([]int64{1, 2, 3}, nil)
    assert.NoError(t, err)
    e1.SetEventTimestamp(100, 200)

    ddBinlogWriter.SetEventTimeStamp(1000, 2000)
    err = ddBinlogWriter.Finish()
    assert.NoError(t, err)
    assert.NotNil(t, ddBinlogWriter.buffer)

    createCollectionEventWriter, err := ddBinlogWriter.NextCreateCollectionEventWriter()
    assert.Nil(t, createCollectionEventWriter)
    assert.Error(t, err)

    dropCollectionEventWriter, err := ddBinlogWriter.NextDropCollectionEventWriter()
    assert.Nil(t, dropCollectionEventWriter)
    assert.Error(t, err)

    createPartitionEventWriter, err := ddBinlogWriter.NextCreatePartitionEventWriter()
    assert.Nil(t, createPartitionEventWriter)
    assert.Error(t, err)

    dropPartitionEventWriter, err := ddBinlogWriter.NextDropPartitionEventWriter()
    assert.Nil(t, dropPartitionEventWriter)
    assert.Error(t, err)

    ddBinlogWriter.Close()
}

type testEvent struct {
    PayloadWriterInterface
    finishError bool
@@ -184,67 +184,6 @@ func (writer *DeleteBinlogWriter) NextDeleteEventWriter(opts ...PayloadWriterOpt
    return event, nil
}

// DDLBinlogWriter is an object to write binlog file which saves ddl information.
type DDLBinlogWriter struct {
    baseBinlogWriter
}

// NextCreateCollectionEventWriter returns an event writer to write CreateCollection
// information to an event.
func (writer *DDLBinlogWriter) NextCreateCollectionEventWriter() (*createCollectionEventWriter, error) {
    if writer.isClosed() {
        return nil, errors.New("binlog has closed")
    }
    event, err := newCreateCollectionEventWriter(writer.PayloadDataType)
    if err != nil {
        return nil, err
    }
    writer.eventWriters = append(writer.eventWriters, event)
    return event, nil
}

// NextDropCollectionEventWriter returns an event writer to write DropCollection
// information to an event.
func (writer *DDLBinlogWriter) NextDropCollectionEventWriter() (*dropCollectionEventWriter, error) {
    if writer.isClosed() {
        return nil, errors.New("binlog has closed")
    }
    event, err := newDropCollectionEventWriter(writer.PayloadDataType)
    if err != nil {
        return nil, err
    }
    writer.eventWriters = append(writer.eventWriters, event)
    return event, nil
}

// NextCreatePartitionEventWriter returns an event writer to write CreatePartition
// information to an event.
func (writer *DDLBinlogWriter) NextCreatePartitionEventWriter() (*createPartitionEventWriter, error) {
    if writer.isClosed() {
        return nil, errors.New("binlog has closed")
    }
    event, err := newCreatePartitionEventWriter(writer.PayloadDataType)
    if err != nil {
        return nil, err
    }
    writer.eventWriters = append(writer.eventWriters, event)
    return event, nil
}

// NextDropPartitionEventWriter returns an event writer to write DropPartition
// information to an event.
func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEventWriter, error) {
    if writer.isClosed() {
        return nil, errors.New("binlog has closed")
    }
    event, err := newDropPartitionEventWriter(writer.PayloadDataType)
    if err != nil {
        return nil, err
    }
    writer.eventWriters = append(writer.eventWriters, event)
    return event, nil
}

// IndexFileBinlogWriter is an object to write binlog file which saves index files
type IndexFileBinlogWriter struct {
    baseBinlogWriter
@@ -305,20 +244,3 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
    }
    return w
}

// NewDDLBinlogWriter creates DDLBinlogWriter to write binlog file.
func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) *DDLBinlogWriter {
    descriptorEvent := newDescriptorEvent()
    descriptorEvent.PayloadDataType = dataType
    descriptorEvent.CollectionID = collectionID
    w := &DDLBinlogWriter{
        baseBinlogWriter: baseBinlogWriter{
            descriptorEvent: *descriptorEvent,
            magicNumber:     MagicNumber,
            binlogType:      DDLBinlog,
            eventWriters:    make([]EventWriter, 0),
            buffer:          nil,
        },
    }
    return w
}
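Review note: for anyone carrying downstream code through this removal, a minimal sketch of the removed writer's lifecycle, condensed from TestDDLBinlog1 above (error handling elided; the tests always add the originalSizeKey extra before Finish, and GetBuffer fails if called before Finish):

    w := NewDDLBinlogWriter(schemapb.DataType_Int64, 50) // payload type and collection id
    ev, _ := w.NextCreateCollectionEventWriter()         // one event writer per DDL operation
    _ = ev.AddDataToPayload([]int64{1, 2, 3}, nil)       // payload must match the declared data type
    ev.SetEventTimestamp(100, 200)                       // event-level time range
    w.SetEventTimeStamp(1000, 2000)                      // binlog-level time range
    w.AddExtra(originalSizeKey, "24")                    // set before Finish in every removed call site
    _ = w.Finish()                                       // serializes descriptor + events into the buffer
    buf, _ := w.GetBuffer()                              // valid only after Finish
    w.Close()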
@@ -863,184 +863,3 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
    return pid, sid, result, nil
}

// DataDefinitionCodec serializes and deserializes the data definition.
// Blob key example:
// ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
// ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
type DataDefinitionCodec struct {
    collectionID int64
}

// NewDataDefinitionCodec is constructor for DataDefinitionCodec
func NewDataDefinitionCodec(collectionID int64) *DataDefinitionCodec {
    return &DataDefinitionCodec{collectionID: collectionID}
}

// Serialize transfers @ts and @ddRequests to blobs.
// From the schema, it gets all fields.
// For each field, it creates a binlog writer and writes the specific event according
// to the dataDefinition type.
// It returns the blobs in the end.
func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) {
    writer := NewDDLBinlogWriter(schemapb.DataType_Int64, dataDefinitionCodec.collectionID)
    eventWriter, err := writer.NextCreateCollectionEventWriter()
    if err != nil {
        writer.Close()
        return nil, err
    }
    defer writer.Close()
    defer eventWriter.Close()

    var blobs []*Blob

    var int64Ts []int64
    for _, singleTs := range ts {
        int64Ts = append(int64Ts, int64(singleTs))
    }
    err = eventWriter.AddInt64ToPayload(int64Ts, nil)
    if err != nil {
        return nil, err
    }
    eventWriter.SetEventTimestamp(ts[0], ts[len(ts)-1])
    writer.SetEventTimeStamp(ts[0], ts[len(ts)-1])

    // https://github.com/milvus-io/milvus/issues/9620
    writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", binary.Size(int64Ts)))

    err = writer.Finish()
    if err != nil {
        return nil, err
    }
    buffer, err := writer.GetBuffer()
    if err != nil {
        return nil, err
    }
    blobs = append(blobs, &Blob{
        Key:   Ts,
        Value: buffer,
    })
    eventWriter.Close()
    writer.Close()

    writer = NewDDLBinlogWriter(schemapb.DataType_String, dataDefinitionCodec.collectionID)

    sizeTotal := 0
    for pos, req := range ddRequests {
        sizeTotal += len(req)
        switch eventTypes[pos] {
        case CreateCollectionEventType:
            eventWriter, err := writer.NextCreateCollectionEventWriter()
            if err != nil {
                return nil, err
            }
            if err = eventWriter.AddOneStringToPayload(req, true); err != nil {
                return nil, err
            }
            eventWriter.SetEventTimestamp(ts[pos], ts[pos])
        case DropCollectionEventType:
            eventWriter, err := writer.NextDropCollectionEventWriter()
            if err != nil {
                return nil, err
            }
            if err = eventWriter.AddOneStringToPayload(req, true); err != nil {
                return nil, err
            }
            eventWriter.SetEventTimestamp(ts[pos], ts[pos])
        case CreatePartitionEventType:
            eventWriter, err := writer.NextCreatePartitionEventWriter()
            if err != nil {
                return nil, err
            }
            if err = eventWriter.AddOneStringToPayload(req, true); err != nil {
                return nil, err
            }
            eventWriter.SetEventTimestamp(ts[pos], ts[pos])
        case DropPartitionEventType:
            eventWriter, err := writer.NextDropPartitionEventWriter()
            if err != nil {
                return nil, err
            }
            if err = eventWriter.AddOneStringToPayload(req, true); err != nil {
                return nil, err
            }
            eventWriter.SetEventTimestamp(ts[pos], ts[pos])
        }
    }
    writer.SetEventTimeStamp(ts[0], ts[len(ts)-1])

    // https://github.com/milvus-io/milvus/issues/9620
    writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))

    if err = writer.Finish(); err != nil {
        return nil, err
    }
    if buffer, err = writer.GetBuffer(); err != nil {
        return nil, err
    }
    blobs = append(blobs, &Blob{
        Key:   DDL,
        Value: buffer,
    })

    return blobs, nil
}

// Deserialize transfers the blobs back to the data definition data.
// From the schema, it gets all fields.
// It sorts the blobs by blob key, since the blob log id increases over time.
// For each field, it creates a binlog reader and reads all events into the buffer.
// It returns the original @ts and @ddRequests in the end.
func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts []Timestamp, ddRequests []string, err error) {
    if len(blobs) == 0 {
        return nil, nil, errors.New("blobs is empty")
    }
    var requestsStrings []string
    var resultTs []Timestamp

    var blobList BlobList = blobs
    sort.Sort(blobList)

    for _, blob := range blobList {
        binlogReader, err := NewBinlogReader(blob.Value)
        if err != nil {
            return nil, nil, err
        }
        dataType := binlogReader.PayloadDataType

        for {
            eventReader, err := binlogReader.NextEventReader()
            if err != nil {
                binlogReader.Close()
                return nil, nil, err
            }
            if eventReader == nil {
                break
            }
            switch dataType {
            case schemapb.DataType_Int64:
                int64Ts, _, err := eventReader.GetInt64FromPayload()
                if err != nil {
                    eventReader.Close()
                    binlogReader.Close()
                    return nil, nil, err
                }
                for _, singleTs := range int64Ts {
                    resultTs = append(resultTs, Timestamp(singleTs))
                }
            case schemapb.DataType_String:
                stringPayload, _, err := eventReader.GetStringFromPayload()
                if err != nil {
                    eventReader.Close()
                    binlogReader.Close()
                    return nil, nil, err
                }
                requestsStrings = append(requestsStrings, stringPayload...)
            }
            eventReader.Close()
        }
        binlogReader.Close()
    }

    return resultTs, requestsStrings, nil
}
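Review note: the removed codec always produced exactly two blobs per Serialize call — an Int64 binlog of timestamps under the Ts key and a String binlog of the raw DDL request strings under the DDL key — and Deserialize told them apart purely by the reader's PayloadDataType, not by the blob key. A minimal round-trip sketch matching the removed TestDDCodec below:

    codec := NewDataDefinitionCodec(1)
    ts := []Timestamp{1, 2, 3, 4}
    reqs := []string{"CreateCollection", "DropCollection", "CreatePartition", "DropPartition"}
    types := []EventTypeCode{
        CreateCollectionEventType, DropCollectionEventType,
        CreatePartitionEventType, DropPartitionEventType,
    }
    blobs, _ := codec.Serialize(ts, reqs, types)  // blobs[0]: Ts binlog, blobs[1]: DDL binlog
    gotTs, gotReqs, _ := codec.Deserialize(blobs) // gotTs == ts, gotReqs == reqs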
@@ -1168,36 +1168,6 @@ func TestUpgradeDeleteLog(t *testing.T) {
    })
}

func TestDDCodec(t *testing.T) {
    dataDefinitionCodec := NewDataDefinitionCodec(int64(1))
    ts := []Timestamp{1, 2, 3, 4}
    ddRequests := []string{
        "CreateCollection",
        "DropCollection",
        "CreatePartition",
        "DropPartition",
    }
    eventTypeCodes := []EventTypeCode{
        CreateCollectionEventType,
        DropCollectionEventType,
        CreatePartitionEventType,
        DropPartitionEventType,
    }
    blobs, err := dataDefinitionCodec.Serialize(ts, ddRequests, eventTypeCodes)
    assert.NoError(t, err)
    for _, blob := range blobs {
        blob.Key = fmt.Sprintf("1/data_definition/3/4/5/%d", 99)
    }
    resultTs, resultRequests, err := dataDefinitionCodec.Deserialize(blobs)
    assert.NoError(t, err)
    assert.Equal(t, resultTs, ts)
    assert.Equal(t, resultRequests, ddRequests)

    blobs = []*Blob{}
    _, _, err = dataDefinitionCodec.Deserialize(blobs)
    assert.Error(t, err)
}

func TestTsError(t *testing.T) {
    insertData := &InsertData{}
    insertCodec := NewInsertCodecWithSchema(nil)
@@ -23,10 +23,8 @@ import (
    "time"

    "github.com/stretchr/testify/assert"
    "google.golang.org/protobuf/proto"

    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
    "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "github.com/milvus-io/milvus/pkg/v2/common"
    "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
@@ -364,121 +362,6 @@ func TestPrintBinlogFiles(t *testing.T) {
    }
}

func TestPrintDDFiles(t *testing.T) {
    dataDefinitionCodec := NewDataDefinitionCodec(int64(1))
    ts := []Timestamp{
        1,
        2,
        3,
        4,
    }
    collID := int64(1)
    partitionID := int64(1)
    collName := "test"
    partitionName := "test"
    createCollReq := msgpb.CreateCollectionRequest{
        Base: &commonpb.MsgBase{
            MsgType:   commonpb.MsgType_CreateCollection,
            MsgID:     1,
            Timestamp: 1,
            SourceID:  1,
        },
        CollectionID:   collID,
        Schema:         make([]byte, 0),
        CollectionName: collName,
        DbName:         "DbName",
        DbID:           UniqueID(0),
    }
    createCollString, err := proto.Marshal(&createCollReq)
    assert.NoError(t, err)

    dropCollReq := msgpb.DropCollectionRequest{
        Base: &commonpb.MsgBase{
            MsgType:   commonpb.MsgType_DropCollection,
            MsgID:     2,
            Timestamp: 2,
            SourceID:  2,
        },
        CollectionID:   collID,
        CollectionName: collName,
        DbName:         "DbName",
        DbID:           UniqueID(0),
    }
    dropCollString, err := proto.Marshal(&dropCollReq)
    assert.NoError(t, err)

    createPartitionReq := msgpb.CreatePartitionRequest{
        Base: &commonpb.MsgBase{
            MsgType:   commonpb.MsgType_CreatePartition,
            MsgID:     3,
            Timestamp: 3,
            SourceID:  3,
        },
        CollectionID:   collID,
        PartitionID:    partitionID,
        CollectionName: collName,
        PartitionName:  partitionName,
        DbName:         "DbName",
        DbID:           UniqueID(0),
    }
    createPartitionString, err := proto.Marshal(&createPartitionReq)
    assert.NoError(t, err)

    dropPartitionReq := msgpb.DropPartitionRequest{
        Base: &commonpb.MsgBase{
            MsgType:   commonpb.MsgType_DropPartition,
            MsgID:     4,
            Timestamp: 4,
            SourceID:  4,
        },
        CollectionID:   collID,
        PartitionID:    partitionID,
        CollectionName: collName,
        PartitionName:  partitionName,
        DbName:         "DbName",
        DbID:           UniqueID(0),
    }
    dropPartitionString, err := proto.Marshal(&dropPartitionReq)
    assert.NoError(t, err)
    ddRequests := []string{
        string(createCollString),
        string(dropCollString),
        string(createPartitionString),
        string(dropPartitionString),
    }
    eventTypeCodes := []EventTypeCode{
        CreateCollectionEventType,
        DropCollectionEventType,
        CreatePartitionEventType,
        DropPartitionEventType,
    }
    blobs, err := dataDefinitionCodec.Serialize(ts, ddRequests, eventTypeCodes)
    assert.NoError(t, err)
    var binlogFiles []string
    for index, blob := range blobs {
        blob.Key = fmt.Sprintf("1/data_definition/3/4/5/%d", 99)
        fileName := fmt.Sprintf("/tmp/ddblob_%d.db", index)
        binlogFiles = append(binlogFiles, fileName)
        fd, err := os.Create(fileName)
        assert.NoError(t, err)
        num, err := fd.Write(blob.GetValue())
        assert.NoError(t, err)
        assert.Equal(t, num, len(blob.GetValue()))
        err = fd.Close()
        assert.NoError(t, err)
    }
    resultTs, resultRequests, err := dataDefinitionCodec.Deserialize(blobs)
    assert.NoError(t, err)
    assert.Equal(t, resultTs, ts)
    assert.Equal(t, resultRequests, ddRequests)

    PrintBinlogFiles(binlogFiles)

    for _, file := range binlogFiles {
        _ = os.RemoveAll(file)
    }
}

func TestPrintIndexFile(t *testing.T) {
    indexBuildID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
    version := int64(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())