From af173dd2a0779d6f5fa39bf6fda5646249bdb823 Mon Sep 17 00:00:00 2001
From: godchen
Date: Tue, 28 Sep 2021 14:30:02 +0800
Subject: [PATCH] Add delete codec (#8736)

Signed-off-by: godchen
---
 internal/storage/binlog_test.go     |   8 +-
 internal/storage/binlog_writer.go   |   4 +-
 internal/storage/data_codec.go      | 120 ++++++++++++++++++++++++++++
 internal/storage/data_codec_test.go |  20 +++++
 4 files changed, 147 insertions(+), 5 deletions(-)

diff --git a/internal/storage/binlog_test.go b/internal/storage/binlog_test.go
index ac77d65e78..c5529b2a18 100644
--- a/internal/storage/binlog_test.go
+++ b/internal/storage/binlog_test.go
@@ -263,7 +263,7 @@ func TestInsertBinlog(t *testing.T) {
 
 /* #nosec G103 */
 func TestDeleteBinlog(t *testing.T) {
-	w := NewDeleteBinlogWriter(schemapb.DataType_Int64, 50)
+	w := NewDeleteBinlogWriter(schemapb.DataType_Int64, 50, 1, 1)
 
 	e1, err := w.NextDeleteEventWriter()
 	assert.Nil(t, err)
@@ -332,12 +332,12 @@ func TestDeleteBinlog(t *testing.T) {
 
 	//descriptor data fix, partition id
 	partID := UnsafeReadInt64(buf, pos)
-	assert.Equal(t, partID, int64(-1))
+	assert.Equal(t, partID, int64(1))
 	pos += int(unsafe.Sizeof(partID))
 
 	//descriptor data fix, segment id
 	segID := UnsafeReadInt64(buf, pos)
-	assert.Equal(t, segID, int64(-1))
+	assert.Equal(t, segID, int64(1))
 	pos += int(unsafe.Sizeof(segID))
 
 	//descriptor data fix, field id
@@ -1075,7 +1075,7 @@ func TestInsertBinlogWriterCloseError(t *testing.T) {
 }
 
 func TestDeleteBinlogWriteCloseError(t *testing.T) {
-	deleteWriter := NewDeleteBinlogWriter(schemapb.DataType_Int64, 10)
+	deleteWriter := NewDeleteBinlogWriter(schemapb.DataType_Int64, 10, 1, 1)
 	e1, err := deleteWriter.NextDeleteEventWriter()
 	assert.Nil(t, err)
 	err = e1.AddDataToPayload([]int64{1, 2, 3})
diff --git a/internal/storage/binlog_writer.go b/internal/storage/binlog_writer.go
index 5afd7db79c..fb14bbc217 100644
--- a/internal/storage/binlog_writer.go
+++ b/internal/storage/binlog_writer.go
@@ -239,10 +239,12 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
 }
 
 // NewDeleteBinlogWriter creates DeleteBinlogWriter to write binlog file.
-func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) *DeleteBinlogWriter {
+func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID int64) *DeleteBinlogWriter {
 	descriptorEvent := newDescriptorEvent()
 	descriptorEvent.PayloadDataType = dataType
 	descriptorEvent.CollectionID = collectionID
+	descriptorEvent.PartitionID = partitionID
+	descriptorEvent.SegmentID = segmentID
 	return &DeleteBinlogWriter{
 		baseBinlogWriter: baseBinlogWriter{
 			descriptorEvent: *descriptorEvent,
diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go
index 506924e9f6..1a3e40a7cc 100644
--- a/internal/storage/data_codec.go
+++ b/internal/storage/data_codec.go
@@ -14,6 +14,7 @@ package storage
 import (
 	"encoding/json"
 	"fmt"
+	"math"
 	"sort"
 	"strconv"
 	"strings"
@@ -487,6 +488,125 @@ func (insertCodec *InsertCodec) Close() error {
 	return nil
 }
 
+// DeleteData records the deleted entities of one segment.
+// Each entry maps a primary key to the timestamp at which that entity was deleted.
+type DeleteData struct {
+	Data map[string]int64 // primary key to timestamp
+}
+
+// DeleteCodec serializes DeleteData into a delete binlog blob and deserializes it back.
+type DeleteCodec struct {
+	Schema          *etcdpb.CollectionMeta
+	readerCloseFunc []func() error // closers of the readers opened by Deserialize
+}
+
+// NewDeleteCodec creates a DeleteCodec for the collection described by schema.
+func NewDeleteCodec(schema *etcdpb.CollectionMeta) *DeleteCodec {
+	return &DeleteCodec{Schema: schema}
+}
+
+// Serialize transfers delete data to a blob.
+// For each delete message, it saves a "pk,ts" string to the binlog. The event and
+// binlog time range is set to the smallest and largest timestamp found in data.
+func (deleteCodec *DeleteCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *DeleteData) (*Blob, error) {
+	if data == nil {
+		return nil, fmt.Errorf("delete data is nil")
+	}
+	binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, deleteCodec.Schema.ID, partitionID, segmentID)
+	eventWriter, err := binlogWriter.NextDeleteEventWriter()
+	if err != nil {
+		return nil, err
+	}
+	// Track the range as int64: a plain int is only 32 bits wide on some platforms.
+	var startTs, endTs int64 = math.MaxInt64, math.MinInt64
+	for key, value := range data.Data {
+		if value < startTs {
+			startTs = value
+		}
+		if value > endTs {
+			endTs = value
+		}
+		err := eventWriter.AddOneStringToPayload(fmt.Sprintf("%s,%d", key, value))
+		if err != nil {
+			return nil, err
+		}
+	}
+	eventWriter.SetEventTimestamp(uint64(startTs), uint64(endTs))
+	binlogWriter.SetEventTimeStamp(uint64(startTs), uint64(endTs))
+	err = binlogWriter.Close()
+	if err != nil {
+		return nil, err
+	}
+	buffer, err := binlogWriter.GetBuffer()
+	if err != nil {
+		return nil, err
+	}
+	return &Blob{Value: buffer}, nil
+}
+
+// Deserialize decodes a blob produced by Serialize.
+// It returns the partition ID and segment ID read from the binlog together with the
+// decoded delete data; on failure both IDs are InvalidUniqueID and data is nil.
+// After a successful call the binlog reader stays open until Close is called.
+func (deleteCodec *DeleteCodec) Deserialize(blob *Blob) (partitionID UniqueID, segmentID UniqueID, data *DeleteData, err error) {
+	if blob == nil {
+		return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty")
+	}
+	binlogReader, err := NewBinlogReader(blob.Value)
+	if err != nil {
+		return InvalidUniqueID, InvalidUniqueID, nil, err
+	}
+	// fail closes the reader before reporting cause, so error paths do not leak it.
+	fail := func(cause error) (UniqueID, UniqueID, *DeleteData, error) {
+		// The close error is dropped on purpose: cause is the failure to report.
+		_ = binlogReader.Close()
+		return InvalidUniqueID, InvalidUniqueID, nil, cause
+	}
+	pid, sid := binlogReader.PartitionID, binlogReader.SegmentID
+	eventReader, err := binlogReader.NextEventReader()
+	if err != nil {
+		return fail(err)
+	}
+	length, err := eventReader.GetPayloadLengthFromReader()
+	if err != nil {
+		return fail(err)
+	}
+	result := &DeleteData{
+		Data: make(map[string]int64),
+	}
+	for i := 0; i < length; i++ {
+		record, err := eventReader.GetOneStringFromPayload(i)
+		if err != nil {
+			return fail(err)
+		}
+		// Split at the last comma: the timestamp never contains one, the key may.
+		sep := strings.LastIndex(record, ",")
+		if sep < 0 {
+			return fail(fmt.Errorf("the format of delta log is incorrect"))
+		}
+		ts, err := strconv.ParseInt(record[sep+1:], 10, 64)
+		if err != nil {
+			return fail(err)
+		}
+		result.Data[record[:sep]] = ts
+	}
+	deleteCodec.readerCloseFunc = append(deleteCodec.readerCloseFunc, binlogReader.Close)
+	return pid, sid, result, nil
+}
+
+// Close closes every binlog reader kept open by earlier successful Deserialize calls.
+// All readers are closed even if one fails; the first error encountered is returned.
+func (deleteCodec *DeleteCodec) Close() error {
+	var firstErr error
+	for _, closeFunc := range deleteCodec.readerCloseFunc {
+		if err := closeFunc(); err != nil && firstErr == nil {
+			firstErr = err
+		}
+	}
+	deleteCodec.readerCloseFunc = nil
+	return firstErr
+}
+
 // Blob key example:
 // ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
 // ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go
index 57842c8ea1..249aa39e0b 100644
--- a/internal/storage/data_codec_test.go
+++ b/internal/storage/data_codec_test.go
@@ -297,6 +297,26 @@ func TestInsertCodec(t *testing.T) {
 	_, _, _, err = insertCodec.Deserialize(blobs)
 	assert.NotNil(t, err)
 }
+
+func TestDeleteCodec(t *testing.T) {
+	schema := &etcdpb.CollectionMeta{
+		ID: CollectionID,
+	}
+	deleteCodec := NewDeleteCodec(schema)
+	deleteData := &DeleteData{
+		Data: map[string]int64{"1": 43757345, "2": 23578294723, "3,4": 5},
+	}
+	blob, err := deleteCodec.Serialize(1, 1, deleteData)
+	assert.Nil(t, err)
+
+	pid, sid, data, err := deleteCodec.Deserialize(blob)
+	assert.Nil(t, err)
+	assert.Equal(t, pid, int64(1))
+	assert.Equal(t, sid, int64(1))
+	assert.Equal(t, data, deleteData)
+	assert.Nil(t, deleteCodec.Close())
+}
+
 func TestDDCodec(t *testing.T) {
 	dataDefinitionCodec := NewDataDefinitionCodec(int64(1))
 	ts := []Timestamp{1, 2, 3, 4}