diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index 4503449189..6642eed1ff 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -291,9 +291,9 @@ func (ss *searchService) search(msg msgstream.TsMsg) error { inReduced := make([]bool, len(searchResults)) numSegment := int64(len(searchResults)) - err2 := reduceSearchResults(searchResults, numSegment, inReduced) - if err2 != nil { - return err2 + err = reduceSearchResults(searchResults, numSegment, inReduced) + if err != nil { + return err } err = fillTargetEntry(plan, searchResults, matchedSegments, inReduced) if err != nil { diff --git a/internal/storage/binlog_reader.go b/internal/storage/binlog_reader.go index f51943c9eb..3bdfa884ec 100644 --- a/internal/storage/binlog_reader.go +++ b/internal/storage/binlog_reader.go @@ -11,24 +11,15 @@ import ( type BinlogReader struct { magicNumber int32 descriptorEvent - currentEventReader *EventReader - buffer *bytes.Buffer - bufferLength int - currentOffset int32 - isClose bool + buffer *bytes.Buffer + eventList []*EventReader + isClose bool } func (reader *BinlogReader) NextEventReader() (*EventReader, error) { if reader.isClose { return nil, errors.New("bin log reader is closed") } - if reader.currentEventReader != nil { - reader.currentOffset = reader.currentEventReader.NextPosition - if err := reader.currentEventReader.Close(); err != nil { - return nil, err - } - reader.currentEventReader = nil - } if reader.buffer.Len() <= 0 { return nil, nil } @@ -36,15 +27,14 @@ func (reader *BinlogReader) NextEventReader() (*EventReader, error) { if err != nil { return nil, err } - reader.currentEventReader = eventReader - return reader.currentEventReader, nil + reader.eventList = append(reader.eventList, eventReader) + return eventReader, nil } func (reader *BinlogReader) readMagicNumber() (int32, error) { if err := binary.Read(reader.buffer, binary.LittleEndian, &reader.magicNumber); err != nil { return -1, err } - reader.currentOffset = 4 if reader.magicNumber != MagicNumber { return -1, errors.New("parse magic number failed, expected: " + strconv.Itoa(int(MagicNumber)) + ", actual: " + strconv.Itoa(int(reader.magicNumber))) @@ -55,7 +45,6 @@ func (reader *BinlogReader) readMagicNumber() (int32, error) { func (reader *BinlogReader) readDescriptorEvent() (*descriptorEvent, error) { event, err := ReadDescriptorEvent(reader.buffer) - reader.currentOffset = event.NextPosition if err != nil { return nil, err } @@ -67,20 +56,20 @@ func (reader *BinlogReader) Close() error { if reader.isClose { return nil } - reader.isClose = true - if reader.currentEventReader != nil { - if err := reader.currentEventReader.Close(); err != nil { + for _, e := range reader.eventList { + if err := e.Close(); err != nil { return err } } + reader.isClose = true return nil } func NewBinlogReader(data []byte) (*BinlogReader, error) { reader := &BinlogReader{ - buffer: bytes.NewBuffer(data), - bufferLength: len(data), - isClose: false, + buffer: bytes.NewBuffer(data), + eventList: []*EventReader{}, + isClose: false, } if _, err := reader.readMagicNumber(); err != nil { diff --git a/internal/storage/binlog_test.go b/internal/storage/binlog_test.go index 75ff432ab2..0ca7dd47f7 100644 --- a/internal/storage/binlog_test.go +++ b/internal/storage/binlog_test.go @@ -241,4 +241,812 @@ func TestInsertBinlog(t *testing.T) { assert.Equal(t, int(e2NxtPos), len(buf)) + //read binlog + r, err := NewBinlogReader(buf) + assert.Nil(t, err) + event1, err := r.NextEventReader() + assert.Nil(t, err) + assert.NotNil(t, event1) + p1, err := event1.GetInt64FromPayload() + assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6}) + assert.Nil(t, err) + assert.Equal(t, event1.TypeCode, InsertEventType) + ed1, ok := (event1.eventData).(*insertEventData) + assert.True(t, ok) + assert.Equal(t, ed1.StartTimestamp, Timestamp(100)) + assert.Equal(t, ed1.EndTimestamp, Timestamp(200)) + + event2, err := r.NextEventReader() + assert.Nil(t, err) + assert.NotNil(t, event2) + p2, err := event2.GetInt64FromPayload() + assert.Nil(t, err) + assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12}) + assert.Equal(t, event2.TypeCode, InsertEventType) + ed2, ok := (event2.eventData).(*insertEventData) + assert.True(t, ok) + _, ok = (event2.eventData).(*deleteEventData) + assert.False(t, ok) + assert.Equal(t, ed2.StartTimestamp, Timestamp(300)) + assert.Equal(t, ed2.EndTimestamp, Timestamp(400)) +} + +func TestDeleteBinlog(t *testing.T) { + w, err := NewDeleteBinlogWriter(schemapb.DataType_INT64, 50) + assert.Nil(t, err) + + e1, err := w.NextDeleteEventWriter() + assert.Nil(t, err) + err = e1.AddDataToPayload([]int64{1, 2, 3}) + assert.Nil(t, err) + err = e1.AddDataToPayload([]int32{4, 5, 6}) + assert.NotNil(t, err) + err = e1.AddDataToPayload([]int64{4, 5, 6}) + assert.Nil(t, err) + e1.SetStartTimestamp(100) + e1.SetEndTimestamp(200) + + e2, err := w.NextDeleteEventWriter() + assert.Nil(t, err) + err = e2.AddDataToPayload([]int64{7, 8, 9}) + assert.Nil(t, err) + err = e2.AddDataToPayload([]bool{true, false, true}) + assert.NotNil(t, err) + err = e2.AddDataToPayload([]int64{10, 11, 12}) + assert.Nil(t, err) + e2.SetStartTimestamp(300) + e2.SetEndTimestamp(400) + + w.SetStartTimeStamp(1000) + w.SetEndTimeStamp(2000) + + _, err = w.GetBuffer() + assert.NotNil(t, err) + err = w.Close() + assert.Nil(t, err) + buf, err := w.GetBuffer() + assert.Nil(t, err) + + //magic number + magicNum := UnsafeReadInt32(buf, 0) + assert.Equal(t, magicNum, MagicNumber) + pos := int(unsafe.Sizeof(MagicNumber)) + + //descriptor header, timestamp + ts := UnsafeReadInt64(buf, pos) + assert.Greater(t, ts, int64(0)) + curts := time.Now().UnixNano() / int64(time.Millisecond) + curts = int64(tsoutil.ComposeTS(curts, 0)) + diffts := curts - ts + maxdiff := int64(tsoutil.ComposeTS(1000, 0)) + assert.LessOrEqual(t, diffts, maxdiff) + pos += int(unsafe.Sizeof(ts)) + + //descriptor header, type code + tc := UnsafeReadInt8(buf, pos) + assert.Equal(t, EventTypeCode(tc), DescriptorEventType) + pos += int(unsafe.Sizeof(tc)) + + //descriptor header, server id + svrID := UnsafeReadInt32(buf, pos) + assert.Equal(t, svrID, int32(ServerID)) + pos += int(unsafe.Sizeof(svrID)) + + //descriptor header, event length + descEventLen := UnsafeReadInt32(buf, pos) + pos += int(unsafe.Sizeof(descEventLen)) + + //descriptor header, next position + descNxtPos := UnsafeReadInt32(buf, pos) + assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos) + pos += int(unsafe.Sizeof(descNxtPos)) + + //descriptor data fix, binlog version + binLogVer := UnsafeReadInt16(buf, pos) + assert.Equal(t, binLogVer, int16(BinlogVersion)) + pos += int(unsafe.Sizeof(binLogVer)) + + //descriptor data fix, server version + svrVer := UnsafeReadInt64(buf, pos) + assert.Equal(t, svrVer, int64(ServerVersion)) + pos += int(unsafe.Sizeof(svrVer)) + + //descriptor data fix, commit id + cmitID := UnsafeReadInt64(buf, pos) + assert.Equal(t, cmitID, int64(CommitID)) + pos += int(unsafe.Sizeof(cmitID)) + + //descriptor data fix, header length + headLen := UnsafeReadInt8(buf, pos) + assert.Equal(t, headLen, int8(binary.Size(eventHeader{}))) + pos += int(unsafe.Sizeof(headLen)) + + //descriptor data fix, collection id + collID := UnsafeReadInt64(buf, pos) + assert.Equal(t, collID, int64(50)) + pos += int(unsafe.Sizeof(collID)) + + //descriptor data fix, partition id + partID := UnsafeReadInt64(buf, pos) + assert.Equal(t, partID, int64(-1)) + pos += int(unsafe.Sizeof(partID)) + + //descriptor data fix, segment id + segID := UnsafeReadInt64(buf, pos) + assert.Equal(t, segID, int64(-1)) + pos += int(unsafe.Sizeof(segID)) + + //descriptor data fix, field id + fieldID := UnsafeReadInt64(buf, pos) + assert.Equal(t, fieldID, int64(-1)) + pos += int(unsafe.Sizeof(fieldID)) + + //descriptor data fix, start time stamp + startts := UnsafeReadInt64(buf, pos) + assert.Equal(t, startts, int64(1000)) + pos += int(unsafe.Sizeof(startts)) + + //descriptor data fix, end time stamp + endts := UnsafeReadInt64(buf, pos) + assert.Equal(t, endts, int64(2000)) + pos += int(unsafe.Sizeof(endts)) + + //descriptor data fix, payload type + colType := UnsafeReadInt32(buf, pos) + assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_INT64) + pos += int(unsafe.Sizeof(colType)) + + //descriptor data, post header lengths + for i := DescriptorEventType; i < EventTypeEnd; i++ { + size := getEventFixPartSize(i) + assert.Equal(t, uint8(size), buf[pos]) + pos++ + } + + //start of e1 + assert.Equal(t, pos, int(descNxtPos)) + + //insert e1 header, Timestamp + e1ts := UnsafeReadInt64(buf, pos) + diffts = curts - e1ts + assert.LessOrEqual(t, diffts, maxdiff) + pos += int(unsafe.Sizeof(e1ts)) + + //insert e1 header, type code + e1tc := UnsafeReadInt8(buf, pos) + assert.Equal(t, EventTypeCode(e1tc), DeleteEventType) + pos += int(unsafe.Sizeof(e1tc)) + + //insert e1 header, Server id + e1svrID := UnsafeReadInt32(buf, pos) + assert.Equal(t, e1svrID, int32(ServerID)) + pos += int(unsafe.Sizeof(e1svrID)) + + //insert e1 header, event length + e1EventLen := UnsafeReadInt32(buf, pos) + pos += int(unsafe.Sizeof(e1EventLen)) + + //insert e1 header, next position + e1NxtPos := UnsafeReadInt32(buf, pos) + assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos) + pos += int(unsafe.Sizeof(descNxtPos)) + + //insert e1 data, start time stamp + e1st := UnsafeReadInt64(buf, pos) + assert.Equal(t, e1st, int64(100)) + pos += int(unsafe.Sizeof(e1st)) + + //insert e1 data, end time stamp + e1et := UnsafeReadInt64(buf, pos) + assert.Equal(t, e1et, int64(200)) + pos += int(unsafe.Sizeof(e1et)) + + //insert e1, payload + e1Payload := buf[pos:e1NxtPos] + e1r, err := NewPayloadReader(schemapb.DataType_INT64, e1Payload) + assert.Nil(t, err) + e1a, err := e1r.GetInt64FromPayload() + assert.Nil(t, err) + assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6}) + err = e1r.Close() + assert.Nil(t, err) + + //start of e2 + pos = int(e1NxtPos) + + //insert e2 header, Timestamp + e2ts := UnsafeReadInt64(buf, pos) + diffts = curts - e2ts + assert.LessOrEqual(t, diffts, maxdiff) + pos += int(unsafe.Sizeof(e2ts)) + + //insert e2 header, type code + e2tc := UnsafeReadInt8(buf, pos) + assert.Equal(t, EventTypeCode(e2tc), DeleteEventType) + pos += int(unsafe.Sizeof(e2tc)) + + //insert e2 header, Server id + e2svrID := UnsafeReadInt32(buf, pos) + assert.Equal(t, e2svrID, int32(ServerID)) + pos += int(unsafe.Sizeof(e2svrID)) + + //insert e2 header, event length + e2EventLen := UnsafeReadInt32(buf, pos) + pos += int(unsafe.Sizeof(e2EventLen)) + + //insert e2 header, next position + e2NxtPos := UnsafeReadInt32(buf, pos) + assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos) + pos += int(unsafe.Sizeof(descNxtPos)) + + //insert e2 data, start time stamp + e2st := UnsafeReadInt64(buf, pos) + assert.Equal(t, e2st, int64(300)) + pos += int(unsafe.Sizeof(e2st)) + + //insert e2 data, end time stamp + e2et := UnsafeReadInt64(buf, pos) + assert.Equal(t, e2et, int64(400)) + pos += int(unsafe.Sizeof(e2et)) + + //insert e2, payload + e2Payload := buf[pos:] + e2r, err := NewPayloadReader(schemapb.DataType_INT64, e2Payload) + assert.Nil(t, err) + e2a, err := e2r.GetInt64FromPayload() + assert.Nil(t, err) + assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12}) + err = e2r.Close() + assert.Nil(t, err) + + assert.Equal(t, int(e2NxtPos), len(buf)) + + //read binlog + r, err := NewBinlogReader(buf) + assert.Nil(t, err) + event1, err := r.NextEventReader() + assert.Nil(t, err) + assert.NotNil(t, event1) + p1, err := event1.GetInt64FromPayload() + assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6}) + assert.Nil(t, err) + assert.Equal(t, event1.TypeCode, DeleteEventType) + ed1, ok := (event1.eventData).(*deleteEventData) + assert.True(t, ok) + assert.Equal(t, ed1.StartTimestamp, Timestamp(100)) + assert.Equal(t, ed1.EndTimestamp, Timestamp(200)) + + event2, err := r.NextEventReader() + assert.Nil(t, err) + assert.NotNil(t, event2) + p2, err := event2.GetInt64FromPayload() + assert.Nil(t, err) + assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12}) + assert.Equal(t, event2.TypeCode, DeleteEventType) + ed2, ok := (event2.eventData).(*deleteEventData) + assert.True(t, ok) + _, ok = (event2.eventData).(*insertEventData) + assert.False(t, ok) + assert.Equal(t, ed2.StartTimestamp, Timestamp(300)) + assert.Equal(t, ed2.EndTimestamp, Timestamp(400)) +} + +func TestDDLBinlog1(t *testing.T) { + w, err := NewDDLBinlogWriter(schemapb.DataType_INT64, 50) + assert.Nil(t, err) + + e1, err := w.NextCreateCollectionEventWriter() + assert.Nil(t, err) + err = e1.AddDataToPayload([]int64{1, 2, 3}) + assert.Nil(t, err) + err = e1.AddDataToPayload([]int32{4, 5, 6}) + assert.NotNil(t, err) + err = e1.AddDataToPayload([]int64{4, 5, 6}) + assert.Nil(t, err) + e1.SetStartTimestamp(100) + e1.SetEndTimestamp(200) + + e2, err := w.NextDropCollectionEventWriter() + assert.Nil(t, err) + err = e2.AddDataToPayload([]int64{7, 8, 9}) + assert.Nil(t, err) + err = e2.AddDataToPayload([]bool{true, false, true}) + assert.NotNil(t, err) + err = e2.AddDataToPayload([]int64{10, 11, 12}) + assert.Nil(t, err) + e2.SetStartTimestamp(300) + e2.SetEndTimestamp(400) + + w.SetStartTimeStamp(1000) + w.SetEndTimeStamp(2000) + + _, err = w.GetBuffer() + assert.NotNil(t, err) + err = w.Close() + assert.Nil(t, err) + buf, err := w.GetBuffer() + assert.Nil(t, err) + + //magic number + magicNum := UnsafeReadInt32(buf, 0) + assert.Equal(t, magicNum, MagicNumber) + pos := int(unsafe.Sizeof(MagicNumber)) + + //descriptor header, timestamp + ts := UnsafeReadInt64(buf, pos) + assert.Greater(t, ts, int64(0)) + curts := time.Now().UnixNano() / int64(time.Millisecond) + curts = int64(tsoutil.ComposeTS(curts, 0)) + diffts := curts - ts + maxdiff := int64(tsoutil.ComposeTS(1000, 0)) + assert.LessOrEqual(t, diffts, maxdiff) + pos += int(unsafe.Sizeof(ts)) + + //descriptor header, type code + tc := UnsafeReadInt8(buf, pos) + assert.Equal(t, EventTypeCode(tc), DescriptorEventType) + pos += int(unsafe.Sizeof(tc)) + + //descriptor header, server id + svrID := UnsafeReadInt32(buf, pos) + assert.Equal(t, svrID, int32(ServerID)) + pos += int(unsafe.Sizeof(svrID)) + + //descriptor header, event length + descEventLen := UnsafeReadInt32(buf, pos) + pos += int(unsafe.Sizeof(descEventLen)) + + //descriptor header, next position + descNxtPos := UnsafeReadInt32(buf, pos) + assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos) + pos += int(unsafe.Sizeof(descNxtPos)) + + //descriptor data fix, binlog version + binLogVer := UnsafeReadInt16(buf, pos) + assert.Equal(t, binLogVer, int16(BinlogVersion)) + pos += int(unsafe.Sizeof(binLogVer)) + + //descriptor data fix, server version + svrVer := UnsafeReadInt64(buf, pos) + assert.Equal(t, svrVer, int64(ServerVersion)) + pos += int(unsafe.Sizeof(svrVer)) + + //descriptor data fix, commit id + cmitID := UnsafeReadInt64(buf, pos) + assert.Equal(t, cmitID, int64(CommitID)) + pos += int(unsafe.Sizeof(cmitID)) + + //descriptor data fix, header length + headLen := UnsafeReadInt8(buf, pos) + assert.Equal(t, headLen, int8(binary.Size(eventHeader{}))) + pos += int(unsafe.Sizeof(headLen)) + + //descriptor data fix, collection id + collID := UnsafeReadInt64(buf, pos) + assert.Equal(t, collID, int64(50)) + pos += int(unsafe.Sizeof(collID)) + + //descriptor data fix, partition id + partID := UnsafeReadInt64(buf, pos) + assert.Equal(t, partID, int64(-1)) + pos += int(unsafe.Sizeof(partID)) + + //descriptor data fix, segment id + segID := UnsafeReadInt64(buf, pos) + assert.Equal(t, segID, int64(-1)) + pos += int(unsafe.Sizeof(segID)) + + //descriptor data fix, field id + fieldID := UnsafeReadInt64(buf, pos) + assert.Equal(t, fieldID, int64(-1)) + pos += int(unsafe.Sizeof(fieldID)) + + //descriptor data fix, start time stamp + startts := UnsafeReadInt64(buf, pos) + assert.Equal(t, startts, int64(1000)) + pos += int(unsafe.Sizeof(startts)) + + //descriptor data fix, end time stamp + endts := UnsafeReadInt64(buf, pos) + assert.Equal(t, endts, int64(2000)) + pos += int(unsafe.Sizeof(endts)) + + //descriptor data fix, payload type + colType := UnsafeReadInt32(buf, pos) + assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_INT64) + pos += int(unsafe.Sizeof(colType)) + + //descriptor data, post header lengths + for i := DescriptorEventType; i < EventTypeEnd; i++ { + size := getEventFixPartSize(i) + assert.Equal(t, uint8(size), buf[pos]) + pos++ + } + + //start of e1 + assert.Equal(t, pos, int(descNxtPos)) + + //insert e1 header, Timestamp + e1ts := UnsafeReadInt64(buf, pos) + diffts = curts - e1ts + assert.LessOrEqual(t, diffts, maxdiff) + pos += int(unsafe.Sizeof(e1ts)) + + //insert e1 header, type code + e1tc := UnsafeReadInt8(buf, pos) + assert.Equal(t, EventTypeCode(e1tc), CreateCollectionEventType) + pos += int(unsafe.Sizeof(e1tc)) + + //insert e1 header, Server id + e1svrID := UnsafeReadInt32(buf, pos) + assert.Equal(t, e1svrID, int32(ServerID)) + pos += int(unsafe.Sizeof(e1svrID)) + + //insert e1 header, event length + e1EventLen := UnsafeReadInt32(buf, pos) + pos += int(unsafe.Sizeof(e1EventLen)) + + //insert e1 header, next position + e1NxtPos := UnsafeReadInt32(buf, pos) + assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos) + pos += int(unsafe.Sizeof(descNxtPos)) + + //insert e1 data, start time stamp + e1st := UnsafeReadInt64(buf, pos) + assert.Equal(t, e1st, int64(100)) + pos += int(unsafe.Sizeof(e1st)) + + //insert e1 data, end time stamp + e1et := UnsafeReadInt64(buf, pos) + assert.Equal(t, e1et, int64(200)) + pos += int(unsafe.Sizeof(e1et)) + + //insert e1, payload + e1Payload := buf[pos:e1NxtPos] + e1r, err := NewPayloadReader(schemapb.DataType_INT64, e1Payload) + assert.Nil(t, err) + e1a, err := e1r.GetInt64FromPayload() + assert.Nil(t, err) + assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6}) + err = e1r.Close() + assert.Nil(t, err) + + //start of e2 + pos = int(e1NxtPos) + + //insert e2 header, Timestamp + e2ts := UnsafeReadInt64(buf, pos) + diffts = curts - e2ts + assert.LessOrEqual(t, diffts, maxdiff) + pos += int(unsafe.Sizeof(e2ts)) + + //insert e2 header, type code + e2tc := UnsafeReadInt8(buf, pos) + assert.Equal(t, EventTypeCode(e2tc), DropCollectionEventType) + pos += int(unsafe.Sizeof(e2tc)) + + //insert e2 header, Server id + e2svrID := UnsafeReadInt32(buf, pos) + assert.Equal(t, e2svrID, int32(ServerID)) + pos += int(unsafe.Sizeof(e2svrID)) + + //insert e2 header, event length + e2EventLen := UnsafeReadInt32(buf, pos) + pos += int(unsafe.Sizeof(e2EventLen)) + + //insert e2 header, next position + e2NxtPos := UnsafeReadInt32(buf, pos) + assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos) + pos += int(unsafe.Sizeof(descNxtPos)) + + //insert e2 data, start time stamp + e2st := UnsafeReadInt64(buf, pos) + assert.Equal(t, e2st, int64(300)) + pos += int(unsafe.Sizeof(e2st)) + + //insert e2 data, end time stamp + e2et := UnsafeReadInt64(buf, pos) + assert.Equal(t, e2et, int64(400)) + pos += int(unsafe.Sizeof(e2et)) + + //insert e2, payload + e2Payload := buf[pos:] + e2r, err := NewPayloadReader(schemapb.DataType_INT64, e2Payload) + assert.Nil(t, err) + e2a, err := e2r.GetInt64FromPayload() + assert.Nil(t, err) + assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12}) + err = e2r.Close() + assert.Nil(t, err) + + assert.Equal(t, int(e2NxtPos), len(buf)) + + //read binlog + r, err := NewBinlogReader(buf) + assert.Nil(t, err) + event1, err := r.NextEventReader() + assert.Nil(t, err) + assert.NotNil(t, event1) + p1, err := event1.GetInt64FromPayload() + assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6}) + assert.Nil(t, err) + assert.Equal(t, event1.TypeCode, CreateCollectionEventType) + ed1, ok := (event1.eventData).(*createCollectionEventData) + assert.True(t, ok) + assert.Equal(t, ed1.StartTimestamp, Timestamp(100)) + assert.Equal(t, ed1.EndTimestamp, Timestamp(200)) + + event2, err := r.NextEventReader() + assert.Nil(t, err) + assert.NotNil(t, event2) + p2, err := event2.GetInt64FromPayload() + assert.Nil(t, err) + assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12}) + assert.Equal(t, event2.TypeCode, DropCollectionEventType) + ed2, ok := (event2.eventData).(*dropCollectionEventData) + assert.True(t, ok) + _, ok = (event2.eventData).(*insertEventData) + assert.False(t, ok) + assert.Equal(t, ed2.StartTimestamp, Timestamp(300)) + assert.Equal(t, ed2.EndTimestamp, Timestamp(400)) +} + +func TestDDLBinlog2(t *testing.T) { + w, err := NewDDLBinlogWriter(schemapb.DataType_INT64, 50) + assert.Nil(t, err) + + e1, err := w.NextCreatePartitionEventWriter() + assert.Nil(t, err) + err = e1.AddDataToPayload([]int64{1, 2, 3}) + assert.Nil(t, err) + err = e1.AddDataToPayload([]int32{4, 5, 6}) + assert.NotNil(t, err) + err = e1.AddDataToPayload([]int64{4, 5, 6}) + assert.Nil(t, err) + e1.SetStartTimestamp(100) + e1.SetEndTimestamp(200) + + e2, err := w.NextDropPartitionEventWriter() + assert.Nil(t, err) + err = e2.AddDataToPayload([]int64{7, 8, 9}) + assert.Nil(t, err) + err = e2.AddDataToPayload([]bool{true, false, true}) + assert.NotNil(t, err) + err = e2.AddDataToPayload([]int64{10, 11, 12}) + assert.Nil(t, err) + e2.SetStartTimestamp(300) + e2.SetEndTimestamp(400) + + w.SetStartTimeStamp(1000) + w.SetEndTimeStamp(2000) + + _, err = w.GetBuffer() + assert.NotNil(t, err) + err = w.Close() + assert.Nil(t, err) + buf, err := w.GetBuffer() + assert.Nil(t, err) + + //magic number + magicNum := UnsafeReadInt32(buf, 0) + assert.Equal(t, magicNum, MagicNumber) + pos := int(unsafe.Sizeof(MagicNumber)) + + //descriptor header, timestamp + ts := UnsafeReadInt64(buf, pos) + assert.Greater(t, ts, int64(0)) + curts := time.Now().UnixNano() / int64(time.Millisecond) + curts = int64(tsoutil.ComposeTS(curts, 0)) + diffts := curts - ts + maxdiff := int64(tsoutil.ComposeTS(1000, 0)) + assert.LessOrEqual(t, diffts, maxdiff) + pos += int(unsafe.Sizeof(ts)) + + //descriptor header, type code + tc := UnsafeReadInt8(buf, pos) + assert.Equal(t, EventTypeCode(tc), DescriptorEventType) + pos += int(unsafe.Sizeof(tc)) + + //descriptor header, server id + svrID := UnsafeReadInt32(buf, pos) + assert.Equal(t, svrID, int32(ServerID)) + pos += int(unsafe.Sizeof(svrID)) + + //descriptor header, event length + descEventLen := UnsafeReadInt32(buf, pos) + pos += int(unsafe.Sizeof(descEventLen)) + + //descriptor header, next position + descNxtPos := UnsafeReadInt32(buf, pos) + assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos) + pos += int(unsafe.Sizeof(descNxtPos)) + + //descriptor data fix, binlog version + binLogVer := UnsafeReadInt16(buf, pos) + assert.Equal(t, binLogVer, int16(BinlogVersion)) + pos += int(unsafe.Sizeof(binLogVer)) + + //descriptor data fix, server version + svrVer := UnsafeReadInt64(buf, pos) + assert.Equal(t, svrVer, int64(ServerVersion)) + pos += int(unsafe.Sizeof(svrVer)) + + //descriptor data fix, commit id + cmitID := UnsafeReadInt64(buf, pos) + assert.Equal(t, cmitID, int64(CommitID)) + pos += int(unsafe.Sizeof(cmitID)) + + //descriptor data fix, header length + headLen := UnsafeReadInt8(buf, pos) + assert.Equal(t, headLen, int8(binary.Size(eventHeader{}))) + pos += int(unsafe.Sizeof(headLen)) + + //descriptor data fix, collection id + collID := UnsafeReadInt64(buf, pos) + assert.Equal(t, collID, int64(50)) + pos += int(unsafe.Sizeof(collID)) + + //descriptor data fix, partition id + partID := UnsafeReadInt64(buf, pos) + assert.Equal(t, partID, int64(-1)) + pos += int(unsafe.Sizeof(partID)) + + //descriptor data fix, segment id + segID := UnsafeReadInt64(buf, pos) + assert.Equal(t, segID, int64(-1)) + pos += int(unsafe.Sizeof(segID)) + + //descriptor data fix, field id + fieldID := UnsafeReadInt64(buf, pos) + assert.Equal(t, fieldID, int64(-1)) + pos += int(unsafe.Sizeof(fieldID)) + + //descriptor data fix, start time stamp + startts := UnsafeReadInt64(buf, pos) + assert.Equal(t, startts, int64(1000)) + pos += int(unsafe.Sizeof(startts)) + + //descriptor data fix, end time stamp + endts := UnsafeReadInt64(buf, pos) + assert.Equal(t, endts, int64(2000)) + pos += int(unsafe.Sizeof(endts)) + + //descriptor data fix, payload type + colType := UnsafeReadInt32(buf, pos) + assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_INT64) + pos += int(unsafe.Sizeof(colType)) + + //descriptor data, post header lengths + for i := DescriptorEventType; i < EventTypeEnd; i++ { + size := getEventFixPartSize(i) + assert.Equal(t, uint8(size), buf[pos]) + pos++ + } + + //start of e1 + assert.Equal(t, pos, int(descNxtPos)) + + //insert e1 header, Timestamp + e1ts := UnsafeReadInt64(buf, pos) + diffts = curts - e1ts + assert.LessOrEqual(t, diffts, maxdiff) + pos += int(unsafe.Sizeof(e1ts)) + + //insert e1 header, type code + e1tc := UnsafeReadInt8(buf, pos) + assert.Equal(t, EventTypeCode(e1tc), CreatePartitionEventType) + pos += int(unsafe.Sizeof(e1tc)) + + //insert e1 header, Server id + e1svrID := UnsafeReadInt32(buf, pos) + assert.Equal(t, e1svrID, int32(ServerID)) + pos += int(unsafe.Sizeof(e1svrID)) + + //insert e1 header, event length + e1EventLen := UnsafeReadInt32(buf, pos) + pos += int(unsafe.Sizeof(e1EventLen)) + + //insert e1 header, next position + e1NxtPos := UnsafeReadInt32(buf, pos) + assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos) + pos += int(unsafe.Sizeof(descNxtPos)) + + //insert e1 data, start time stamp + e1st := UnsafeReadInt64(buf, pos) + assert.Equal(t, e1st, int64(100)) + pos += int(unsafe.Sizeof(e1st)) + + //insert e1 data, end time stamp + e1et := UnsafeReadInt64(buf, pos) + assert.Equal(t, e1et, int64(200)) + pos += int(unsafe.Sizeof(e1et)) + + //insert e1, payload + e1Payload := buf[pos:e1NxtPos] + e1r, err := NewPayloadReader(schemapb.DataType_INT64, e1Payload) + assert.Nil(t, err) + e1a, err := e1r.GetInt64FromPayload() + assert.Nil(t, err) + assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6}) + err = e1r.Close() + assert.Nil(t, err) + + //start of e2 + pos = int(e1NxtPos) + + //insert e2 header, Timestamp + e2ts := UnsafeReadInt64(buf, pos) + diffts = curts - e2ts + assert.LessOrEqual(t, diffts, maxdiff) + pos += int(unsafe.Sizeof(e2ts)) + + //insert e2 header, type code + e2tc := UnsafeReadInt8(buf, pos) + assert.Equal(t, EventTypeCode(e2tc), DropPartitionEventType) + pos += int(unsafe.Sizeof(e2tc)) + + //insert e2 header, Server id + e2svrID := UnsafeReadInt32(buf, pos) + assert.Equal(t, e2svrID, int32(ServerID)) + pos += int(unsafe.Sizeof(e2svrID)) + + //insert e2 header, event length + e2EventLen := UnsafeReadInt32(buf, pos) + pos += int(unsafe.Sizeof(e2EventLen)) + + //insert e2 header, next position + e2NxtPos := UnsafeReadInt32(buf, pos) + assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos) + pos += int(unsafe.Sizeof(descNxtPos)) + + //insert e2 data, start time stamp + e2st := UnsafeReadInt64(buf, pos) + assert.Equal(t, e2st, int64(300)) + pos += int(unsafe.Sizeof(e2st)) + + //insert e2 data, end time stamp + e2et := UnsafeReadInt64(buf, pos) + assert.Equal(t, e2et, int64(400)) + pos += int(unsafe.Sizeof(e2et)) + + //insert e2, payload + e2Payload := buf[pos:] + e2r, err := NewPayloadReader(schemapb.DataType_INT64, e2Payload) + assert.Nil(t, err) + e2a, err := e2r.GetInt64FromPayload() + assert.Nil(t, err) + assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12}) + err = e2r.Close() + assert.Nil(t, err) + + assert.Equal(t, int(e2NxtPos), len(buf)) + + //read binlog + r, err := NewBinlogReader(buf) + assert.Nil(t, err) + event1, err := r.NextEventReader() + assert.Nil(t, err) + assert.NotNil(t, event1) + p1, err := event1.GetInt64FromPayload() + assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6}) + assert.Nil(t, err) + assert.Equal(t, event1.TypeCode, CreatePartitionEventType) + ed1, ok := (event1.eventData).(*createPartitionEventData) + assert.True(t, ok) + assert.Equal(t, ed1.StartTimestamp, Timestamp(100)) + assert.Equal(t, ed1.EndTimestamp, Timestamp(200)) + + event2, err := r.NextEventReader() + assert.Nil(t, err) + assert.NotNil(t, event2) + p2, err := event2.GetInt64FromPayload() + assert.Nil(t, err) + assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12}) + assert.Equal(t, event2.TypeCode, DropPartitionEventType) + ed2, ok := (event2.eventData).(*dropPartitionEventData) + assert.True(t, ok) + _, ok = (event2.eventData).(*insertEventData) + assert.False(t, ok) + assert.Equal(t, ed2.StartTimestamp, Timestamp(300)) + assert.Equal(t, ed2.EndTimestamp, Timestamp(400)) } diff --git a/internal/storage/binlog_writer.go b/internal/storage/binlog_writer.go index a84faed97d..3cd12c2426 100644 --- a/internal/storage/binlog_writer.go +++ b/internal/storage/binlog_writer.go @@ -223,12 +223,13 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID }, }, nil } -func NewDeleteBinlogWriter(dataType schemapb.DataType) (*DeleteBinlogWriter, error) { +func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) (*DeleteBinlogWriter, error) { descriptorEvent, err := newDescriptorEvent() if err != nil { return nil, err } descriptorEvent.PayloadDataType = dataType + descriptorEvent.CollectionID = collectionID return &DeleteBinlogWriter{ baseBinlogWriter: baseBinlogWriter{ descriptorEvent: *descriptorEvent, @@ -239,12 +240,13 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType) (*DeleteBinlogWriter, err }, }, nil } -func NewDDLBinlogWriter(dataType schemapb.DataType) (*DDLBinlogWriter, error) { +func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) (*DDLBinlogWriter, error) { descriptorEvent, err := newDescriptorEvent() if err != nil { return nil, err } descriptorEvent.PayloadDataType = dataType + descriptorEvent.CollectionID = collectionID return &DDLBinlogWriter{ baseBinlogWriter: baseBinlogWriter{ descriptorEvent: *descriptorEvent, diff --git a/internal/storage/binlog_writer_test.go b/internal/storage/binlog_writer_test.go index fd313b0df7..f88bd6bc33 100644 --- a/internal/storage/binlog_writer_test.go +++ b/internal/storage/binlog_writer_test.go @@ -54,6 +54,5 @@ func TestBinlogWriterReader(t *testing.T) { reader, err := binlogReader.NextEventReader() assert.Nil(t, err) - fmt.Println("reader offset : " + strconv.Itoa(int(binlogReader.currentOffset))) assert.Nil(t, reader) } diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 942f48b5a1..0537646d12 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -354,7 +354,7 @@ type DataDefinitionCodec struct { } func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) { - writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING) + writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING, dataDefinitionCodec.Schema.ID) if err != nil { return nil, err } @@ -426,7 +426,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ value: buffer, }) - writer, err = NewDDLBinlogWriter(schemapb.DataType_INT64) + writer, err = NewDDLBinlogWriter(schemapb.DataType_INT64, dataDefinitionCodec.Schema.ID) if err != nil { return nil, err }