Optimize code under storage (#6335)

* rename AddOneStringToPayload/GetOneStringFromPayload to AddStringToPayload/GetStringFromPayload

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* code optimize

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename print_binglog_test to print_binlog_test

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update chap08_binlog.md

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix unittest

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* use SetEventTimestamp() to replace SetStartTimestamp() and SetEndTimestamp()

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* code optimize

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename AddStringToPayload/GetStringFromPayload to AddOneStringToPayload/GetOneStringFromPayload

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
Cai Yudong 2021-07-07 19:10:07 +08:00 committed by GitHub
parent 3652b9da5b
commit 3387b07dfd
16 changed files with 286 additions and 366 deletions
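
The change that recurs through the diffs below is the consolidation of the paired timestamp setters into a single call. A minimal sketch of the new call shape, reduced to the two fields it touches (the concrete timestamp type in the storage package is assumed here to be an unsigned 64-bit value):

```
package main

import "fmt"

// eventData is a stand-in for the per-event structs in event_data.go; only the
// two fields touched by this refactor are kept, and the field names follow the diff.
type eventData struct {
	StartTimestamp uint64
	EndTimestamp   uint64
}

// SetEventTimestamp replaces the former SetStartTimestamp/SetEndTimestamp pair:
// both bounds are stamped in one call, so neither can be forgotten on its own.
func (d *eventData) SetEventTimestamp(start, end uint64) {
	d.StartTimestamp = start
	d.EndTimestamp = end
}

func main() {
	var e eventData
	e.SetEventTimestamp(100, 200) // was: e.SetStartTimestamp(100); e.SetEndTimestamp(200)
	fmt.Println(e.StartTimestamp, e.EndTimestamp)
}
```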

View File

@ -2,8 +2,9 @@
InsertBinlog, DeleteBinlog, DDLBinlog
Binlog is stored in a columnar storage format; every column in the schema is stored in an individual file.
Timestamp, schema, row id and primary key allocated by the system are four special columns.
The schema column records the DDL of the collection.
## Event format
@ -13,67 +14,63 @@ Binlog file consists of 4 bytes magic number and a series of events. The first e
### Event format
```
+======================================+===================================================================+
| event  | Timestamp         0 : 8     | create timestamp                                                  |
| header +-----------------------------+-------------------------------------------------------------------+
|        | TypeCode          8 : 1     | event type code                                                   |
|        +-----------------------------+-------------------------------------------------------------------+
|        | ServerID          9 : 4     | write node id                                                     |
|        +-----------------------------+-------------------------------------------------------------------+
|        | EventLength      13 : 4     | length of event, including header and data                        |
|        +-----------------------------+-------------------------------------------------------------------+
|        | NextPosition     17 : 4     | offset of next event from the start of file                       |
+======================================+===================================================================+
| event  | fixed part       21 : x     |                                                                   |
| data   +-----------------------------+-------------------------------------------------------------------+
|        | variable part               |                                                                   |
+======================================+===================================================================+
```
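
Read against the table above, the event header is a fixed 21-byte block. A minimal sketch of how those fields could map onto a Go struct and be decoded with encoding/binary, assuming the little-endian layout used by the binary.Write calls in the storage files later in this diff:

```
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// Field names, sizes and order follow the header table above; the struct packs
// to 8+1+4+4+4 = 21 bytes under encoding/binary.
type eventHeader struct {
	Timestamp    uint64 //  0 : 8  create timestamp
	TypeCode     int8   //  8 : 1  event type code
	ServerID     int32  //  9 : 4  write node id
	EventLength  int32  // 13 : 4  length of event, including header and data
	NextPosition int32  // 17 : 4  offset of next event from the start of file
}

func main() {
	// Round-trip one header through a buffer just to exercise the layout;
	// the values are arbitrary.
	buf := new(bytes.Buffer)
	_ = binary.Write(buf, binary.LittleEndian, &eventHeader{Timestamp: 1, TypeCode: 2, ServerID: 3, EventLength: 37, NextPosition: 41})

	var h eventHeader
	if err := binary.Read(buf, binary.LittleEndian, &h); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", h)
}
```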
### Descriptor Event format
```
+======================================+===================================================================+
| event  | Timestamp         0 : 8     | create timestamp                                                  |
| header +-----------------------------+-------------------------------------------------------------------+
|        | TypeCode          8 : 1     | event type code                                                   |
|        +-----------------------------+-------------------------------------------------------------------+
|        | ServerID          9 : 4     | write node id                                                     |
|        +-----------------------------+-------------------------------------------------------------------+
|        | EventLength      13 : 4     | length of event, including header and data                        |
|        +-----------------------------+-------------------------------------------------------------------+
|        | NextPosition     17 : 4     | offset of next event from the start of file                       |
+======================================+===================================================================+
| event  | BinlogVersion    21 : 2     | binlog version                                                    |
| data   +-----------------------------+-------------------------------------------------------------------+
|        | ServerVersion    23 : 8     | write node version                                                |
|        +-----------------------------+-------------------------------------------------------------------+
|        | CommitID         31 : 8     | commit id of the program in git                                   |
|        +-----------------------------+-------------------------------------------------------------------+
|        | HeaderLength     39 : 1     | header length of other event                                      |
|        +-----------------------------+-------------------------------------------------------------------+
|        | CollectionID     40 : 8     | collection id                                                     |
|        +-----------------------------+-------------------------------------------------------------------+
|        | PartitionID      48 : 8     | partition id (schema column does not need)                        |
|        +-----------------------------+-------------------------------------------------------------------+
|        | SegmentID        56 : 8     | segment id (schema column does not need)                          |
|        +-----------------------------+-------------------------------------------------------------------+
|        | StartTimestamp   64 : 1     | minimum timestamp allocated by master of all events in this file  |
|        +-----------------------------+-------------------------------------------------------------------+
|        | EndTimestamp     65 : 1     | maximum timestamp allocated by master of all events in this file  |
|        +-----------------------------+-------------------------------------------------------------------+
|        | PayloadDataType  66 : 1     | data type of payload                                              |
|        +-----------------------------+-------------------------------------------------------------------+
|        | PostHeaderLength 67 : n     | header lengths for all event types                                |
+======================================+===================================================================+
```
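
The descriptor's data part is read in two steps in readDescriptorEventData further down in this diff: one binary.Read for the fixed part listed above, then one for the per-event-type header lengths (PostHeaderLength). A reduced sketch of that flow; the struct keeps only two representative fields, and the count of event types is an assumption made for the example:

```
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// descriptorFixPart is a cut-down stand-in for DescriptorEventDataFixPart;
// the real struct carries every row of the table above.
type descriptorFixPart struct {
	CollectionID int64
	PartitionID  int64
}

func main() {
	// Fabricate an encoded descriptor body: the fixed part followed by one
	// header-length byte per event type (three types assumed here).
	raw := new(bytes.Buffer)
	_ = binary.Write(raw, binary.LittleEndian, &descriptorFixPart{CollectionID: 10, PartitionID: 20})
	raw.Write([]byte{16, 16, 40})

	var fix descriptorFixPart
	_ = binary.Read(raw, binary.LittleEndian, &fix)
	postHeaderLengths := make([]uint8, 3)
	_ = binary.Read(raw, binary.LittleEndian, postHeaderLengths)
	fmt.Printf("%+v %v\n", fix, postHeaderLengths)
}
```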
### Type code
```
@ -95,35 +92,27 @@ DELETE_EVENT can only be used in the primary key binlog file; currently only by primary
CREATE_COLLECTION_EVENT, DROP_COLLECTION_EVENT, CREATE_PARTITION_EVENT and DROP_PARTITION_EVENT appear only in DDL binlog files.
### Event data part
```
event data part
INSERT_EVENT:
+================================================+==========================================================+
| event  | fixed  | StartTimestamp     x : 8     | min timestamp in this event                              |
| data   | part   +------------------------------+----------------------------------------------------------+
|        |        | EndTimestamp     x+8 : 8     | max timestamp in this event                              |
|        |        +------------------------------+----------------------------------------------------------+
|        |        | reserved        x+16 : y     | reserved part                                            |
|        +--------+------------------------------+----------------------------------------------------------+
|        |variable| parquet payload              | payload in parquet format                                |
|        |part    |                              |                                                          |
+================================================+==========================================================+

Other events are similar to INSERT_EVENT.
```
### Example
Schema
@ -212,12 +201,4 @@ CStatus GetFloatVectorFromPayload(CPayloadReader payloadReader, float **values,
int GetPayloadLengthFromReader(CPayloadReader payloadReader);
CStatus ReleasePayloadReader(CPayloadReader payloadReader);
```

View File

@ -35,8 +35,7 @@ func TestInsertBinlog(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
err = e1.AddDataToPayload([]int64{4, 5, 6}) err = e1.AddDataToPayload([]int64{4, 5, 6})
assert.Nil(t, err) assert.Nil(t, err)
e1.SetStartTimestamp(100) e1.SetEventTimestamp(100, 200)
e1.SetEndTimestamp(200)
e2, err := w.NextInsertEventWriter() e2, err := w.NextInsertEventWriter()
assert.Nil(t, err) assert.Nil(t, err)
@ -46,11 +45,9 @@ func TestInsertBinlog(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
err = e2.AddDataToPayload([]int64{10, 11, 12}) err = e2.AddDataToPayload([]int64{10, 11, 12})
assert.Nil(t, err) assert.Nil(t, err)
e2.SetStartTimestamp(300) e2.SetEventTimestamp(300, 400)
e2.SetEndTimestamp(400)
w.SetStartTimeStamp(1000) w.SetEventTimeStamp(1000, 2000)
w.SetEndTimeStamp(2000)
_, err = w.GetBuffer() _, err = w.GetBuffer()
assert.NotNil(t, err) assert.NotNil(t, err)
@ -294,8 +291,7 @@ func TestDeleteBinlog(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
err = e1.AddDataToPayload([]int64{4, 5, 6}) err = e1.AddDataToPayload([]int64{4, 5, 6})
assert.Nil(t, err) assert.Nil(t, err)
e1.SetStartTimestamp(100) e1.SetEventTimestamp(100, 200)
e1.SetEndTimestamp(200)
e2, err := w.NextDeleteEventWriter() e2, err := w.NextDeleteEventWriter()
assert.Nil(t, err) assert.Nil(t, err)
@ -305,11 +301,9 @@ func TestDeleteBinlog(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
err = e2.AddDataToPayload([]int64{10, 11, 12}) err = e2.AddDataToPayload([]int64{10, 11, 12})
assert.Nil(t, err) assert.Nil(t, err)
e2.SetStartTimestamp(300) e2.SetEventTimestamp(300, 400)
e2.SetEndTimestamp(400)
w.SetStartTimeStamp(1000) w.SetEventTimeStamp(1000, 2000)
w.SetEndTimeStamp(2000)
_, err = w.GetBuffer() _, err = w.GetBuffer()
assert.NotNil(t, err) assert.NotNil(t, err)
@ -553,8 +547,7 @@ func TestDDLBinlog1(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
err = e1.AddDataToPayload([]int64{4, 5, 6}) err = e1.AddDataToPayload([]int64{4, 5, 6})
assert.Nil(t, err) assert.Nil(t, err)
e1.SetStartTimestamp(100) e1.SetEventTimestamp(100, 200)
e1.SetEndTimestamp(200)
e2, err := w.NextDropCollectionEventWriter() e2, err := w.NextDropCollectionEventWriter()
assert.Nil(t, err) assert.Nil(t, err)
@ -564,11 +557,9 @@ func TestDDLBinlog1(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
err = e2.AddDataToPayload([]int64{10, 11, 12}) err = e2.AddDataToPayload([]int64{10, 11, 12})
assert.Nil(t, err) assert.Nil(t, err)
e2.SetStartTimestamp(300) e2.SetEventTimestamp(300, 400)
e2.SetEndTimestamp(400)
w.SetStartTimeStamp(1000) w.SetEventTimeStamp(1000, 2000)
w.SetEndTimeStamp(2000)
_, err = w.GetBuffer() _, err = w.GetBuffer()
assert.NotNil(t, err) assert.NotNil(t, err)
@ -812,8 +803,7 @@ func TestDDLBinlog2(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
err = e1.AddDataToPayload([]int64{4, 5, 6}) err = e1.AddDataToPayload([]int64{4, 5, 6})
assert.Nil(t, err) assert.Nil(t, err)
e1.SetStartTimestamp(100) e1.SetEventTimestamp(100, 200)
e1.SetEndTimestamp(200)
e2, err := w.NextDropPartitionEventWriter() e2, err := w.NextDropPartitionEventWriter()
assert.Nil(t, err) assert.Nil(t, err)
@ -823,11 +813,9 @@ func TestDDLBinlog2(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
err = e2.AddDataToPayload([]int64{10, 11, 12}) err = e2.AddDataToPayload([]int64{10, 11, 12})
assert.Nil(t, err) assert.Nil(t, err)
e2.SetStartTimestamp(300) e2.SetEventTimestamp(300, 400)
e2.SetEndTimestamp(400)
w.SetStartTimeStamp(1000) w.SetEventTimeStamp(1000, 2000)
w.SetEndTimeStamp(2000)
_, err = w.GetBuffer() _, err = w.GetBuffer()
assert.NotNil(t, err) assert.NotNil(t, err)
@ -1090,8 +1078,7 @@ func TestNewBinlogReaderError(t *testing.T) {
w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40) w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40)
w.SetStartTimeStamp(1000) w.SetEventTimeStamp(1000, 2000)
w.SetEndTimeStamp(2000)
e1, err := w.NextInsertEventWriter() e1, err := w.NextInsertEventWriter()
assert.Nil(t, err) assert.Nil(t, err)
@ -1101,8 +1088,7 @@ func TestNewBinlogReaderError(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
err = e1.AddDataToPayload([]int64{4, 5, 6}) err = e1.AddDataToPayload([]int64{4, 5, 6})
assert.Nil(t, err) assert.Nil(t, err)
e1.SetStartTimestamp(100) e1.SetEventTimestamp(100, 200)
e1.SetEndTimestamp(200)
_, err = w.GetBuffer() _, err = w.GetBuffer()
assert.NotNil(t, err) assert.NotNil(t, err)
@ -1132,13 +1118,13 @@ func TestNewBinlogWriterTsError(t *testing.T) {
err = w.Close() err = w.Close()
assert.NotNil(t, err) assert.NotNil(t, err)
w.SetStartTimeStamp(1000) w.SetEventTimeStamp(1000, 0)
_, err = w.GetBuffer() _, err = w.GetBuffer()
assert.NotNil(t, err) assert.NotNil(t, err)
err = w.Close() err = w.Close()
assert.NotNil(t, err) assert.NotNil(t, err)
w.SetEndTimeStamp(2000) w.SetEventTimeStamp(1000, 2000)
_, err = w.GetBuffer() _, err = w.GetBuffer()
assert.NotNil(t, err) assert.NotNil(t, err)
err = w.Close() err = w.Close()
@ -1146,7 +1132,6 @@ func TestNewBinlogWriterTsError(t *testing.T) {
_, err = w.GetBuffer() _, err = w.GetBuffer()
assert.Nil(t, err) assert.Nil(t, err)
} }
func TestInsertBinlogWriterCloseError(t *testing.T) { func TestInsertBinlogWriterCloseError(t *testing.T) {
@ -1155,17 +1140,14 @@ func TestInsertBinlogWriterCloseError(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
err = e1.AddDataToPayload([]int64{1, 2, 3}) err = e1.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err) assert.Nil(t, err)
e1.SetStartTimestamp(100) e1.SetEventTimestamp(100, 200)
e1.SetEndTimestamp(200) insertWriter.SetEventTimeStamp(1000, 2000)
insertWriter.SetStartTimeStamp(1000)
insertWriter.SetEndTimeStamp(2000)
err = insertWriter.Close() err = insertWriter.Close()
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, insertWriter.buffer) assert.NotNil(t, insertWriter.buffer)
insertEventWriter, err := insertWriter.NextInsertEventWriter() insertEventWriter, err := insertWriter.NextInsertEventWriter()
assert.Nil(t, insertEventWriter) assert.Nil(t, insertEventWriter)
assert.NotNil(t, err) assert.NotNil(t, err)
} }
func TestDeleteBinlogWriteCloseError(t *testing.T) { func TestDeleteBinlogWriteCloseError(t *testing.T) {
@ -1174,10 +1156,8 @@ func TestDeleteBinlogWriteCloseError(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
err = e1.AddDataToPayload([]int64{1, 2, 3}) err = e1.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err) assert.Nil(t, err)
e1.SetStartTimestamp(100) e1.SetEventTimestamp(100, 200)
e1.SetEndTimestamp(200) deleteWriter.SetEventTimeStamp(1000, 2000)
deleteWriter.SetStartTimeStamp(1000)
deleteWriter.SetEndTimeStamp(2000)
err = deleteWriter.Close() err = deleteWriter.Close()
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, deleteWriter.buffer) assert.NotNil(t, deleteWriter.buffer)
@ -1192,11 +1172,9 @@ func TestDDBinlogWriteCloseError(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
err = e1.AddDataToPayload([]int64{1, 2, 3}) err = e1.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err) assert.Nil(t, err)
e1.SetStartTimestamp(100) e1.SetEventTimestamp(100, 200)
e1.SetEndTimestamp(200)
ddBinlogWriter.SetStartTimeStamp(1000) ddBinlogWriter.SetEventTimeStamp(1000, 2000)
ddBinlogWriter.SetEndTimeStamp(2000)
err = ddBinlogWriter.Close() err = ddBinlogWriter.Close()
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, ddBinlogWriter.buffer) assert.NotNil(t, ddBinlogWriter.buffer)
@ -1216,7 +1194,6 @@ func TestDDBinlogWriteCloseError(t *testing.T) {
dropPartitionEventWriter, err := ddBinlogWriter.NextDropPartitionEventWriter() dropPartitionEventWriter, err := ddBinlogWriter.NextDropPartitionEventWriter()
assert.Nil(t, dropPartitionEventWriter) assert.Nil(t, dropPartitionEventWriter)
assert.NotNil(t, err) assert.NotNil(t, err)
} }
type testEvent struct { type testEvent struct {
@ -1276,8 +1253,7 @@ func TestWriterListError(t *testing.T) {
insertWriter := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40) insertWriter := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40)
errorEvent := &testEvent{} errorEvent := &testEvent{}
insertWriter.eventWriters = append(insertWriter.eventWriters, errorEvent) insertWriter.eventWriters = append(insertWriter.eventWriters, errorEvent)
insertWriter.SetStartTimeStamp(1000) insertWriter.SetEventTimeStamp(1000, 2000)
insertWriter.SetEndTimeStamp(2000)
errorEvent.releasePayloadError = true errorEvent.releasePayloadError = true
err := insertWriter.Close() err := insertWriter.Close()
assert.NotNil(t, err) assert.NotNil(t, err)
@ -1297,5 +1273,4 @@ func TestWriterListError(t *testing.T) {
errorEvent.finishError = true errorEvent.finishError = true
err = insertWriter.Close() err = insertWriter.Close()
assert.NotNil(t, err) assert.NotNil(t, err)
} }

View File

@ -14,8 +14,7 @@ package storage
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"fmt"
"errors"
"github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/schemapb"
) )
@ -79,7 +78,7 @@ func (writer *baseBinlogWriter) GetBinlogType() BinlogType {
// GetBuffer get binlog buffer. Return nil if binlog is not finished yet. // GetBuffer get binlog buffer. Return nil if binlog is not finished yet.
func (writer *baseBinlogWriter) GetBuffer() ([]byte, error) { func (writer *baseBinlogWriter) GetBuffer() ([]byte, error) {
if writer.buffer == nil { if writer.buffer == nil {
return nil, errors.New("please close binlog before get buffer") return nil, fmt.Errorf("please close binlog before get buffer")
} }
return writer.buffer.Bytes(), nil return writer.buffer.Bytes(), nil
} }
@ -89,22 +88,21 @@ func (writer *baseBinlogWriter) Close() error {
if writer.buffer != nil { if writer.buffer != nil {
return nil return nil
} }
if writer.StartTimestamp == 0 { if writer.StartTimestamp == 0 || writer.EndTimestamp == 0 {
return errors.New("hasn't set start time stamp") return fmt.Errorf("invalid start/end timestamp")
}
if writer.EndTimestamp == 0 {
return errors.New("hasn't set end time stamp")
} }
var offset int32 var offset int32 = 0
writer.buffer = new(bytes.Buffer) writer.buffer = new(bytes.Buffer)
if err := binary.Write(writer.buffer, binary.LittleEndian, int32(MagicNumber)); err != nil { if err := binary.Write(writer.buffer, binary.LittleEndian, int32(MagicNumber)); err != nil {
return err return err
} }
offset += int32(binary.Size(MagicNumber))
if err := writer.descriptorEvent.Write(writer.buffer); err != nil { if err := writer.descriptorEvent.Write(writer.buffer); err != nil {
return err return err
} }
offset = writer.descriptorEvent.GetMemoryUsageInBytes() + int32(binary.Size(MagicNumber)) offset += writer.descriptorEvent.GetMemoryUsageInBytes()
writer.length = 0 writer.length = 0
for _, w := range writer.eventWriters { for _, w := range writer.eventWriters {
w.SetOffset(offset) w.SetOffset(offset)
@ -137,7 +135,7 @@ type InsertBinlogWriter struct {
func (writer *InsertBinlogWriter) NextInsertEventWriter() (*insertEventWriter, error) { func (writer *InsertBinlogWriter) NextInsertEventWriter() (*insertEventWriter, error) {
if writer.isClosed() { if writer.isClosed() {
return nil, errors.New("binlog has closed") return nil, fmt.Errorf("binlog has closed")
} }
event, err := newInsertEventWriter(writer.PayloadDataType) event, err := newInsertEventWriter(writer.PayloadDataType)
if err != nil { if err != nil {
@ -153,7 +151,7 @@ type DeleteBinlogWriter struct {
func (writer *DeleteBinlogWriter) NextDeleteEventWriter() (*deleteEventWriter, error) { func (writer *DeleteBinlogWriter) NextDeleteEventWriter() (*deleteEventWriter, error) {
if writer.isClosed() { if writer.isClosed() {
return nil, errors.New("binlog has closed") return nil, fmt.Errorf("binlog has closed")
} }
event, err := newDeleteEventWriter(writer.PayloadDataType) event, err := newDeleteEventWriter(writer.PayloadDataType)
if err != nil { if err != nil {
@ -169,7 +167,7 @@ type DDLBinlogWriter struct {
func (writer *DDLBinlogWriter) NextCreateCollectionEventWriter() (*createCollectionEventWriter, error) { func (writer *DDLBinlogWriter) NextCreateCollectionEventWriter() (*createCollectionEventWriter, error) {
if writer.isClosed() { if writer.isClosed() {
return nil, errors.New("binlog has closed") return nil, fmt.Errorf("binlog has closed")
} }
event, err := newCreateCollectionEventWriter(writer.PayloadDataType) event, err := newCreateCollectionEventWriter(writer.PayloadDataType)
if err != nil { if err != nil {
@ -181,7 +179,7 @@ func (writer *DDLBinlogWriter) NextCreateCollectionEventWriter() (*createCollect
func (writer *DDLBinlogWriter) NextDropCollectionEventWriter() (*dropCollectionEventWriter, error) { func (writer *DDLBinlogWriter) NextDropCollectionEventWriter() (*dropCollectionEventWriter, error) {
if writer.isClosed() { if writer.isClosed() {
return nil, errors.New("binlog has closed") return nil, fmt.Errorf("binlog has closed")
} }
event, err := newDropCollectionEventWriter(writer.PayloadDataType) event, err := newDropCollectionEventWriter(writer.PayloadDataType)
if err != nil { if err != nil {
@ -193,7 +191,7 @@ func (writer *DDLBinlogWriter) NextDropCollectionEventWriter() (*dropCollectionE
func (writer *DDLBinlogWriter) NextCreatePartitionEventWriter() (*createPartitionEventWriter, error) { func (writer *DDLBinlogWriter) NextCreatePartitionEventWriter() (*createPartitionEventWriter, error) {
if writer.isClosed() { if writer.isClosed() {
return nil, errors.New("binlog has closed") return nil, fmt.Errorf("binlog has closed")
} }
event, err := newCreatePartitionEventWriter(writer.PayloadDataType) event, err := newCreatePartitionEventWriter(writer.PayloadDataType)
if err != nil { if err != nil {
@ -205,7 +203,7 @@ func (writer *DDLBinlogWriter) NextCreatePartitionEventWriter() (*createPartitio
func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEventWriter, error) { func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEventWriter, error) {
if writer.isClosed() { if writer.isClosed() {
return nil, errors.New("binlog has closed") return nil, fmt.Errorf("binlog has closed")
} }
event, err := newDropPartitionEventWriter(writer.PayloadDataType) event, err := newDropPartitionEventWriter(writer.PayloadDataType)
if err != nil { if err != nil {
@ -232,6 +230,7 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
}, },
} }
} }
func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) *DeleteBinlogWriter { func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) *DeleteBinlogWriter {
descriptorEvent := newDescriptorEvent() descriptorEvent := newDescriptorEvent()
descriptorEvent.PayloadDataType = dataType descriptorEvent.PayloadDataType = dataType
@ -246,6 +245,7 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) *Dele
}, },
} }
} }
func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) *DDLBinlogWriter { func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) *DDLBinlogWriter {
descriptorEvent := newDescriptorEvent() descriptorEvent := newDescriptorEvent()
descriptorEvent.PayloadDataType = dataType descriptorEvent.PayloadDataType = dataType

View File

@ -22,8 +22,7 @@ import (
func TestBinlogWriterReader(t *testing.T) { func TestBinlogWriterReader(t *testing.T) {
binlogWriter := NewInsertBinlogWriter(schemapb.DataType_Int32, 10, 20, 30, 40) binlogWriter := NewInsertBinlogWriter(schemapb.DataType_Int32, 10, 20, 30, 40)
binlogWriter.SetStartTimeStamp(1000) binlogWriter.SetEventTimeStamp(1000, 2000)
binlogWriter.SetEndTimeStamp(2000)
defer binlogWriter.Close() defer binlogWriter.Close()
eventWriter, err := binlogWriter.NextInsertEventWriter() eventWriter, err := binlogWriter.NextInsertEventWriter()
assert.Nil(t, err) assert.Nil(t, err)
@ -31,8 +30,7 @@ func TestBinlogWriterReader(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
_, err = binlogWriter.GetBuffer() _, err = binlogWriter.GetBuffer()
assert.NotNil(t, err) assert.NotNil(t, err)
eventWriter.SetStartTimestamp(1000) eventWriter.SetEventTimestamp(1000, 2000)
eventWriter.SetEndTimestamp(2000)
nums, err := binlogWriter.GetRowNums() nums, err := binlogWriter.GetRowNums()
assert.Nil(t, err) assert.Nil(t, err)
assert.EqualValues(t, 3, nums) assert.EqualValues(t, 3, nums)
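
Pulled together from the tests above, the write path under the consolidated API looks roughly like this. It is a sketch that assumes it sits inside the same storage package as the files in this diff, with error handling trimmed to the essentials; note the writer-level setter is spelled SetEventTimeStamp while the event-level one is SetEventTimestamp:

```
package storage

import "github.com/milvus-io/milvus/internal/proto/schemapb"

// buildInsertBinlog walks the writer lifecycle exercised by the tests above:
// create the binlog writer, append one insert event, stamp both timestamp
// ranges with single calls, then Close before asking for the buffer.
func buildInsertBinlog() ([]byte, error) {
	// The IDs are placeholders, passed in the same positions the tests use
	// them (collection, partition, segment and, presumably, field).
	w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40)
	e, err := w.NextInsertEventWriter()
	if err != nil {
		return nil, err
	}
	if err := e.AddDataToPayload([]int64{1, 2, 3}); err != nil {
		return nil, err
	}
	e.SetEventTimestamp(100, 200)   // per-event bounds
	w.SetEventTimeStamp(1000, 2000) // per-binlog bounds; Close fails if either is left at 0
	if err := w.Close(); err != nil {
		return nil, err
	}
	return w.GetBuffer() // returns an error if called before Close
}
```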

View File

@ -20,7 +20,8 @@ static const char *ErrorMsg(const std::string &msg) {
return ret; return ret;
} }
extern "C" CPayloadWriter NewPayloadWriter(int columnType) { extern "C"
CPayloadWriter NewPayloadWriter(int columnType) {
auto p = new wrapper::PayloadWriter; auto p = new wrapper::PayloadWriter;
p->builder = nullptr; p->builder = nullptr;
p->schema = nullptr; p->schema = nullptr;
@ -125,30 +126,43 @@ CStatus AddValuesToPayload(CPayloadWriter payloadWriter, DT *values, int length)
return st; return st;
} }
extern "C" CStatus AddBooleanToPayload(CPayloadWriter payloadWriter, bool *values, int length) { extern "C"
CStatus AddBooleanToPayload(CPayloadWriter payloadWriter, bool *values, int length) {
return AddValuesToPayload<bool, arrow::BooleanBuilder>(payloadWriter, values, length); return AddValuesToPayload<bool, arrow::BooleanBuilder>(payloadWriter, values, length);
} }
extern "C" CStatus AddInt8ToPayload(CPayloadWriter payloadWriter, int8_t *values, int length) { extern "C"
CStatus AddInt8ToPayload(CPayloadWriter payloadWriter, int8_t *values, int length) {
return AddValuesToPayload<int8_t, arrow::Int8Builder>(payloadWriter, values, length); return AddValuesToPayload<int8_t, arrow::Int8Builder>(payloadWriter, values, length);
} }
extern "C" CStatus AddInt16ToPayload(CPayloadWriter payloadWriter, int16_t *values, int length) {
extern "C"
CStatus AddInt16ToPayload(CPayloadWriter payloadWriter, int16_t *values, int length) {
return AddValuesToPayload<int16_t, arrow::Int16Builder>(payloadWriter, values, length); return AddValuesToPayload<int16_t, arrow::Int16Builder>(payloadWriter, values, length);
} }
extern "C" CStatus AddInt32ToPayload(CPayloadWriter payloadWriter, int32_t *values, int length) {
extern "C"
CStatus AddInt32ToPayload(CPayloadWriter payloadWriter, int32_t *values, int length) {
return AddValuesToPayload<int32_t, arrow::Int32Builder>(payloadWriter, values, length); return AddValuesToPayload<int32_t, arrow::Int32Builder>(payloadWriter, values, length);
} }
extern "C" CStatus AddInt64ToPayload(CPayloadWriter payloadWriter, int64_t *values, int length) {
extern "C"
CStatus AddInt64ToPayload(CPayloadWriter payloadWriter, int64_t *values, int length) {
return AddValuesToPayload<int64_t, arrow::Int64Builder>(payloadWriter, values, length); return AddValuesToPayload<int64_t, arrow::Int64Builder>(payloadWriter, values, length);
} }
extern "C" CStatus AddFloatToPayload(CPayloadWriter payloadWriter, float *values, int length) {
extern "C"
CStatus AddFloatToPayload(CPayloadWriter payloadWriter, float *values, int length) {
return AddValuesToPayload<float, arrow::FloatBuilder>(payloadWriter, values, length); return AddValuesToPayload<float, arrow::FloatBuilder>(payloadWriter, values, length);
} }
extern "C" CStatus AddDoubleToPayload(CPayloadWriter payloadWriter, double *values, int length) {
extern "C"
CStatus AddDoubleToPayload(CPayloadWriter payloadWriter, double *values, int length) {
return AddValuesToPayload<double, arrow::DoubleBuilder>(payloadWriter, values, length); return AddValuesToPayload<double, arrow::DoubleBuilder>(payloadWriter, values, length);
} }
extern "C" CStatus AddOneStringToPayload(CPayloadWriter payloadWriter, char *cstr, int str_size) { extern "C"
CStatus AddOneStringToPayload(CPayloadWriter payloadWriter, char *cstr, int str_size) {
CStatus st; CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS); st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr; st.error_msg = nullptr;
@ -180,7 +194,8 @@ extern "C" CStatus AddOneStringToPayload(CPayloadWriter payloadWriter, char *cst
return st; return st;
} }
extern "C" CStatus AddBinaryVectorToPayload(CPayloadWriter payloadWriter, uint8_t *values, int dimension, int length) { extern "C"
CStatus AddBinaryVectorToPayload(CPayloadWriter payloadWriter, uint8_t *values, int dimension, int length) {
CStatus st; CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS); st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr; st.error_msg = nullptr;
@ -227,7 +242,8 @@ extern "C" CStatus AddBinaryVectorToPayload(CPayloadWriter payloadWriter, uint8_
return st; return st;
} }
extern "C" CStatus AddFloatVectorToPayload(CPayloadWriter payloadWriter, float *values, int dimension, int length) { extern "C"
CStatus AddFloatVectorToPayload(CPayloadWriter payloadWriter, float *values, int dimension, int length) {
CStatus st; CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS); st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr; st.error_msg = nullptr;
@ -270,7 +286,8 @@ extern "C" CStatus AddFloatVectorToPayload(CPayloadWriter payloadWriter, float *
return st; return st;
} }
extern "C" CStatus FinishPayloadWriter(CPayloadWriter payloadWriter) { extern "C"
CStatus FinishPayloadWriter(CPayloadWriter payloadWriter) {
CStatus st; CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS); st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr; st.error_msg = nullptr;
@ -320,7 +337,8 @@ int GetPayloadLengthFromWriter(CPayloadWriter payloadWriter) {
return p->rows; return p->rows;
} }
extern "C" CStatus ReleasePayloadWriter(CPayloadWriter handler) { extern "C"
CStatus ReleasePayloadWriter(CPayloadWriter handler) {
CStatus st; CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS); st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr; st.error_msg = nullptr;
@ -329,7 +347,8 @@ extern "C" CStatus ReleasePayloadWriter(CPayloadWriter handler) {
return st; return st;
} }
extern "C" CPayloadReader NewPayloadReader(int columnType, uint8_t *buffer, int64_t buf_size) { extern "C"
CPayloadReader NewPayloadReader(int columnType, uint8_t *buffer, int64_t buf_size) {
auto p = new wrapper::PayloadReader; auto p = new wrapper::PayloadReader;
p->bValues = nullptr; p->bValues = nullptr;
p->input = std::make_shared<wrapper::PayloadInputStream>(buffer, buf_size); p->input = std::make_shared<wrapper::PayloadInputStream>(buffer, buf_size);
@ -369,7 +388,8 @@ extern "C" CPayloadReader NewPayloadReader(int columnType, uint8_t *buffer, int6
return reinterpret_cast<CPayloadReader>(p); return reinterpret_cast<CPayloadReader>(p);
} }
extern "C" CStatus GetBoolFromPayload(CPayloadReader payloadReader, bool **values, int *length) { extern "C"
CStatus GetBoolFromPayload(CPayloadReader payloadReader, bool **values, int *length) {
CStatus st; CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS); st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr; st.error_msg = nullptr;
@ -409,25 +429,38 @@ CStatus GetValuesFromPayload(CPayloadReader payloadReader, DT **values, int *len
return st; return st;
} }
extern "C" CStatus GetInt8FromPayload(CPayloadReader payloadReader, int8_t **values, int *length) { extern "C"
CStatus GetInt8FromPayload(CPayloadReader payloadReader, int8_t **values, int *length) {
return GetValuesFromPayload<int8_t, arrow::Int8Array>(payloadReader, values, length); return GetValuesFromPayload<int8_t, arrow::Int8Array>(payloadReader, values, length);
} }
extern "C" CStatus GetInt16FromPayload(CPayloadReader payloadReader, int16_t **values, int *length) {
extern "C"
CStatus GetInt16FromPayload(CPayloadReader payloadReader, int16_t **values, int *length) {
return GetValuesFromPayload<int16_t, arrow::Int16Array>(payloadReader, values, length); return GetValuesFromPayload<int16_t, arrow::Int16Array>(payloadReader, values, length);
} }
extern "C" CStatus GetInt32FromPayload(CPayloadReader payloadReader, int32_t **values, int *length) {
extern "C"
CStatus GetInt32FromPayload(CPayloadReader payloadReader, int32_t **values, int *length) {
return GetValuesFromPayload<int32_t, arrow::Int32Array>(payloadReader, values, length); return GetValuesFromPayload<int32_t, arrow::Int32Array>(payloadReader, values, length);
} }
extern "C" CStatus GetInt64FromPayload(CPayloadReader payloadReader, int64_t **values, int *length) {
extern "C"
CStatus GetInt64FromPayload(CPayloadReader payloadReader, int64_t **values, int *length) {
return GetValuesFromPayload<int64_t, arrow::Int64Array>(payloadReader, values, length); return GetValuesFromPayload<int64_t, arrow::Int64Array>(payloadReader, values, length);
} }
extern "C" CStatus GetFloatFromPayload(CPayloadReader payloadReader, float **values, int *length) {
extern "C"
CStatus GetFloatFromPayload(CPayloadReader payloadReader, float **values, int *length) {
return GetValuesFromPayload<float, arrow::FloatArray>(payloadReader, values, length); return GetValuesFromPayload<float, arrow::FloatArray>(payloadReader, values, length);
} }
extern "C" CStatus GetDoubleFromPayload(CPayloadReader payloadReader, double **values, int *length) {
extern "C"
CStatus GetDoubleFromPayload(CPayloadReader payloadReader, double **values, int *length) {
return GetValuesFromPayload<double, arrow::DoubleArray>(payloadReader, values, length); return GetValuesFromPayload<double, arrow::DoubleArray>(payloadReader, values, length);
} }
extern "C" CStatus GetOneStringFromPayload(CPayloadReader payloadReader, int idx, char **cstr, int *str_size) {
extern "C"
CStatus GetOneStringFromPayload(CPayloadReader payloadReader, int idx, char **cstr, int *str_size) {
CStatus st; CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS); st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr; st.error_msg = nullptr;
@ -448,10 +481,9 @@ extern "C" CStatus GetOneStringFromPayload(CPayloadReader payloadReader, int idx
*str_size = length; *str_size = length;
return st; return st;
} }
extern "C" CStatus GetBinaryVectorFromPayload(CPayloadReader payloadReader,
uint8_t **values, extern "C"
int *dimension, CStatus GetBinaryVectorFromPayload(CPayloadReader payloadReader, uint8_t **values, int *dimension, int *length) {
int *length) {
CStatus st; CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS); st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr; st.error_msg = nullptr;
@ -467,10 +499,9 @@ extern "C" CStatus GetBinaryVectorFromPayload(CPayloadReader payloadReader,
*values = (uint8_t *) array->raw_values(); *values = (uint8_t *) array->raw_values();
return st; return st;
} }
extern "C" CStatus GetFloatVectorFromPayload(CPayloadReader payloadReader,
float **values, extern "C"
int *dimension, CStatus GetFloatVectorFromPayload(CPayloadReader payloadReader, float **values, int *dimension, int *length) {
int *length) {
CStatus st; CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS); st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr; st.error_msg = nullptr;
@ -487,13 +518,15 @@ extern "C" CStatus GetFloatVectorFromPayload(CPayloadReader payloadReader,
return st; return st;
} }
extern "C" int GetPayloadLengthFromReader(CPayloadReader payloadReader) { extern "C"
int GetPayloadLengthFromReader(CPayloadReader payloadReader) {
auto p = reinterpret_cast<wrapper::PayloadReader *>(payloadReader); auto p = reinterpret_cast<wrapper::PayloadReader *>(payloadReader);
if (p->array == nullptr) return 0; if (p->array == nullptr) return 0;
return p->array->length(); return p->array->length();
} }
extern "C" CStatus ReleasePayloadReader(CPayloadReader payloadReader) { extern "C"
CStatus ReleasePayloadReader(CPayloadReader payloadReader) {
CStatus st; CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS); st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr; st.error_msg = nullptr;

View File

@ -18,8 +18,6 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdbool.h> #include <stdbool.h>
typedef void *CPayloadWriter;
typedef struct CBuffer { typedef struct CBuffer {
char *data; char *data;
int length; int length;
@ -30,6 +28,8 @@ typedef struct CStatus {
const char *error_msg; const char *error_msg;
} CStatus; } CStatus;
//============= payload writer ======================
typedef void *CPayloadWriter;
CPayloadWriter NewPayloadWriter(int columnType); CPayloadWriter NewPayloadWriter(int columnType);
CStatus AddBooleanToPayload(CPayloadWriter payloadWriter, bool *values, int length); CStatus AddBooleanToPayload(CPayloadWriter payloadWriter, bool *values, int length);
CStatus AddInt8ToPayload(CPayloadWriter payloadWriter, int8_t *values, int length); CStatus AddInt8ToPayload(CPayloadWriter payloadWriter, int8_t *values, int length);
@ -48,7 +48,6 @@ int GetPayloadLengthFromWriter(CPayloadWriter payloadWriter);
CStatus ReleasePayloadWriter(CPayloadWriter handler); CStatus ReleasePayloadWriter(CPayloadWriter handler);
//============= payload reader ====================== //============= payload reader ======================
typedef void *CPayloadReader; typedef void *CPayloadReader;
CPayloadReader NewPayloadReader(int columnType, uint8_t *buffer, int64_t buf_size); CPayloadReader NewPayloadReader(int columnType, uint8_t *buffer, int64_t buf_size);
CStatus GetBoolFromPayload(CPayloadReader payloadReader, bool **values, int *length); CStatus GetBoolFromPayload(CPayloadReader payloadReader, bool **values, int *length);

View File

@ -13,7 +13,6 @@ package storage
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"sort" "sort"
"strconv" "strconv"
@ -150,7 +149,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
var writer *InsertBinlogWriter var writer *InsertBinlogWriter
timeFieldData, ok := data.Data[rootcoord.TimeStampField] timeFieldData, ok := data.Data[rootcoord.TimeStampField]
if !ok { if !ok {
return nil, nil, errors.New("data doesn't contains timestamp field") return nil, nil, fmt.Errorf("data doesn't contains timestamp field")
} }
ts := timeFieldData.(*Int64FieldData).Data ts := timeFieldData.(*Int64FieldData).Data
startTs := ts[0] startTs := ts[0]
@ -172,8 +171,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
return nil, nil, err return nil, nil, err
} }
eventWriter.SetStartTimestamp(typeutil.Timestamp(startTs)) eventWriter.SetEventTimestamp(typeutil.Timestamp(startTs), typeutil.Timestamp(endTs))
eventWriter.SetEndTimestamp(typeutil.Timestamp(endTs))
switch field.DataType { switch field.DataType {
case schemapb.DataType_Bool: case schemapb.DataType_Bool:
err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data) err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data)
@ -206,8 +204,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
writer.SetStartTimeStamp(typeutil.Timestamp(startTs)) writer.SetEventTimeStamp(typeutil.Timestamp(startTs), typeutil.Timestamp(endTs))
writer.SetEndTimeStamp(typeutil.Timestamp(endTs))
err = writer.Close() err = writer.Close()
if err != nil { if err != nil {
@ -242,9 +239,10 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
return blobs, statsBlobs, nil return blobs, statsBlobs, nil
} }
func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *InsertData, err error) { func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *InsertData, err error) {
if len(blobs) == 0 { if len(blobs) == 0 {
return -1, -1, nil, errors.New("blobs is empty") return -1, -1, nil, fmt.Errorf("blobs is empty")
} }
readerClose := func(reader *BinlogReader) func() error { readerClose := func(reader *BinlogReader) func() error {
return func() error { return reader.Close() } return func() error { return reader.Close() }
@ -507,10 +505,8 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
if err != nil { if err != nil {
return nil, err return nil, err
} }
eventWriter.SetStartTimestamp(ts[0]) eventWriter.SetEventTimestamp(ts[0], ts[len(ts)-1])
eventWriter.SetEndTimestamp(ts[len(ts)-1]) writer.SetEventTimeStamp(ts[0], ts[len(ts)-1])
writer.SetStartTimeStamp(ts[0])
writer.SetEndTimeStamp(ts[len(ts)-1])
err = writer.Close() err = writer.Close()
if err != nil { if err != nil {
return nil, err return nil, err
@ -537,45 +533,40 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
if err != nil { if err != nil {
return nil, err return nil, err
} }
eventWriter.SetStartTimestamp(ts[pos]) eventWriter.SetEventTimestamp(ts[pos], ts[pos])
eventWriter.SetEndTimestamp(ts[pos])
case DropCollectionEventType: case DropCollectionEventType:
eventWriter, err := writer.NextDropCollectionEventWriter() eventWriter, err := writer.NextDropCollectionEventWriter()
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = eventWriter.AddOneStringToPayload(req) err = eventWriter.AddOneStringToPayload(req)
eventWriter.SetStartTimestamp(ts[pos])
eventWriter.SetEndTimestamp(ts[pos])
if err != nil { if err != nil {
return nil, err return nil, err
} }
eventWriter.SetEventTimestamp(ts[pos], ts[pos])
case CreatePartitionEventType: case CreatePartitionEventType:
eventWriter, err := writer.NextCreatePartitionEventWriter() eventWriter, err := writer.NextCreatePartitionEventWriter()
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = eventWriter.AddOneStringToPayload(req) err = eventWriter.AddOneStringToPayload(req)
eventWriter.SetStartTimestamp(ts[pos])
eventWriter.SetEndTimestamp(ts[pos])
if err != nil { if err != nil {
return nil, err return nil, err
} }
eventWriter.SetEventTimestamp(ts[pos], ts[pos])
case DropPartitionEventType: case DropPartitionEventType:
eventWriter, err := writer.NextDropPartitionEventWriter() eventWriter, err := writer.NextDropPartitionEventWriter()
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = eventWriter.AddOneStringToPayload(req) err = eventWriter.AddOneStringToPayload(req)
eventWriter.SetStartTimestamp(ts[pos])
eventWriter.SetEndTimestamp(ts[pos])
if err != nil { if err != nil {
return nil, err return nil, err
} }
eventWriter.SetEventTimestamp(ts[pos], ts[pos])
} }
} }
writer.SetStartTimeStamp(ts[0]) writer.SetEventTimeStamp(ts[0], ts[len(ts)-1])
writer.SetEndTimeStamp(ts[len(ts)-1])
err = writer.Close() err = writer.Close()
if err != nil { if err != nil {
return nil, err return nil, err
@ -590,12 +581,11 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
}) })
return blobs, nil return blobs, nil
} }
func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts []Timestamp, ddRequests []string, err error) { func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts []Timestamp, ddRequests []string, err error) {
if len(blobs) == 0 { if len(blobs) == 0 {
return nil, nil, errors.New("blobs is empty") return nil, nil, fmt.Errorf("blobs is empty")
} }
readerClose := func(reader *BinlogReader) func() error { readerClose := func(reader *BinlogReader) func() error {
return func() error { return reader.Close() } return func() error { return reader.Close() }
@ -707,7 +697,7 @@ func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, map[string]st
break break
} }
if file == nil { if file == nil {
return nil, nil, "", -1, errors.New("can not find params blob") return nil, nil, "", -1, fmt.Errorf("can not find params blob")
} }
info := struct { info := struct {
Params map[string]string Params map[string]string
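
One small behavioural point in the DataDefinitionCodec hunks above: the AddOneStringToPayload error is now checked before the event is stamped, so a failed write never leaves a timestamped but empty DDL event. A self-contained sketch of that ordering with a stand-in writer type (the stub itself is invented for illustration; only the method names come from the diff):

```
package main

import (
	"errors"
	"fmt"
)

// stubWriter mimics the DDL event writers used in data_codec.go just closely
// enough to show the ordering; it is not the real type.
type stubWriter struct {
	start, end uint64
	payload    []string
}

func (w *stubWriter) AddOneStringToPayload(s string) error {
	if s == "" {
		return errors.New("empty ddl request")
	}
	w.payload = append(w.payload, s)
	return nil
}

func (w *stubWriter) SetEventTimestamp(start, end uint64) { w.start, w.end = start, end }

// writeDDL mirrors the reordered flow: check the payload error first, and only
// stamp the event once the write has succeeded.
func writeDDL(w *stubWriter, req string, ts uint64) error {
	if err := w.AddOneStringToPayload(req); err != nil {
		return err
	}
	w.SetEventTimestamp(ts, ts)
	return nil
}

func main() {
	w := &stubWriter{}
	fmt.Println(writeDDL(w, `{"collection":"c1"}`, 1000)) // <nil>
	fmt.Println(writeDDL(w, "", 2000))                    // empty ddl request
}
```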

View File

@ -13,6 +13,7 @@ package storage
import ( import (
"github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/rootcoord"
) )
type DataSorter struct { type DataSorter struct {
@ -20,17 +21,15 @@ type DataSorter struct {
InsertData *InsertData InsertData *InsertData
} }
func (ds *DataSorter) getIDField() FieldData { func (ds *DataSorter) getRowIDFieldData() FieldData {
for _, field := range ds.InsertCodec.Schema.Schema.Fields { if data, ok := ds.InsertData.Data[rootcoord.RowIDField]; ok {
if field.FieldID == 0 { return data
return ds.InsertData.Data[field.FieldID]
}
} }
return nil return nil
} }
func (ds *DataSorter) Len() int { func (ds *DataSorter) Len() int {
return len(ds.getIDField().(*Int64FieldData).Data) return len(ds.getRowIDFieldData().(*Int64FieldData).Data)
} }
func (ds *DataSorter) Swap(i, j int) { func (ds *DataSorter) Swap(i, j int) {
@ -81,5 +80,6 @@ func (ds *DataSorter) Swap(i, j int) {
} }
func (ds *DataSorter) Less(i, j int) bool { func (ds *DataSorter) Less(i, j int) bool {
return ds.getIDField().(*Int64FieldData).Data[i] < ds.getIDField().(*Int64FieldData).Data[j] ids := ds.getRowIDFieldData().(*Int64FieldData).Data
return ids[i] < ids[j]
} }
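
DataSorter only implements sort.Interface; the sorting itself is driven by sort.Sort, and Less now compares the row-id column fetched via getRowIDFieldData. A reduced, self-contained sketch of the same pattern, sorting two parallel columns by row id (the types here are simplified stand-ins, not the real InsertData fields):

```
package main

import (
	"fmt"
	"sort"
)

// rowIDSorter keeps two parallel columns and orders both by row id, which is
// what DataSorter.Len/Swap/Less do across every field in InsertData.
type rowIDSorter struct {
	rowIDs []int64
	values []float32
}

func (s *rowIDSorter) Len() int { return len(s.rowIDs) }
func (s *rowIDSorter) Swap(i, j int) {
	s.rowIDs[i], s.rowIDs[j] = s.rowIDs[j], s.rowIDs[i]
	s.values[i], s.values[j] = s.values[j], s.values[i]
}
func (s *rowIDSorter) Less(i, j int) bool { return s.rowIDs[i] < s.rowIDs[j] }

func main() {
	s := &rowIDSorter{rowIDs: []int64{30, 10, 20}, values: []float32{0.3, 0.1, 0.2}}
	sort.Sort(s) // DataSorter is presumably driven the same way
	fmt.Println(s.rowIDs, s.values) // [10 20 30] [0.1 0.2 0.3]
}
```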

View File

@ -13,9 +13,8 @@ package storage
import ( import (
"encoding/binary" "encoding/binary"
"io"
"errors" "errors"
"io"
"github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/internal/util/typeutil"
@ -40,23 +39,23 @@ type DescriptorEventDataFixPart struct {
PayloadDataType schemapb.DataType PayloadDataType schemapb.DataType
} }
func (data *descriptorEventData) SetStartTimeStamp(ts typeutil.Timestamp) { func (data *descriptorEventData) SetEventTimeStamp(start typeutil.Timestamp, end typeutil.Timestamp) {
data.StartTimestamp = ts data.StartTimestamp = start
data.EndTimestamp = end
} }
func (data *descriptorEventData) SetEndTimeStamp(ts typeutil.Timestamp) { func (data *descriptorEventData) GetEventDataFixPartSize() int32 {
data.EndTimestamp = ts return int32(binary.Size(data.DescriptorEventDataFixPart))
} }
func (data *descriptorEventData) GetMemoryUsageInBytes() int32 { func (data *descriptorEventData) GetMemoryUsageInBytes() int32 {
return int32(binary.Size(data.DescriptorEventDataFixPart) + binary.Size(data.PostHeaderLengths)) return data.GetEventDataFixPartSize() + int32(binary.Size(data.PostHeaderLengths))
} }
func (data *descriptorEventData) Write(buffer io.Writer) error { func (data *descriptorEventData) Write(buffer io.Writer) error {
if err := binary.Write(buffer, binary.LittleEndian, data.DescriptorEventDataFixPart); err != nil { if err := binary.Write(buffer, binary.LittleEndian, data.DescriptorEventDataFixPart); err != nil {
return err return err
} }
if err := binary.Write(buffer, binary.LittleEndian, data.PostHeaderLengths); err != nil { if err := binary.Write(buffer, binary.LittleEndian, data.PostHeaderLengths); err != nil {
return err return err
} }
@ -65,15 +64,12 @@ func (data *descriptorEventData) Write(buffer io.Writer) error {
func readDescriptorEventData(buffer io.Reader) (*descriptorEventData, error) { func readDescriptorEventData(buffer io.Reader) (*descriptorEventData, error) {
event := newDescriptorEventData() event := newDescriptorEventData()
if err := binary.Read(buffer, binary.LittleEndian, &event.DescriptorEventDataFixPart); err != nil { if err := binary.Read(buffer, binary.LittleEndian, &event.DescriptorEventDataFixPart); err != nil {
return nil, err return nil, err
} }
if err := binary.Read(buffer, binary.LittleEndian, &event.PostHeaderLengths); err != nil { if err := binary.Read(buffer, binary.LittleEndian, &event.PostHeaderLengths); err != nil {
return nil, err return nil, err
} }
return event, nil return event, nil
} }
@ -89,12 +85,9 @@ type insertEventData struct {
EndTimestamp typeutil.Timestamp EndTimestamp typeutil.Timestamp
} }
func (data *insertEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { func (data *insertEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) {
data.StartTimestamp = timestamp data.StartTimestamp = start
} data.EndTimestamp = end
func (data *insertEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
data.EndTimestamp = timestamp
} }
func (data *insertEventData) GetEventDataFixPartSize() int32 { func (data *insertEventData) GetEventDataFixPartSize() int32 {
@ -116,12 +109,9 @@ type deleteEventData struct {
EndTimestamp typeutil.Timestamp EndTimestamp typeutil.Timestamp
} }
func (data *deleteEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { func (data *deleteEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) {
data.StartTimestamp = timestamp data.StartTimestamp = start
} data.EndTimestamp = end
func (data *deleteEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
data.EndTimestamp = timestamp
} }
func (data *deleteEventData) GetEventDataFixPartSize() int32 { func (data *deleteEventData) GetEventDataFixPartSize() int32 {
@ -143,12 +133,9 @@ type createCollectionEventData struct {
EndTimestamp typeutil.Timestamp EndTimestamp typeutil.Timestamp
} }
func (data *createCollectionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { func (data *createCollectionEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) {
data.StartTimestamp = timestamp data.StartTimestamp = start
} data.EndTimestamp = end
func (data *createCollectionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
data.EndTimestamp = timestamp
} }
func (data *createCollectionEventData) GetEventDataFixPartSize() int32 { func (data *createCollectionEventData) GetEventDataFixPartSize() int32 {
@ -170,12 +157,9 @@ type dropCollectionEventData struct {
EndTimestamp typeutil.Timestamp EndTimestamp typeutil.Timestamp
} }
func (data *dropCollectionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { func (data *dropCollectionEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) {
data.StartTimestamp = timestamp data.StartTimestamp = start
} data.EndTimestamp = end
func (data *dropCollectionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
data.EndTimestamp = timestamp
} }
func (data *dropCollectionEventData) GetEventDataFixPartSize() int32 { func (data *dropCollectionEventData) GetEventDataFixPartSize() int32 {
@ -197,12 +181,9 @@ type createPartitionEventData struct {
EndTimestamp typeutil.Timestamp EndTimestamp typeutil.Timestamp
} }
func (data *createPartitionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { func (data *createPartitionEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) {
data.StartTimestamp = timestamp data.StartTimestamp = start
} data.EndTimestamp = end
func (data *createPartitionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
data.EndTimestamp = timestamp
} }
func (data *createPartitionEventData) GetEventDataFixPartSize() int32 { func (data *createPartitionEventData) GetEventDataFixPartSize() int32 {
@ -224,12 +205,9 @@ type dropPartitionEventData struct {
EndTimestamp typeutil.Timestamp EndTimestamp typeutil.Timestamp
} }
func (data *dropPartitionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { func (data *dropPartitionEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) {
data.StartTimestamp = timestamp data.StartTimestamp = start
} data.EndTimestamp = end
func (data *dropPartitionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
data.EndTimestamp = timestamp
} }
func (data *dropPartitionEventData) GetEventDataFixPartSize() int32 { func (data *dropPartitionEventData) GetEventDataFixPartSize() int32 {
@ -249,7 +227,7 @@ func (data *dropPartitionEventData) WriteEventData(buffer io.Writer) error {
func getEventFixPartSize(code EventTypeCode) int32 { func getEventFixPartSize(code EventTypeCode) int32 {
switch code { switch code {
case DescriptorEventType: case DescriptorEventType:
return int32(binary.Size(descriptorEventData{}.DescriptorEventDataFixPart)) return (&descriptorEventData{}).GetEventDataFixPartSize()
case InsertEventType: case InsertEventType:
return (&insertEventData{}).GetEventDataFixPartSize() return (&insertEventData{}).GetEventDataFixPartSize()
case DeleteEventType: case DeleteEventType:

View File

@ -13,7 +13,6 @@ package storage
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
"github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/schemapb"
@ -27,28 +26,21 @@ type EventReader struct {
isClosed bool isClosed bool
} }
func (reader *EventReader) checkClose() error { func (reader *EventReader) readHeader() error {
if reader.isClosed { if reader.isClosed {
return errors.New("event reader is closed") return fmt.Errorf("event reader is closed")
}
return nil
}
func (reader *EventReader) readHeader() (*eventHeader, error) {
if err := reader.checkClose(); err != nil {
return nil, err
} }
header, err := readEventHeader(reader.buffer) header, err := readEventHeader(reader.buffer)
if err != nil { if err != nil {
return nil, err return err
} }
reader.eventHeader = *header reader.eventHeader = *header
return &reader.eventHeader, nil return nil
} }
func (reader *EventReader) readData() (eventData, error) { func (reader *EventReader) readData() error {
if err := reader.checkClose(); err != nil { if reader.isClosed {
return nil, err return fmt.Errorf("event reader is closed")
} }
var data eventData var data eventData
var err error var err error
@ -66,15 +58,14 @@ func (reader *EventReader) readData() (eventData, error) {
case DropPartitionEventType: case DropPartitionEventType:
data, err = readDropPartitionEventDataFixPart(reader.buffer) data, err = readDropPartitionEventDataFixPart(reader.buffer)
default: default:
return nil, fmt.Errorf("unknown header type code: %d", reader.TypeCode) return fmt.Errorf("unknown header type code: %d", reader.TypeCode)
} }
if err != nil { if err != nil {
return nil, err return err
} }
reader.eventData = data reader.eventData = data
return reader.eventData, nil return nil
} }
func (reader *EventReader) Close() error { func (reader *EventReader) Close() error {
@ -84,6 +75,7 @@ func (reader *EventReader) Close() error {
} }
return nil return nil
} }
func newEventReader(datatype schemapb.DataType, buffer *bytes.Buffer) (*EventReader, error) { func newEventReader(datatype schemapb.DataType, buffer *bytes.Buffer) (*EventReader, error) {
reader := &EventReader{ reader := &EventReader{
eventHeader: eventHeader{ eventHeader: eventHeader{
@ -93,15 +85,15 @@ func newEventReader(datatype schemapb.DataType, buffer *bytes.Buffer) (*EventRea
isClosed: false, isClosed: false,
} }
if _, err := reader.readHeader(); err != nil { if err := reader.readHeader(); err != nil {
return nil, err
}
if err := reader.readData(); err != nil {
return nil, err return nil, err
} }
if _, err := reader.readData(); err != nil { next := int(reader.EventLength - reader.eventHeader.GetMemoryUsageInBytes() - reader.GetEventDataFixPartSize())
return nil, err payloadBuffer := buffer.Next(next)
}
payloadBuffer := buffer.Next(int(reader.EventLength - reader.eventHeader.GetMemoryUsageInBytes() - reader.GetEventDataFixPartSize()))
payloadReader, err := NewPayloadReader(datatype, payloadBuffer) payloadReader, err := NewPayloadReader(datatype, payloadBuffer)
if err != nil { if err != nil {
return nil, err return nil, err
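The payload slicing in newEventReader above boils down to length arithmetic: whatever remains of the event after the header and the fixed data part is the payload. A small sketch with made-up sizes (the 21-byte header matches the layout described at the top of this document; the other numbers are arbitrary):

```
package main

import (
	"bytes"
	"fmt"
)

func main() {
	// Assume a serialized event of 64 bytes: a 21-byte header, a 16-byte
	// fixed data part, and the rest payload. Sizes are for illustration only.
	eventLength := int32(64)
	headerSize := int32(21)
	fixedPartSize := int32(16)

	buf := bytes.NewBuffer(make([]byte, eventLength))
	buf.Next(int(headerSize + fixedPartSize)) // header + fixed part already consumed

	next := int(eventLength - headerSize - fixedPartSize)
	payload := buf.Next(next) // exactly the payload bytes remain
	fmt.Println(len(payload)) // 27
}
```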


@ -177,8 +177,7 @@ func TestInsertEvent(t *testing.T) {
) { ) {
w, err := newInsertEventWriter(dt) w, err := newInsertEventWriter(dt)
assert.Nil(t, err) assert.Nil(t, err)
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
err = ir1(w) err = ir1(w)
assert.Nil(t, err) assert.Nil(t, err)
err = iw(w) err = iw(w)
@ -350,8 +349,7 @@ func TestInsertEvent(t *testing.T) {
t.Run("insert_string", func(t *testing.T) { t.Run("insert_string", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_String) w, err := newInsertEventWriter(schemapb.DataType_String)
assert.Nil(t, err) assert.Nil(t, err)
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload("1234") err = w.AddDataToPayload("1234")
assert.Nil(t, err) assert.Nil(t, err)
err = w.AddOneStringToPayload("567890") err = w.AddOneStringToPayload("567890")
@ -426,8 +424,7 @@ func TestDeleteEvent(t *testing.T) {
) { ) {
w, err := newDeleteEventWriter(dt) w, err := newDeleteEventWriter(dt)
assert.Nil(t, err) assert.Nil(t, err)
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
err = ir1(w) err = ir1(w)
assert.Nil(t, err) assert.Nil(t, err)
err = iw(w) err = iw(w)
@ -599,8 +596,7 @@ func TestDeleteEvent(t *testing.T) {
t.Run("delete_string", func(t *testing.T) { t.Run("delete_string", func(t *testing.T) {
w, err := newDeleteEventWriter(schemapb.DataType_String) w, err := newDeleteEventWriter(schemapb.DataType_String)
assert.Nil(t, err) assert.Nil(t, err)
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload("1234") err = w.AddDataToPayload("1234")
assert.Nil(t, err) assert.Nil(t, err)
err = w.AddOneStringToPayload("567890") err = w.AddOneStringToPayload("567890")
@ -675,8 +671,7 @@ func TestCreateCollectionEvent(t *testing.T) {
t.Run("create_collection_timestamp", func(t *testing.T) { t.Run("create_collection_timestamp", func(t *testing.T) {
w, err := newCreateCollectionEventWriter(schemapb.DataType_Int64) w, err := newCreateCollectionEventWriter(schemapb.DataType_Int64)
assert.Nil(t, err) assert.Nil(t, err)
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload([]int64{1, 2, 3}) err = w.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err) assert.Nil(t, err)
err = w.AddDataToPayload([]int{4, 5, 6}) err = w.AddDataToPayload([]int{4, 5, 6})
@ -722,8 +717,7 @@ func TestCreateCollectionEvent(t *testing.T) {
t.Run("create_collection_string", func(t *testing.T) { t.Run("create_collection_string", func(t *testing.T) {
w, err := newCreateCollectionEventWriter(schemapb.DataType_String) w, err := newCreateCollectionEventWriter(schemapb.DataType_String)
assert.Nil(t, err) assert.Nil(t, err)
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload("1234") err = w.AddDataToPayload("1234")
assert.Nil(t, err) assert.Nil(t, err)
err = w.AddOneStringToPayload("567890") err = w.AddOneStringToPayload("567890")
@ -798,8 +792,7 @@ func TestDropCollectionEvent(t *testing.T) {
t.Run("drop_collection_timestamp", func(t *testing.T) { t.Run("drop_collection_timestamp", func(t *testing.T) {
w, err := newDropCollectionEventWriter(schemapb.DataType_Int64) w, err := newDropCollectionEventWriter(schemapb.DataType_Int64)
assert.Nil(t, err) assert.Nil(t, err)
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload([]int64{1, 2, 3}) err = w.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err) assert.Nil(t, err)
err = w.AddDataToPayload([]int{4, 5, 6}) err = w.AddDataToPayload([]int{4, 5, 6})
@ -845,8 +838,7 @@ func TestDropCollectionEvent(t *testing.T) {
t.Run("drop_collection_string", func(t *testing.T) { t.Run("drop_collection_string", func(t *testing.T) {
w, err := newDropCollectionEventWriter(schemapb.DataType_String) w, err := newDropCollectionEventWriter(schemapb.DataType_String)
assert.Nil(t, err) assert.Nil(t, err)
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload("1234") err = w.AddDataToPayload("1234")
assert.Nil(t, err) assert.Nil(t, err)
err = w.AddOneStringToPayload("567890") err = w.AddOneStringToPayload("567890")
@ -921,8 +913,7 @@ func TestCreatePartitionEvent(t *testing.T) {
t.Run("create_partition_timestamp", func(t *testing.T) { t.Run("create_partition_timestamp", func(t *testing.T) {
w, err := newCreatePartitionEventWriter(schemapb.DataType_Int64) w, err := newCreatePartitionEventWriter(schemapb.DataType_Int64)
assert.Nil(t, err) assert.Nil(t, err)
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload([]int64{1, 2, 3}) err = w.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err) assert.Nil(t, err)
err = w.AddDataToPayload([]int{4, 5, 6}) err = w.AddDataToPayload([]int{4, 5, 6})
@ -968,8 +959,7 @@ func TestCreatePartitionEvent(t *testing.T) {
t.Run("create_partition_string", func(t *testing.T) { t.Run("create_partition_string", func(t *testing.T) {
w, err := newCreatePartitionEventWriter(schemapb.DataType_String) w, err := newCreatePartitionEventWriter(schemapb.DataType_String)
assert.Nil(t, err) assert.Nil(t, err)
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload("1234") err = w.AddDataToPayload("1234")
assert.Nil(t, err) assert.Nil(t, err)
err = w.AddOneStringToPayload("567890") err = w.AddOneStringToPayload("567890")
@ -1044,8 +1034,7 @@ func TestDropPartitionEvent(t *testing.T) {
t.Run("drop_partition_timestamp", func(t *testing.T) { t.Run("drop_partition_timestamp", func(t *testing.T) {
w, err := newDropPartitionEventWriter(schemapb.DataType_Int64) w, err := newDropPartitionEventWriter(schemapb.DataType_Int64)
assert.Nil(t, err) assert.Nil(t, err)
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload([]int64{1, 2, 3}) err = w.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err) assert.Nil(t, err)
err = w.AddDataToPayload([]int{4, 5, 6}) err = w.AddDataToPayload([]int{4, 5, 6})
@ -1091,8 +1080,7 @@ func TestDropPartitionEvent(t *testing.T) {
t.Run("drop_partition_string", func(t *testing.T) { t.Run("drop_partition_string", func(t *testing.T) {
w, err := newDropPartitionEventWriter(schemapb.DataType_String) w, err := newDropPartitionEventWriter(schemapb.DataType_String)
assert.Nil(t, err) assert.Nil(t, err)
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload("1234") err = w.AddDataToPayload("1234")
assert.Nil(t, err) assert.Nil(t, err)
err = w.AddOneStringToPayload("567890") err = w.AddOneStringToPayload("567890")
@ -1302,8 +1290,7 @@ func TestEventReaderError(t *testing.T) {
func TestEventClose(t *testing.T) { func TestEventClose(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_String) w, err := newInsertEventWriter(schemapb.DataType_String)
assert.Nil(t, err) assert.Nil(t, err)
w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
w.SetEndTimestamp(tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload("1234") err = w.AddDataToPayload("1234")
assert.Nil(t, err) assert.Nil(t, err)
err = w.Finish() err = w.Finish()
@ -1324,8 +1311,8 @@ func TestEventClose(t *testing.T) {
err = r.Close() err = r.Close()
assert.Nil(t, err) assert.Nil(t, err)
_, err = r.readHeader() err = r.readHeader()
assert.NotNil(t, err) assert.NotNil(t, err)
_, err = r.readData() err = r.readData()
assert.NotNil(t, err) assert.NotNil(t, err)
} }
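The tests throughout this change build their timestamps with tsoutil.ComposeTS(physical, logical). A rough sketch of what such a hybrid timestamp looks like; the 18-bit logical width is an assumption borrowed from TSO-style timestamps, not taken from the Milvus source:

```
package main

import "fmt"

// logicalBits is assumed here; the real width lives in the tsoutil package.
const logicalBits = 18

// composeTS packs a physical (millisecond) part and a logical counter into one uint64.
func composeTS(physical, logical int64) uint64 {
	return uint64(physical<<logicalBits | logical)
}

func main() {
	start := composeTS(10, 0)
	end := composeTS(100, 0)
	fmt.Println(start, end, end > start) // 2621440 26214400 true
}
```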


@ -14,9 +14,8 @@ package storage
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"io"
"errors" "errors"
"io"
"github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/schemapb"
) )
@ -35,12 +34,19 @@ const (
) )
func (code EventTypeCode) String() string { func (code EventTypeCode) String() string {
codes := []string{"DescriptorEventType", "InsertEventType", "DeleteEventType", "CreateCollectionEventType", "DropCollectionEventType", codes := map[EventTypeCode]string{
"CreatePartitionEventType", "DropPartitionEventType"} DescriptorEventType: "DescriptorEventType",
if len(codes) < int(code) { InsertEventType: "InsertEventType",
return "" DeleteEventType: "DeleteEventType",
CreateCollectionEventType: "CreateCollectionEventType",
DropCollectionEventType: "DropCollectionEventType",
CreatePartitionEventType: "CreatePartitionEventType",
DropPartitionEventType: "DropPartitionEventType",
} }
return codes[code] if eventTypeStr, ok := codes[code]; ok {
return eventTypeStr
}
return "InvalidEventType"
} }
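The map-based String() above is safer than the old slice lookup, which (assuming the codes start at zero) could index past the end of the slice for an unknown code instead of returning a fallback. A self-contained sketch of the same pattern with a shortened enum:

```
package main

import "fmt"

// EventTypeCode stands in for the storage package's event type enum.
type EventTypeCode int32

const (
	DescriptorEventType EventTypeCode = iota
	InsertEventType
	DeleteEventType
)

// String looks the code up in a map, so any unknown value falls back to a
// sentinel string instead of panicking on an out-of-range index.
func (code EventTypeCode) String() string {
	codes := map[EventTypeCode]string{
		DescriptorEventType: "DescriptorEventType",
		InsertEventType:     "InsertEventType",
		DeleteEventType:     "DeleteEventType",
	}
	if s, ok := codes[code]; ok {
		return s
	}
	return "InvalidEventType"
}

func main() {
	fmt.Println(InsertEventType)   // InsertEventType
	fmt.Println(EventTypeCode(42)) // InvalidEventType
}
```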
type descriptorEvent struct { type descriptorEvent struct {
@ -104,8 +110,8 @@ func (writer *baseEventWriter) GetMemoryUsageInBytes() (int32, error) {
if err != nil { if err != nil {
return -1, err return -1, err
} }
return writer.getEventDataSize() + writer.eventHeader.GetMemoryUsageInBytes() + size := writer.getEventDataSize() + writer.eventHeader.GetMemoryUsageInBytes() + int32(len(data))
int32(len(data)), nil return size, nil
} }
func (writer *baseEventWriter) Write(buffer *bytes.Buffer) error { func (writer *baseEventWriter) Write(buffer *bytes.Buffer) error {
@ -115,7 +121,6 @@ func (writer *baseEventWriter) Write(buffer *bytes.Buffer) error {
if err := writer.writeEventData(buffer); err != nil { if err := writer.writeEventData(buffer); err != nil {
return err return err
} }
data, err := writer.GetPayloadBufferFromWriter() data, err := writer.GetPayloadBufferFromWriter()
if err != nil { if err != nil {
return err return err
@ -138,7 +143,6 @@ func (writer *baseEventWriter) Finish() error {
} }
writer.EventLength = eventLength writer.EventLength = eventLength
writer.NextPosition = eventLength + writer.offset writer.NextPosition = eventLength + writer.offset
} }
return nil return nil
} }
@ -229,6 +233,7 @@ func newDeleteEventWriter(dataType schemapb.DataType) (*deleteEventWriter, error
} }
header := newEventHeader(DeleteEventType) header := newEventHeader(DeleteEventType)
data := newDeleteEventData() data := newDeleteEventData()
writer := &deleteEventWriter{ writer := &deleteEventWriter{
baseEventWriter: baseEventWriter{ baseEventWriter: baseEventWriter{
eventHeader: *header, eventHeader: *header,
@ -242,6 +247,7 @@ func newDeleteEventWriter(dataType schemapb.DataType) (*deleteEventWriter, error
writer.baseEventWriter.writeEventData = writer.deleteEventData.WriteEventData writer.baseEventWriter.writeEventData = writer.deleteEventData.WriteEventData
return writer, nil return writer, nil
} }
func newCreateCollectionEventWriter(dataType schemapb.DataType) (*createCollectionEventWriter, error) { func newCreateCollectionEventWriter(dataType schemapb.DataType) (*createCollectionEventWriter, error) {
if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 { if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 {
return nil, errors.New("incorrect data type") return nil, errors.New("incorrect data type")
@ -267,6 +273,7 @@ func newCreateCollectionEventWriter(dataType schemapb.DataType) (*createCollecti
writer.baseEventWriter.writeEventData = writer.createCollectionEventData.WriteEventData writer.baseEventWriter.writeEventData = writer.createCollectionEventData.WriteEventData
return writer, nil return writer, nil
} }
func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEventWriter, error) { func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEventWriter, error) {
if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 { if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 {
return nil, errors.New("incorrect data type") return nil, errors.New("incorrect data type")
@ -278,6 +285,7 @@ func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEv
} }
header := newEventHeader(DropCollectionEventType) header := newEventHeader(DropCollectionEventType)
data := newDropCollectionEventData() data := newDropCollectionEventData()
writer := &dropCollectionEventWriter{ writer := &dropCollectionEventWriter{
baseEventWriter: baseEventWriter{ baseEventWriter: baseEventWriter{
eventHeader: *header, eventHeader: *header,
@ -291,6 +299,7 @@ func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEv
writer.baseEventWriter.writeEventData = writer.dropCollectionEventData.WriteEventData writer.baseEventWriter.writeEventData = writer.dropCollectionEventData.WriteEventData
return writer, nil return writer, nil
} }
func newCreatePartitionEventWriter(dataType schemapb.DataType) (*createPartitionEventWriter, error) { func newCreatePartitionEventWriter(dataType schemapb.DataType) (*createPartitionEventWriter, error) {
if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 { if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 {
return nil, errors.New("incorrect data type") return nil, errors.New("incorrect data type")
@ -316,6 +325,7 @@ func newCreatePartitionEventWriter(dataType schemapb.DataType) (*createPartition
writer.baseEventWriter.writeEventData = writer.createPartitionEventData.WriteEventData writer.baseEventWriter.writeEventData = writer.createPartitionEventData.WriteEventData
return writer, nil return writer, nil
} }
func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEventWriter, error) { func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEventWriter, error) {
if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 { if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 {
return nil, errors.New("incorrect data type") return nil, errors.New("incorrect data type")
@ -327,6 +337,7 @@ func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEven
} }
header := newEventHeader(DropPartitionEventType) header := newEventHeader(DropPartitionEventType)
data := newDropPartitionEventData() data := newDropPartitionEventData()
writer := &dropPartitionEventWriter{ writer := &dropPartitionEventWriter{
baseEventWriter: baseEventWriter{ baseEventWriter: baseEventWriter{
eventHeader: *header, eventHeader: *header,


@ -66,8 +66,7 @@ func TestEventWriter(t *testing.T) {
err = insertEvent.AddInt32ToPayload([]int32{1}) err = insertEvent.AddInt32ToPayload([]int32{1})
assert.NotNil(t, err) assert.NotNil(t, err)
buffer := new(bytes.Buffer) buffer := new(bytes.Buffer)
insertEvent.SetStartTimestamp(100) insertEvent.SetEventTimestamp(100, 200)
insertEvent.SetEndTimestamp(200)
err = insertEvent.Write(buffer) err = insertEvent.Write(buffer)
assert.Nil(t, err) assert.Nil(t, err)
length, err = insertEvent.GetMemoryUsageInBytes() length, err = insertEvent.GetMemoryUsageInBytes()


@ -20,9 +20,8 @@ package storage
*/ */
import "C" import "C"
import ( import (
"unsafe"
"errors" "errors"
"unsafe"
"github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/proto/schemapb"
@ -63,6 +62,7 @@ type PayloadReaderInterface interface {
ReleasePayloadReader() error ReleasePayloadReader() error
Close() error Close() error
} }
type PayloadWriter struct { type PayloadWriter struct {
payloadWriterPtr C.CPayloadWriter payloadWriterPtr C.CPayloadWriter
colType schemapb.DataType colType schemapb.DataType
@ -91,49 +91,42 @@ func (w *PayloadWriter) AddDataToPayload(msgs interface{}, dim ...int) error {
return errors.New("incorrect data type") return errors.New("incorrect data type")
} }
return w.AddBoolToPayload(val) return w.AddBoolToPayload(val)
case schemapb.DataType_Int8: case schemapb.DataType_Int8:
val, ok := msgs.([]int8) val, ok := msgs.([]int8)
if !ok { if !ok {
return errors.New("incorrect data type") return errors.New("incorrect data type")
} }
return w.AddInt8ToPayload(val) return w.AddInt8ToPayload(val)
case schemapb.DataType_Int16: case schemapb.DataType_Int16:
val, ok := msgs.([]int16) val, ok := msgs.([]int16)
if !ok { if !ok {
return errors.New("incorrect data type") return errors.New("incorrect data type")
} }
return w.AddInt16ToPayload(val) return w.AddInt16ToPayload(val)
case schemapb.DataType_Int32: case schemapb.DataType_Int32:
val, ok := msgs.([]int32) val, ok := msgs.([]int32)
if !ok { if !ok {
return errors.New("incorrect data type") return errors.New("incorrect data type")
} }
return w.AddInt32ToPayload(val) return w.AddInt32ToPayload(val)
case schemapb.DataType_Int64: case schemapb.DataType_Int64:
val, ok := msgs.([]int64) val, ok := msgs.([]int64)
if !ok { if !ok {
return errors.New("incorrect data type") return errors.New("incorrect data type")
} }
return w.AddInt64ToPayload(val) return w.AddInt64ToPayload(val)
case schemapb.DataType_Float: case schemapb.DataType_Float:
val, ok := msgs.([]float32) val, ok := msgs.([]float32)
if !ok { if !ok {
return errors.New("incorrect data type") return errors.New("incorrect data type")
} }
return w.AddFloatToPayload(val) return w.AddFloatToPayload(val)
case schemapb.DataType_Double: case schemapb.DataType_Double:
val, ok := msgs.([]float64) val, ok := msgs.([]float64)
if !ok { if !ok {
return errors.New("incorrect data type") return errors.New("incorrect data type")
} }
return w.AddDoubleToPayload(val) return w.AddDoubleToPayload(val)
case schemapb.DataType_String: case schemapb.DataType_String:
val, ok := msgs.(string) val, ok := msgs.(string)
if !ok { if !ok {
@ -151,7 +144,6 @@ func (w *PayloadWriter) AddDataToPayload(msgs interface{}, dim ...int) error {
return errors.New("incorrect data type") return errors.New("incorrect data type")
} }
return w.AddBinaryVectorToPayload(val, dim[0]) return w.AddBinaryVectorToPayload(val, dim[0])
case schemapb.DataType_FloatVector: case schemapb.DataType_FloatVector:
val, ok := msgs.([]float32) val, ok := msgs.([]float32)
if !ok { if !ok {
@ -161,10 +153,8 @@ func (w *PayloadWriter) AddDataToPayload(msgs interface{}, dim ...int) error {
default: default:
return errors.New("incorrect datatype") return errors.New("incorrect datatype")
} }
default: default:
return errors.New("incorrect input numbers") return errors.New("incorrect input numbers")
} }
} }
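For readers skimming the AddDataToPayload cleanup above: the function dispatches first on how many dim arguments were passed (none for scalars, one for vectors), then on the dynamic type of msgs. A simplified, self-contained stand-in showing that shape (not the Milvus implementation; it only handles two scalar types and float vectors):

```
package main

import (
	"errors"
	"fmt"
)

// addDataToPayload mirrors the dispatch shape of PayloadWriter.AddDataToPayload:
// len(dim) selects scalar vs. vector handling, and a type switch checks that
// msgs matches the expected Go type.
func addDataToPayload(msgs interface{}, dim ...int) error {
	switch len(dim) {
	case 0: // scalar columns
		switch v := msgs.(type) {
		case []int64:
			fmt.Println("add int64 column:", v)
			return nil
		case string:
			fmt.Println("add one string:", v)
			return nil
		default:
			return errors.New("incorrect data type")
		}
	case 1: // vector columns carry a dimension
		v, ok := msgs.([]float32)
		if !ok {
			return errors.New("incorrect data type")
		}
		if dim[0] <= 0 || len(v)%dim[0] != 0 {
			return errors.New("dimension should divide the vector length")
		}
		fmt.Printf("add %d float vectors of dim %d\n", len(v)/dim[0], dim[0])
		return nil
	default:
		return errors.New("incorrect input numbers")
	}
}

func main() {
	_ = addDataToPayload([]int64{1, 2, 3})         // scalar path
	_ = addDataToPayload([]float32{1, 2, 3, 4}, 2) // vector path
}
```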
@ -334,7 +324,6 @@ func (w *PayloadWriter) AddBinaryVectorToPayload(binVec []byte, dim int) error {
if length <= 0 { if length <= 0 {
return errors.New("can't add empty binVec into payload") return errors.New("can't add empty binVec into payload")
} }
if dim <= 0 { if dim <= 0 {
return errors.New("dimension should be greater than 0") return errors.New("dimension should be greater than 0")
} }
@ -359,16 +348,15 @@ func (w *PayloadWriter) AddFloatVectorToPayload(floatVec []float32, dim int) err
if length <= 0 { if length <= 0 {
return errors.New("can't add empty floatVec into payload") return errors.New("can't add empty floatVec into payload")
} }
if dim <= 0 { if dim <= 0 {
return errors.New("dimension should be greater than 0") return errors.New("dimension should be greater than 0")
} }
cBinVec := (*C.float)(&floatVec[0]) cVec := (*C.float)(&floatVec[0])
cDim := C.int(dim) cDim := C.int(dim)
cLength := C.int(length / dim) cLength := C.int(length / dim)
st := C.AddFloatVectorToPayload(w.payloadWriterPtr, cBinVec, cDim, cLength) st := C.AddFloatVectorToPayload(w.payloadWriterPtr, cVec, cDim, cLength)
errCode := commonpb.ErrorCode(st.error_code) errCode := commonpb.ErrorCode(st.error_code)
if errCode != commonpb.ErrorCode_Success { if errCode != commonpb.ErrorCode_Success {
msg := C.GoString(st.error_msg) msg := C.GoString(st.error_msg)
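The cBinVec to cVec rename above sits inside the usual cgo pattern of handing a Go float32 slice to C as a pointer to its first element plus a dimension and row count. A tiny self-contained sketch of that calling convention against a toy C function (not the Milvus C API):

```
package main

/*
// Toy stand-in for a C payload function: it just sums the values so the
// example has an observable result.
static float sum_floats(const float* vec, int dim, int length) {
	float s = 0;
	for (int i = 0; i < dim * length; i++) {
		s += vec[i];
	}
	return s;
}
*/
import "C"

import "fmt"

func main() {
	floatVec := []float32{1, 2, 3, 4, 5, 6}
	dim := 2
	// Same calling shape as the diff: pointer to the first element, the
	// dimension, and the number of rows (len/dim).
	cVec := (*C.float)(&floatVec[0])
	cDim := C.int(dim)
	cLength := C.int(len(floatVec) / dim)
	fmt.Println(C.sum_floats(cVec, cDim, cLength)) // 21
}
```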
@ -446,45 +434,37 @@ func (r *PayloadReader) GetDataFromPayload(idx ...int) (interface{}, int, error)
val, err := r.GetOneStringFromPayload(idx[0]) val, err := r.GetOneStringFromPayload(idx[0])
return val, 0, err return val, 0, err
default: default:
return nil, 0, errors.New("Unknown type") return nil, 0, errors.New("unknown type")
} }
case 0: case 0:
switch r.colType { switch r.colType {
case schemapb.DataType_Bool: case schemapb.DataType_Bool:
val, err := r.GetBoolFromPayload() val, err := r.GetBoolFromPayload()
return val, 0, err return val, 0, err
case schemapb.DataType_Int8: case schemapb.DataType_Int8:
val, err := r.GetInt8FromPayload() val, err := r.GetInt8FromPayload()
return val, 0, err return val, 0, err
case schemapb.DataType_Int16: case schemapb.DataType_Int16:
val, err := r.GetInt16FromPayload() val, err := r.GetInt16FromPayload()
return val, 0, err return val, 0, err
case schemapb.DataType_Int32: case schemapb.DataType_Int32:
val, err := r.GetInt32FromPayload() val, err := r.GetInt32FromPayload()
return val, 0, err return val, 0, err
case schemapb.DataType_Int64: case schemapb.DataType_Int64:
val, err := r.GetInt64FromPayload() val, err := r.GetInt64FromPayload()
return val, 0, err return val, 0, err
case schemapb.DataType_Float: case schemapb.DataType_Float:
val, err := r.GetFloatFromPayload() val, err := r.GetFloatFromPayload()
return val, 0, err return val, 0, err
case schemapb.DataType_Double: case schemapb.DataType_Double:
val, err := r.GetDoubleFromPayload() val, err := r.GetDoubleFromPayload()
return val, 0, err return val, 0, err
case schemapb.DataType_BinaryVector: case schemapb.DataType_BinaryVector:
return r.GetBinaryVectorFromPayload() return r.GetBinaryVectorFromPayload()
case schemapb.DataType_FloatVector: case schemapb.DataType_FloatVector:
return r.GetFloatVectorFromPayload() return r.GetFloatVectorFromPayload()
default: default:
return nil, 0, errors.New("Unknown type") return nil, 0, errors.New("unknown type")
} }
default: default:
return nil, 0, errors.New("incorrect number of index") return nil, 0, errors.New("incorrect number of index")


@ -570,7 +570,7 @@ func TestPayload_ReaderandWriter(t *testing.T) {
err = w.AddDoubleToPayload([]float64{0.0}) err = w.AddDoubleToPayload([]float64{0.0})
assert.NotNil(t, err) assert.NotNil(t, err)
}) })
t.Run("TestAddStringAfterFinish", func(t *testing.T) { t.Run("TestAddOneStringAfterFinish", func(t *testing.T) {
w, err := NewPayloadWriter(schemapb.DataType_String) w, err := NewPayloadWriter(schemapb.DataType_String)
require.Nil(t, err) require.Nil(t, err)
require.NotNil(t, w) require.NotNil(t, w)
@ -819,7 +819,7 @@ func TestPayload_ReaderandWriter(t *testing.T) {
_, err = r.GetDoubleFromPayload() _, err = r.GetDoubleFromPayload()
assert.NotNil(t, err) assert.NotNil(t, err)
}) })
t.Run("TestGetStringError", func(t *testing.T) { t.Run("TestGetOneStringError", func(t *testing.T) {
w, err := NewPayloadWriter(schemapb.DataType_Bool) w, err := NewPayloadWriter(schemapb.DataType_Bool)
require.Nil(t, err) require.Nil(t, err)
require.NotNil(t, w) require.NotNil(t, w)


@ -39,8 +39,7 @@ func TestPrintBinlogFilesInt64(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
err = e1.AddDataToPayload([]int64{4, 5, 6}) err = e1.AddDataToPayload([]int64{4, 5, 6})
assert.Nil(t, err) assert.Nil(t, err)
e1.SetStartTimestamp(tsoutil.ComposeTS(curTS+10*60*1000, 0)) e1.SetEventTimestamp(tsoutil.ComposeTS(curTS+10*60*1000, 0), tsoutil.ComposeTS(curTS+20*60*1000, 0))
e1.SetEndTimestamp(tsoutil.ComposeTS(curTS+20*60*1000, 0))
e2, err := w.NextInsertEventWriter() e2, err := w.NextInsertEventWriter()
assert.Nil(t, err) assert.Nil(t, err)
@ -50,11 +49,9 @@ func TestPrintBinlogFilesInt64(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
err = e2.AddDataToPayload([]int64{10, 11, 12}) err = e2.AddDataToPayload([]int64{10, 11, 12})
assert.Nil(t, err) assert.Nil(t, err)
e2.SetStartTimestamp(tsoutil.ComposeTS(curTS+30*60*1000, 0)) e2.SetEventTimestamp(tsoutil.ComposeTS(curTS+30*60*1000, 0), tsoutil.ComposeTS(curTS+40*60*1000, 0))
e2.SetEndTimestamp(tsoutil.ComposeTS(curTS+40*60*1000, 0))
w.SetStartTimeStamp(tsoutil.ComposeTS(curTS, 0)) w.SetEventTimeStamp(tsoutil.ComposeTS(curTS, 0), tsoutil.ComposeTS(curTS+3600*1000, 0))
w.SetEndTimeStamp(tsoutil.ComposeTS(curTS+3600*1000, 0))
_, err = w.GetBuffer() _, err = w.GetBuffer()
assert.NotNil(t, err) assert.NotNil(t, err)