diff --git a/internal/storage/binlog_test.go b/internal/storage/binlog_test.go index 8efe8a6daf..e6f58a8db6 100644 --- a/internal/storage/binlog_test.go +++ b/internal/storage/binlog_test.go @@ -538,507 +538,6 @@ func TestDeleteBinlog(t *testing.T) { assert.Equal(t, ed2.EndTimestamp, Timestamp(400)) } -/* #nosec G103 */ -func TestDDLBinlog1(t *testing.T) { - w := NewDDLBinlogWriter(schemapb.DataType_Int64, 50) - - e1, err := w.NextCreateCollectionEventWriter() - assert.NoError(t, err) - err = e1.AddDataToPayload([]int64{1, 2, 3}, nil) - assert.NoError(t, err) - err = e1.AddDataToPayload([]int32{4, 5, 6}, nil) - assert.Error(t, err) - err = e1.AddDataToPayload([]int64{4, 5, 6}, nil) - assert.NoError(t, err) - e1.SetEventTimestamp(100, 200) - - e2, err := w.NextDropCollectionEventWriter() - assert.NoError(t, err) - err = e2.AddDataToPayload([]int64{7, 8, 9}, nil) - assert.NoError(t, err) - err = e2.AddDataToPayload([]bool{true, false, true}, nil) - assert.Error(t, err) - err = e2.AddDataToPayload([]int64{10, 11, 12}, nil) - assert.NoError(t, err) - e2.SetEventTimestamp(300, 400) - - w.SetEventTimeStamp(1000, 2000) - - w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra") - sizeTotal := 2000000 - w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal)) - - _, err = w.GetBuffer() - assert.Error(t, err) - err = w.Finish() - assert.NoError(t, err) - buf, err := w.GetBuffer() - assert.NoError(t, err) - - w.Close() - - // magic number - magicNum := UnsafeReadInt32(buf, 0) - assert.Equal(t, magicNum, MagicNumber) - pos := int(unsafe.Sizeof(MagicNumber)) - - // descriptor header, timestamp - ts := UnsafeReadInt64(buf, pos) - assert.Greater(t, ts, int64(0)) - curts := time.Now().UnixNano() / int64(time.Millisecond) - curts = int64(tsoutil.ComposeTS(curts, 0)) - diffts := curts - ts - maxdiff := int64(tsoutil.ComposeTS(1000, 0)) - assert.LessOrEqual(t, diffts, maxdiff) - pos += int(unsafe.Sizeof(ts)) - - // descriptor header, type code - tc := UnsafeReadInt8(buf, pos) - assert.Equal(t, EventTypeCode(tc), DescriptorEventType) - pos += int(unsafe.Sizeof(tc)) - - // descriptor header, event length - descEventLen := UnsafeReadInt32(buf, pos) - pos += int(unsafe.Sizeof(descEventLen)) - - // descriptor header, next position - descNxtPos := UnsafeReadInt32(buf, pos) - assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos) - pos += int(unsafe.Sizeof(descNxtPos)) - - // descriptor data fix, collection id - collID := UnsafeReadInt64(buf, pos) - assert.Equal(t, collID, int64(50)) - pos += int(unsafe.Sizeof(collID)) - - // descriptor data fix, partition id - partID := UnsafeReadInt64(buf, pos) - assert.Equal(t, partID, int64(-1)) - pos += int(unsafe.Sizeof(partID)) - - // descriptor data fix, segment id - segID := UnsafeReadInt64(buf, pos) - assert.Equal(t, segID, int64(-1)) - pos += int(unsafe.Sizeof(segID)) - - // descriptor data fix, field id - fieldID := UnsafeReadInt64(buf, pos) - assert.Equal(t, fieldID, int64(-1)) - pos += int(unsafe.Sizeof(fieldID)) - - // descriptor data fix, start time stamp - startts := UnsafeReadInt64(buf, pos) - assert.Equal(t, startts, int64(1000)) - pos += int(unsafe.Sizeof(startts)) - - // descriptor data fix, end time stamp - endts := UnsafeReadInt64(buf, pos) - assert.Equal(t, endts, int64(2000)) - pos += int(unsafe.Sizeof(endts)) - - // descriptor data fix, payload type - colType := UnsafeReadInt32(buf, pos) - assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_Int64) - pos += int(unsafe.Sizeof(colType)) - - // descriptor data, post header lengths - for i := DescriptorEventType; i < EventTypeEnd; i++ { - size := getEventFixPartSize(i) - assert.Equal(t, uint8(size), buf[pos]) - pos++ - } - - // descriptor data, extra length - extraLength := UnsafeReadInt32(buf, pos) - assert.Equal(t, extraLength, w.baseBinlogWriter.descriptorEventData.ExtraLength) - pos += int(unsafe.Sizeof(extraLength)) - - multiBytes := make([]byte, extraLength) - for i := 0; i < int(extraLength); i++ { - singleByte := UnsafeReadByte(buf, pos) - multiBytes[i] = singleByte - pos++ - } - var extra map[string]interface{} - err = json.Unmarshal(multiBytes, &extra) - assert.NoError(t, err) - testExtra, ok := extra["test"] - assert.True(t, ok) - assert.Equal(t, "testExtra", fmt.Sprintf("%v", testExtra)) - size, ok := extra[originalSizeKey] - assert.True(t, ok) - assert.Equal(t, fmt.Sprintf("%v", sizeTotal), fmt.Sprintf("%v", size)) - - // start of e1 - assert.Equal(t, pos, int(descNxtPos)) - - // insert e1 header, Timestamp - e1ts := UnsafeReadInt64(buf, pos) - diffts = curts - e1ts - assert.LessOrEqual(t, diffts, maxdiff) - pos += int(unsafe.Sizeof(e1ts)) - - // insert e1 header, type code - e1tc := UnsafeReadInt8(buf, pos) - assert.Equal(t, EventTypeCode(e1tc), CreateCollectionEventType) - pos += int(unsafe.Sizeof(e1tc)) - - // insert e1 header, event length - e1EventLen := UnsafeReadInt32(buf, pos) - pos += int(unsafe.Sizeof(e1EventLen)) - - // insert e1 header, next position - e1NxtPos := UnsafeReadInt32(buf, pos) - assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos) - pos += int(unsafe.Sizeof(descNxtPos)) - - // insert e1 data, start time stamp - e1st := UnsafeReadInt64(buf, pos) - assert.Equal(t, e1st, int64(100)) - pos += int(unsafe.Sizeof(e1st)) - - // insert e1 data, end time stamp - e1et := UnsafeReadInt64(buf, pos) - assert.Equal(t, e1et, int64(200)) - pos += int(unsafe.Sizeof(e1et)) - - // insert e1, payload - e1Payload := buf[pos:e1NxtPos] - e1r, err := NewPayloadReader(schemapb.DataType_Int64, e1Payload, false) - assert.NoError(t, err) - e1a, valids, err := e1r.GetInt64FromPayload() - assert.Nil(t, valids) - assert.NoError(t, err) - assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6}) - e1r.Close() - - // start of e2 - pos = int(e1NxtPos) - - // insert e2 header, Timestamp - e2ts := UnsafeReadInt64(buf, pos) - diffts = curts - e2ts - assert.LessOrEqual(t, diffts, maxdiff) - pos += int(unsafe.Sizeof(e2ts)) - - // insert e2 header, type code - e2tc := UnsafeReadInt8(buf, pos) - assert.Equal(t, EventTypeCode(e2tc), DropCollectionEventType) - pos += int(unsafe.Sizeof(e2tc)) - - // insert e2 header, event length - e2EventLen := UnsafeReadInt32(buf, pos) - pos += int(unsafe.Sizeof(e2EventLen)) - - // insert e2 header, next position - e2NxtPos := UnsafeReadInt32(buf, pos) - assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos) - pos += int(unsafe.Sizeof(descNxtPos)) - - // insert e2 data, start time stamp - e2st := UnsafeReadInt64(buf, pos) - assert.Equal(t, e2st, int64(300)) - pos += int(unsafe.Sizeof(e2st)) - - // insert e2 data, end time stamp - e2et := UnsafeReadInt64(buf, pos) - assert.Equal(t, e2et, int64(400)) - pos += int(unsafe.Sizeof(e2et)) - - // insert e2, payload - e2Payload := buf[pos:] - e2r, err := NewPayloadReader(schemapb.DataType_Int64, e2Payload, false) - assert.NoError(t, err) - e2a, valids, err := e2r.GetInt64FromPayload() - assert.Nil(t, valids) - assert.NoError(t, err) - assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12}) - e2r.Close() - - assert.Equal(t, int(e2NxtPos), len(buf)) - - // read binlog - r, err := NewBinlogReader(buf) - assert.NoError(t, err) - event1, err := r.NextEventReader() - assert.NoError(t, err) - assert.NotNil(t, event1) - p1, valids, err := event1.GetInt64FromPayload() - assert.Nil(t, valids) - assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6}) - assert.NoError(t, err) - assert.Equal(t, event1.TypeCode, CreateCollectionEventType) - ed1, ok := (event1.eventData).(*createCollectionEventData) - assert.True(t, ok) - assert.Equal(t, ed1.StartTimestamp, Timestamp(100)) - assert.Equal(t, ed1.EndTimestamp, Timestamp(200)) - - event2, err := r.NextEventReader() - assert.NoError(t, err) - assert.NotNil(t, event2) - p2, valids, err := event2.GetInt64FromPayload() - assert.Nil(t, valids) - assert.NoError(t, err) - assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12}) - assert.Equal(t, event2.TypeCode, DropCollectionEventType) - ed2, ok := (event2.eventData).(*dropCollectionEventData) - assert.True(t, ok) - _, ok = (event2.eventData).(*insertEventData) - assert.False(t, ok) - assert.Equal(t, ed2.StartTimestamp, Timestamp(300)) - assert.Equal(t, ed2.EndTimestamp, Timestamp(400)) -} - -/* #nosec G103 */ -func TestDDLBinlog2(t *testing.T) { - w := NewDDLBinlogWriter(schemapb.DataType_Int64, 50) - - e1, err := w.NextCreatePartitionEventWriter() - assert.NoError(t, err) - err = e1.AddDataToPayload([]int64{1, 2, 3}, nil) - assert.NoError(t, err) - err = e1.AddDataToPayload([]int32{4, 5, 6}, nil) - assert.Error(t, err) - err = e1.AddDataToPayload([]int64{4, 5, 6}, nil) - assert.NoError(t, err) - e1.SetEventTimestamp(100, 200) - - e2, err := w.NextDropPartitionEventWriter() - assert.NoError(t, err) - err = e2.AddDataToPayload([]int64{7, 8, 9}, nil) - assert.NoError(t, err) - err = e2.AddDataToPayload([]bool{true, false, true}, nil) - assert.Error(t, err) - err = e2.AddDataToPayload([]int64{10, 11, 12}, nil) - assert.NoError(t, err) - e2.SetEventTimestamp(300, 400) - - w.SetEventTimeStamp(1000, 2000) - - w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra") - sizeTotal := 2000000 - w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal)) - - _, err = w.GetBuffer() - assert.Error(t, err) - err = w.Finish() - assert.NoError(t, err) - buf, err := w.GetBuffer() - assert.NoError(t, err) - w.Close() - - // magic number - magicNum := UnsafeReadInt32(buf, 0) - assert.Equal(t, magicNum, MagicNumber) - pos := int(unsafe.Sizeof(MagicNumber)) - - // descriptor header, timestamp - ts := UnsafeReadInt64(buf, pos) - assert.Greater(t, ts, int64(0)) - curts := time.Now().UnixNano() / int64(time.Millisecond) - curts = int64(tsoutil.ComposeTS(curts, 0)) - diffts := curts - ts - maxdiff := int64(tsoutil.ComposeTS(1000, 0)) - assert.LessOrEqual(t, diffts, maxdiff) - pos += int(unsafe.Sizeof(ts)) - - // descriptor header, type code - tc := UnsafeReadInt8(buf, pos) - assert.Equal(t, EventTypeCode(tc), DescriptorEventType) - pos += int(unsafe.Sizeof(tc)) - - // descriptor header, event length - descEventLen := UnsafeReadInt32(buf, pos) - pos += int(unsafe.Sizeof(descEventLen)) - - // descriptor header, next position - descNxtPos := UnsafeReadInt32(buf, pos) - assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos) - pos += int(unsafe.Sizeof(descNxtPos)) - - // descriptor data fix, collection id - collID := UnsafeReadInt64(buf, pos) - assert.Equal(t, collID, int64(50)) - pos += int(unsafe.Sizeof(collID)) - - // descriptor data fix, partition id - partID := UnsafeReadInt64(buf, pos) - assert.Equal(t, partID, int64(-1)) - pos += int(unsafe.Sizeof(partID)) - - // descriptor data fix, segment id - segID := UnsafeReadInt64(buf, pos) - assert.Equal(t, segID, int64(-1)) - pos += int(unsafe.Sizeof(segID)) - - // descriptor data fix, field id - fieldID := UnsafeReadInt64(buf, pos) - assert.Equal(t, fieldID, int64(-1)) - pos += int(unsafe.Sizeof(fieldID)) - - // descriptor data fix, start time stamp - startts := UnsafeReadInt64(buf, pos) - assert.Equal(t, startts, int64(1000)) - pos += int(unsafe.Sizeof(startts)) - - // descriptor data fix, end time stamp - endts := UnsafeReadInt64(buf, pos) - assert.Equal(t, endts, int64(2000)) - pos += int(unsafe.Sizeof(endts)) - - // descriptor data fix, payload type - colType := UnsafeReadInt32(buf, pos) - assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_Int64) - pos += int(unsafe.Sizeof(colType)) - - // descriptor data, post header lengths - for i := DescriptorEventType; i < EventTypeEnd; i++ { - size := getEventFixPartSize(i) - assert.Equal(t, uint8(size), buf[pos]) - pos++ - } - - // descriptor data, extra length - extraLength := UnsafeReadInt32(buf, pos) - assert.Equal(t, extraLength, w.baseBinlogWriter.descriptorEventData.ExtraLength) - pos += int(unsafe.Sizeof(extraLength)) - - multiBytes := make([]byte, extraLength) - for i := 0; i < int(extraLength); i++ { - singleByte := UnsafeReadByte(buf, pos) - multiBytes[i] = singleByte - pos++ - } - var extra map[string]interface{} - err = json.Unmarshal(multiBytes, &extra) - assert.NoError(t, err) - testExtra, ok := extra["test"] - assert.True(t, ok) - assert.Equal(t, "testExtra", fmt.Sprintf("%v", testExtra)) - size, ok := extra[originalSizeKey] - assert.True(t, ok) - assert.Equal(t, fmt.Sprintf("%v", sizeTotal), fmt.Sprintf("%v", size)) - - // start of e1 - assert.Equal(t, pos, int(descNxtPos)) - - // insert e1 header, Timestamp - e1ts := UnsafeReadInt64(buf, pos) - diffts = curts - e1ts - assert.LessOrEqual(t, diffts, maxdiff) - pos += int(unsafe.Sizeof(e1ts)) - - // insert e1 header, type code - e1tc := UnsafeReadInt8(buf, pos) - assert.Equal(t, EventTypeCode(e1tc), CreatePartitionEventType) - pos += int(unsafe.Sizeof(e1tc)) - - // insert e1 header, event length - e1EventLen := UnsafeReadInt32(buf, pos) - pos += int(unsafe.Sizeof(e1EventLen)) - - // insert e1 header, next position - e1NxtPos := UnsafeReadInt32(buf, pos) - assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos) - pos += int(unsafe.Sizeof(descNxtPos)) - - // insert e1 data, start time stamp - e1st := UnsafeReadInt64(buf, pos) - assert.Equal(t, e1st, int64(100)) - pos += int(unsafe.Sizeof(e1st)) - - // insert e1 data, end time stamp - e1et := UnsafeReadInt64(buf, pos) - assert.Equal(t, e1et, int64(200)) - pos += int(unsafe.Sizeof(e1et)) - - // insert e1, payload - e1Payload := buf[pos:e1NxtPos] - e1r, err := NewPayloadReader(schemapb.DataType_Int64, e1Payload, false) - assert.NoError(t, err) - e1a, valids, err := e1r.GetInt64FromPayload() - assert.Nil(t, valids) - assert.NoError(t, err) - assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6}) - e1r.Close() - - // start of e2 - pos = int(e1NxtPos) - - // insert e2 header, Timestamp - e2ts := UnsafeReadInt64(buf, pos) - diffts = curts - e2ts - assert.LessOrEqual(t, diffts, maxdiff) - pos += int(unsafe.Sizeof(e2ts)) - - // insert e2 header, type code - e2tc := UnsafeReadInt8(buf, pos) - assert.Equal(t, EventTypeCode(e2tc), DropPartitionEventType) - pos += int(unsafe.Sizeof(e2tc)) - - // insert e2 header, event length - e2EventLen := UnsafeReadInt32(buf, pos) - pos += int(unsafe.Sizeof(e2EventLen)) - - // insert e2 header, next position - e2NxtPos := UnsafeReadInt32(buf, pos) - assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos) - pos += int(unsafe.Sizeof(descNxtPos)) - - // insert e2 data, start time stamp - e2st := UnsafeReadInt64(buf, pos) - assert.Equal(t, e2st, int64(300)) - pos += int(unsafe.Sizeof(e2st)) - - // insert e2 data, end time stamp - e2et := UnsafeReadInt64(buf, pos) - assert.Equal(t, e2et, int64(400)) - pos += int(unsafe.Sizeof(e2et)) - - // insert e2, payload - e2Payload := buf[pos:] - e2r, err := NewPayloadReader(schemapb.DataType_Int64, e2Payload, false) - assert.NoError(t, err) - e2a, valids, err := e2r.GetInt64FromPayload() - assert.Nil(t, valids) - assert.NoError(t, err) - assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12}) - e2r.Close() - - assert.Equal(t, int(e2NxtPos), len(buf)) - - // read binlog - r, err := NewBinlogReader(buf) - assert.NoError(t, err) - event1, err := r.NextEventReader() - assert.NoError(t, err) - assert.NotNil(t, event1) - p1, valids, err := event1.GetInt64FromPayload() - assert.Nil(t, valids) - assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6}) - assert.NoError(t, err) - assert.Equal(t, event1.TypeCode, CreatePartitionEventType) - ed1, ok := (event1.eventData).(*createPartitionEventData) - assert.True(t, ok) - assert.Equal(t, ed1.StartTimestamp, Timestamp(100)) - assert.Equal(t, ed1.EndTimestamp, Timestamp(200)) - - event2, err := r.NextEventReader() - assert.NoError(t, err) - assert.NotNil(t, event2) - p2, valids, err := event2.GetInt64FromPayload() - assert.Nil(t, valids) - assert.NoError(t, err) - assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12}) - assert.Equal(t, event2.TypeCode, DropPartitionEventType) - ed2, ok := (event2.eventData).(*dropPartitionEventData) - assert.True(t, ok) - _, ok = (event2.eventData).(*insertEventData) - assert.False(t, ok) - assert.Equal(t, ed2.StartTimestamp, Timestamp(300)) - assert.Equal(t, ed2.EndTimestamp, Timestamp(400)) -} - /* #nosec G103 */ func TestIndexFileBinlog(t *testing.T) { indexBuildID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) @@ -1432,42 +931,6 @@ func TestDeleteBinlogWriteCloseError(t *testing.T) { deleteWriter.Close() } -func TestDDBinlogWriteCloseError(t *testing.T) { - ddBinlogWriter := NewDDLBinlogWriter(schemapb.DataType_Int64, 10) - e1, err := ddBinlogWriter.NextCreateCollectionEventWriter() - assert.NoError(t, err) - - sizeTotal := 2000000 - ddBinlogWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal)) - - err = e1.AddDataToPayload([]int64{1, 2, 3}, nil) - assert.NoError(t, err) - e1.SetEventTimestamp(100, 200) - - ddBinlogWriter.SetEventTimeStamp(1000, 2000) - err = ddBinlogWriter.Finish() - assert.NoError(t, err) - assert.NotNil(t, ddBinlogWriter.buffer) - - createCollectionEventWriter, err := ddBinlogWriter.NextCreateCollectionEventWriter() - assert.Nil(t, createCollectionEventWriter) - assert.Error(t, err) - - dropCollectionEventWriter, err := ddBinlogWriter.NextDropCollectionEventWriter() - assert.Nil(t, dropCollectionEventWriter) - assert.Error(t, err) - - createPartitionEventWriter, err := ddBinlogWriter.NextCreatePartitionEventWriter() - assert.Nil(t, createPartitionEventWriter) - assert.Error(t, err) - - dropPartitionEventWriter, err := ddBinlogWriter.NextDropPartitionEventWriter() - assert.Nil(t, dropPartitionEventWriter) - assert.Error(t, err) - - ddBinlogWriter.Close() -} - type testEvent struct { PayloadWriterInterface finishError bool diff --git a/internal/storage/binlog_writer.go b/internal/storage/binlog_writer.go index 47f30e242f..2d94dba019 100644 --- a/internal/storage/binlog_writer.go +++ b/internal/storage/binlog_writer.go @@ -184,67 +184,6 @@ func (writer *DeleteBinlogWriter) NextDeleteEventWriter(opts ...PayloadWriterOpt return event, nil } -// DDLBinlogWriter is an object to write binlog file which saves ddl information. -type DDLBinlogWriter struct { - baseBinlogWriter -} - -// NextCreateCollectionEventWriter returns an event writer to write CreateCollection -// information to an event. -func (writer *DDLBinlogWriter) NextCreateCollectionEventWriter() (*createCollectionEventWriter, error) { - if writer.isClosed() { - return nil, errors.New("binlog has closed") - } - event, err := newCreateCollectionEventWriter(writer.PayloadDataType) - if err != nil { - return nil, err - } - writer.eventWriters = append(writer.eventWriters, event) - return event, nil -} - -// NextDropCollectionEventWriter returns an event writer to write DropCollection -// information to an event. -func (writer *DDLBinlogWriter) NextDropCollectionEventWriter() (*dropCollectionEventWriter, error) { - if writer.isClosed() { - return nil, errors.New("binlog has closed") - } - event, err := newDropCollectionEventWriter(writer.PayloadDataType) - if err != nil { - return nil, err - } - writer.eventWriters = append(writer.eventWriters, event) - return event, nil -} - -// NextCreatePartitionEventWriter returns an event writer to write CreatePartition -// information to an event. -func (writer *DDLBinlogWriter) NextCreatePartitionEventWriter() (*createPartitionEventWriter, error) { - if writer.isClosed() { - return nil, errors.New("binlog has closed") - } - event, err := newCreatePartitionEventWriter(writer.PayloadDataType) - if err != nil { - return nil, err - } - writer.eventWriters = append(writer.eventWriters, event) - return event, nil -} - -// NextDropPartitionEventWriter returns an event writer to write DropPartition -// information to an event. -func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEventWriter, error) { - if writer.isClosed() { - return nil, errors.New("binlog has closed") - } - event, err := newDropPartitionEventWriter(writer.PayloadDataType) - if err != nil { - return nil, err - } - writer.eventWriters = append(writer.eventWriters, event) - return event, nil -} - // IndexFileBinlogWriter is an object to write binlog file which saves index files type IndexFileBinlogWriter struct { baseBinlogWriter @@ -305,20 +244,3 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID } return w } - -// NewDDLBinlogWriter creates DDLBinlogWriter to write binlog file. -func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) *DDLBinlogWriter { - descriptorEvent := newDescriptorEvent() - descriptorEvent.PayloadDataType = dataType - descriptorEvent.CollectionID = collectionID - w := &DDLBinlogWriter{ - baseBinlogWriter: baseBinlogWriter{ - descriptorEvent: *descriptorEvent, - magicNumber: MagicNumber, - binlogType: DDLBinlog, - eventWriters: make([]EventWriter, 0), - buffer: nil, - }, - } - return w -} diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index b77f45cd37..d09723ec35 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -863,184 +863,3 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return pid, sid, result, nil } - -// DataDefinitionCodec serializes and deserializes the data definition -// Blob key example: -// ${tenant}/data_definition_log/${collection_id}/ts/${log_idx} -// ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx} -type DataDefinitionCodec struct { - collectionID int64 -} - -// NewDataDefinitionCodec is constructor for DataDefinitionCodec -func NewDataDefinitionCodec(collectionID int64) *DataDefinitionCodec { - return &DataDefinitionCodec{collectionID: collectionID} -} - -// Serialize transfer @ts and @ddRequsts to blob. -// From schema, it get all fields. -// For each field, it will create a binlog writer, and write specific event according -// to the dataDefinition type. -// It returns blobs in the end. -func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) { - writer := NewDDLBinlogWriter(schemapb.DataType_Int64, dataDefinitionCodec.collectionID) - eventWriter, err := writer.NextCreateCollectionEventWriter() - if err != nil { - writer.Close() - return nil, err - } - defer writer.Close() - defer eventWriter.Close() - - var blobs []*Blob - - var int64Ts []int64 - for _, singleTs := range ts { - int64Ts = append(int64Ts, int64(singleTs)) - } - err = eventWriter.AddInt64ToPayload(int64Ts, nil) - if err != nil { - return nil, err - } - eventWriter.SetEventTimestamp(ts[0], ts[len(ts)-1]) - writer.SetEventTimeStamp(ts[0], ts[len(ts)-1]) - - // https://github.com/milvus-io/milvus/issues/9620 - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", binary.Size(int64Ts))) - - err = writer.Finish() - if err != nil { - return nil, err - } - buffer, err := writer.GetBuffer() - if err != nil { - return nil, err - } - blobs = append(blobs, &Blob{ - Key: Ts, - Value: buffer, - }) - eventWriter.Close() - writer.Close() - - writer = NewDDLBinlogWriter(schemapb.DataType_String, dataDefinitionCodec.collectionID) - - sizeTotal := 0 - for pos, req := range ddRequests { - sizeTotal += len(req) - switch eventTypes[pos] { - case CreateCollectionEventType: - eventWriter, err := writer.NextCreateCollectionEventWriter() - if err != nil { - return nil, err - } - if err = eventWriter.AddOneStringToPayload(req, true); err != nil { - return nil, err - } - eventWriter.SetEventTimestamp(ts[pos], ts[pos]) - case DropCollectionEventType: - eventWriter, err := writer.NextDropCollectionEventWriter() - if err != nil { - return nil, err - } - if err = eventWriter.AddOneStringToPayload(req, true); err != nil { - return nil, err - } - eventWriter.SetEventTimestamp(ts[pos], ts[pos]) - case CreatePartitionEventType: - eventWriter, err := writer.NextCreatePartitionEventWriter() - if err != nil { - return nil, err - } - if err = eventWriter.AddOneStringToPayload(req, true); err != nil { - return nil, err - } - eventWriter.SetEventTimestamp(ts[pos], ts[pos]) - case DropPartitionEventType: - eventWriter, err := writer.NextDropPartitionEventWriter() - if err != nil { - return nil, err - } - if err = eventWriter.AddOneStringToPayload(req, true); err != nil { - return nil, err - } - eventWriter.SetEventTimestamp(ts[pos], ts[pos]) - } - } - writer.SetEventTimeStamp(ts[0], ts[len(ts)-1]) - - // https://github.com/milvus-io/milvus/issues/9620 - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal)) - - if err = writer.Finish(); err != nil { - return nil, err - } - if buffer, err = writer.GetBuffer(); err != nil { - return nil, err - } - blobs = append(blobs, &Blob{ - Key: DDL, - Value: buffer, - }) - - return blobs, nil -} - -// Deserialize transfer blob back to data definition data. -// From schema, it get all fields. -// It will sort blob by blob key for blob logid is increasing by time. -// For each field, it will create a binlog reader, and read all event to the buffer. -// It returns origin @ts and @ddRequests in the end. -func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts []Timestamp, ddRequests []string, err error) { - if len(blobs) == 0 { - return nil, nil, errors.New("blobs is empty") - } - var requestsStrings []string - var resultTs []Timestamp - - var blobList BlobList = blobs - sort.Sort(blobList) - - for _, blob := range blobList { - binlogReader, err := NewBinlogReader(blob.Value) - if err != nil { - return nil, nil, err - } - dataType := binlogReader.PayloadDataType - - for { - eventReader, err := binlogReader.NextEventReader() - if err != nil { - binlogReader.Close() - return nil, nil, err - } - if eventReader == nil { - break - } - switch dataType { - case schemapb.DataType_Int64: - int64Ts, _, err := eventReader.GetInt64FromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return nil, nil, err - } - for _, singleTs := range int64Ts { - resultTs = append(resultTs, Timestamp(singleTs)) - } - case schemapb.DataType_String: - stringPayload, _, err := eventReader.GetStringFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return nil, nil, err - } - requestsStrings = append(requestsStrings, stringPayload...) - } - eventReader.Close() - } - binlogReader.Close() - } - - return resultTs, requestsStrings, nil -} diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index 9819153184..631acf0eb7 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -1168,36 +1168,6 @@ func TestUpgradeDeleteLog(t *testing.T) { }) } -func TestDDCodec(t *testing.T) { - dataDefinitionCodec := NewDataDefinitionCodec(int64(1)) - ts := []Timestamp{1, 2, 3, 4} - ddRequests := []string{ - "CreateCollection", - "DropCollection", - "CreatePartition", - "DropPartition", - } - eventTypeCodes := []EventTypeCode{ - CreateCollectionEventType, - DropCollectionEventType, - CreatePartitionEventType, - DropPartitionEventType, - } - blobs, err := dataDefinitionCodec.Serialize(ts, ddRequests, eventTypeCodes) - assert.NoError(t, err) - for _, blob := range blobs { - blob.Key = fmt.Sprintf("1/data_definition/3/4/5/%d", 99) - } - resultTs, resultRequests, err := dataDefinitionCodec.Deserialize(blobs) - assert.NoError(t, err) - assert.Equal(t, resultTs, ts) - assert.Equal(t, resultRequests, ddRequests) - - blobs = []*Blob{} - _, _, err = dataDefinitionCodec.Deserialize(blobs) - assert.Error(t, err) -} - func TestTsError(t *testing.T) { insertData := &InsertData{} insertCodec := NewInsertCodecWithSchema(nil) diff --git a/internal/storage/print_binlog_test.go b/internal/storage/print_binlog_test.go index a60dcaae71..22f1b2aaf6 100644 --- a/internal/storage/print_binlog_test.go +++ b/internal/storage/print_binlog_test.go @@ -23,10 +23,8 @@ import ( "time" "github.com/stretchr/testify/assert" - "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb" @@ -364,121 +362,6 @@ func TestPrintBinlogFiles(t *testing.T) { } } -func TestPrintDDFiles(t *testing.T) { - dataDefinitionCodec := NewDataDefinitionCodec(int64(1)) - ts := []Timestamp{ - 1, - 2, - 3, - 4, - } - collID := int64(1) - partitionID := int64(1) - collName := "test" - partitionName := "test" - createCollReq := msgpb.CreateCollectionRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_CreateCollection, - MsgID: 1, - Timestamp: 1, - SourceID: 1, - }, - CollectionID: collID, - Schema: make([]byte, 0), - CollectionName: collName, - DbName: "DbName", - DbID: UniqueID(0), - } - createCollString, err := proto.Marshal(&createCollReq) - assert.NoError(t, err) - - dropCollReq := msgpb.DropCollectionRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_DropCollection, - MsgID: 2, - Timestamp: 2, - SourceID: 2, - }, - CollectionID: collID, - CollectionName: collName, - DbName: "DbName", - DbID: UniqueID(0), - } - dropCollString, err := proto.Marshal(&dropCollReq) - assert.NoError(t, err) - - createPartitionReq := msgpb.CreatePartitionRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_CreatePartition, - MsgID: 3, - Timestamp: 3, - SourceID: 3, - }, - CollectionID: collID, - PartitionID: partitionID, - CollectionName: collName, - PartitionName: partitionName, - DbName: "DbName", - DbID: UniqueID(0), - } - createPartitionString, err := proto.Marshal(&createPartitionReq) - assert.NoError(t, err) - - dropPartitionReq := msgpb.DropPartitionRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_DropPartition, - MsgID: 4, - Timestamp: 4, - SourceID: 4, - }, - CollectionID: collID, - PartitionID: partitionID, - CollectionName: collName, - PartitionName: partitionName, - DbName: "DbName", - DbID: UniqueID(0), - } - dropPartitionString, err := proto.Marshal(&dropPartitionReq) - assert.NoError(t, err) - ddRequests := []string{ - string(createCollString), - string(dropCollString), - string(createPartitionString), - string(dropPartitionString), - } - eventTypeCodes := []EventTypeCode{ - CreateCollectionEventType, - DropCollectionEventType, - CreatePartitionEventType, - DropPartitionEventType, - } - blobs, err := dataDefinitionCodec.Serialize(ts, ddRequests, eventTypeCodes) - assert.NoError(t, err) - var binlogFiles []string - for index, blob := range blobs { - blob.Key = fmt.Sprintf("1/data_definition/3/4/5/%d", 99) - fileName := fmt.Sprintf("/tmp/ddblob_%d.db", index) - binlogFiles = append(binlogFiles, fileName) - fd, err := os.Create(fileName) - assert.NoError(t, err) - num, err := fd.Write(blob.GetValue()) - assert.NoError(t, err) - assert.Equal(t, num, len(blob.GetValue())) - err = fd.Close() - assert.NoError(t, err) - } - resultTs, resultRequests, err := dataDefinitionCodec.Deserialize(blobs) - assert.NoError(t, err) - assert.Equal(t, resultTs, ts) - assert.Equal(t, resultRequests, ddRequests) - - PrintBinlogFiles(binlogFiles) - - for _, file := range binlogFiles { - _ = os.RemoveAll(file) - } -} - func TestPrintIndexFile(t *testing.T) { indexBuildID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()) version := int64(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())