diff --git a/docs/developer_guides/chap08_binlog.md b/docs/developer_guides/chap08_binlog.md index 7d7fb093d3..e3c7315f37 100644 --- a/docs/developer_guides/chap08_binlog.md +++ b/docs/developer_guides/chap08_binlog.md @@ -2,8 +2,9 @@ InsertBinlog、DeleteBinlog、DDLBinlog -Binlog is stored in a columnar storage format, every column in schema should be stored in a individual file. Timestamp, schema, row id and primary key allocated by system are four special columns. Schema column records the DDL of the collection. - +Binlog is stored in a columnar storage format, every column in schema is stored in an individual file. +Timestamp, schema, row id and primary key allocated by system are four special columns. +Schema column records the DDL of the collection. ## Event format @@ -13,67 +14,63 @@ Binlog file consists of 4 bytes magic number and a series of events. The first e ### Event format ``` -+=====================================+ -| event | timestamp 0 : 8 | create timestamp -| header +----------------------------+ -| | type_code 8 : 1 | event type code -| +----------------------------+ -| | server_id 9 : 4 | write node id -| +----------------------------+ -| | event_length 13 : 4 | length of event, including header and data -| +----------------------------+ -| | next_position 17 : 4 | offset of next event from the start of file -| +----------------------------+ -| | extra_headers 21 : x-21 | reserved part -+=====================================+ -| event | fixed part x : y | -| data +----------------------------+ -| | variable part | -+=====================================+ ++=====================================+=====================================================================+ +| event | Timestamp 0 : 8 | create timestamp | +| header +----------------------------+---------------------------------------------------------------------+ +| | TypeCode 8 : 1 | event type code | +| +----------------------------+---------------------------------------------------------------------+ +| | ServerID 9 : 4 | write node id | +| +----------------------------+---------------------------------------------------------------------+ +| | EventLength 13 : 4 | length of event, including header and data | +| +----------------------------+---------------------------------------------------------------------+ +| | NextPosition 17 : 4 | offset of next event from the start of file | ++=====================================+=====================================================================+ +| event | fixed part 21 : x | | +| data +----------------------------+---------------------------------------------------------------------+ +| | variable part | | ++=====================================+=====================================================================+ ``` - ### Descriptor Event format ``` -+=====================================+ -| event | timestamp 0 : 8 | create timestamp -| header +----------------------------+ -| | type_code 8 : 1 | event type code -| +----------------------------+ -| | server_id 9 : 4 | write node id -| +----------------------------+ -| | event_length 13 : 4 | length of event, including header and data -| +----------------------------+ -| | next_position 17 : 4 | offset of next event from the start of file -+=====================================+ -| event | binlog_version 21 : 2 | binlog version -| data +----------------------------+ -| | server_version 23 : 8 | write node version -| +----------------------------+ -| | commit_id 31 : 8 | commit id of the programe in git -| +----------------------------+ -| | header_length 39 : 1 | header length of other event -| +----------------------------+ -| | collection_id 40 : 8 | collection id -| +----------------------------+ -| | partition_id 48 : 8 | partition id (schema column does not need) -| +----------------------------+ -| | segment_id 56 : 8 | segment id (schema column does not need) -| +----------------------------+ -| | start_timestamp 64 : 1 | minimum timestamp allocated by master of all events in this file -| +----------------------------+ -| | end_timestamp 65 : 1 | maximum timestamp allocated by master of all events in this file -| +----------------------------+ -| | post-header 66 : n | array of n bytes, one byte per event type that the server knows about -| | lengths for all | -| | event types | -+=====================================+ ++=====================================+=====================================================================+ +| event | Timestamp 0 : 8 | create timestamp | +| header +----------------------------+---------------------------------------------------------------------+ +| | TypeCode 8 : 1 | event type code | +| +----------------------------+---------------------------------------------------------------------+ +| | ServerID 9 : 4 | write node id | +| +----------------------------+---------------------------------------------------------------------+ +| | EventLength 13 : 4 | length of event, including header and data | +| +----------------------------+---------------------------------------------------------------------+ +| | NextPosition 17 : 4 | offset of next event from the start of file | ++=====================================+=====================================================================+ +| event | BinlogVersion 21 : 2 | binlog version | +| data +----------------------------+---------------------------------------------------------------------+ +| | ServerVersion 23 : 8 | write node version | +| +----------------------------+---------------------------------------------------------------------+ +| | CommitID 31 : 8 | commit id of the programe in git | +| +----------------------------+---------------------------------------------------------------------+ +| | HeaderLength 39 : 1 | header length of other event | +| +----------------------------+---------------------------------------------------------------------+ +| | CollectionID 40 : 8 | collection id | +| +----------------------------+---------------------------------------------------------------------+ +| | PartitionID 48 : 8 | partition id (schema column does not need) | +| +----------------------------+---------------------------------------------------------------------+ +| | SegmentID 56 : 8 | segment id (schema column does not need) | +| +----------------------------+---------------------------------------------------------------------+ +| | StartTimestamp 64 : 1 | minimum timestamp allocated by master of all events in this file | +| +----------------------------+---------------------------------------------------------------------+ +| | EndTimestamp 65 : 1 | maximum timestamp allocated by master of all events in this file | +| +----------------------------+---------------------------------------------------------------------+ +| | PayloadDataType 66 : 1 | data type of payload | +| +----------------------------+---------------------------------------------------------------------+ +| | PostHeaderLength 67 : n | header lengths for all event types | ++=====================================+=====================================================================| ``` - ### Type code ``` @@ -88,12 +85,11 @@ DROP_PARTITION_EVENT DESCRIPTOR_EVENT must appear in all column files and always be the first event. -INSERT_EVENT 可以出现在除DDL binlog文件外的其他列的binlog +INSERT_EVENT 可以出现在除 DDL binlog 文件外的其他列的 binlog -DELETE_EVENT 只能用于primary key 的binlog文件(目前只有按照primary key删除) - -CREATE_COLLECTION_EVENT、DROP_COLLECTION_EVENT、CREATE_PARTITION_EVENT、DROP_PARTITION_EVENT 只出现在DDL binlog文件 +DELETE_EVENT 只能用于 primary key 的 binlog 文件(目前只有按照 primary key 删除) +CREATE_COLLECTION_EVENT、DROP_COLLECTION_EVENT、CREATE_PARTITION_EVENT、DROP_PARTITION_EVENT 只出现在 DDL binlog 文件 ### Event data part @@ -102,28 +98,21 @@ CREATE_COLLECTION_EVENT、DROP_COLLECTION_EVENT、CREATE_PARTITION_EVENT、DROP_ event data part INSERT_EVENT: -+================================================+ -| event | fixed | start_timestamp x : 8 | min timestamp in this event -| data | part +------------------------------+ -| | | end_timestamp x+8 : 8 | max timestamp in this event -| | +------------------------------+ -| | | reserved x+16 : y-x-16 | reserved part -| +--------+------------------------------+ -| |variable| parquet payloI ad | payload in parquet format -| |part | | -+================================================+ - -other events is similar with INSERT_EVENT - ++================================================+==========================================================+ +| event | fixed | StartTimestamp x : 8 | min timestamp in this event | +| data | part +------------------------------+----------------------------------------------------------+ +| | | EndTimestamp x+8 : 8 | max timestamp in this event | +| | +------------------------------+----------------------------------------------------------+ +| | | reserved x+16 : y | reserved part | +| +--------+------------------------------+----------------------------------------------------------+ +| |variable| parquet payload | payload in parquet format | +| |part | | | ++================================================+==========================================================+ +other events are similar with INSERT_EVENT ``` - - - - - ### Example Schema @@ -212,12 +201,4 @@ CStatus GetFloatVectorFromPayload(CPayloadReader payloadReader, float **values, int GetPayloadLengthFromReader(CPayloadReader payloadReader); CStatus ReleasePayloadReader(CPayloadReader payloadReader); - ``` - - - - - - - diff --git a/internal/storage/binlog_test.go b/internal/storage/binlog_test.go index b954ab3445..cfae8f8b69 100644 --- a/internal/storage/binlog_test.go +++ b/internal/storage/binlog_test.go @@ -35,8 +35,7 @@ func TestInsertBinlog(t *testing.T) { assert.NotNil(t, err) err = e1.AddDataToPayload([]int64{4, 5, 6}) assert.Nil(t, err) - e1.SetStartTimestamp(100) - e1.SetEndTimestamp(200) + e1.SetEventTimestamp(100, 200) e2, err := w.NextInsertEventWriter() assert.Nil(t, err) @@ -46,11 +45,9 @@ func TestInsertBinlog(t *testing.T) { assert.NotNil(t, err) err = e2.AddDataToPayload([]int64{10, 11, 12}) assert.Nil(t, err) - e2.SetStartTimestamp(300) - e2.SetEndTimestamp(400) + e2.SetEventTimestamp(300, 400) - w.SetStartTimeStamp(1000) - w.SetEndTimeStamp(2000) + w.SetEventTimeStamp(1000, 2000) _, err = w.GetBuffer() assert.NotNil(t, err) @@ -294,8 +291,7 @@ func TestDeleteBinlog(t *testing.T) { assert.NotNil(t, err) err = e1.AddDataToPayload([]int64{4, 5, 6}) assert.Nil(t, err) - e1.SetStartTimestamp(100) - e1.SetEndTimestamp(200) + e1.SetEventTimestamp(100, 200) e2, err := w.NextDeleteEventWriter() assert.Nil(t, err) @@ -305,11 +301,9 @@ func TestDeleteBinlog(t *testing.T) { assert.NotNil(t, err) err = e2.AddDataToPayload([]int64{10, 11, 12}) assert.Nil(t, err) - e2.SetStartTimestamp(300) - e2.SetEndTimestamp(400) + e2.SetEventTimestamp(300, 400) - w.SetStartTimeStamp(1000) - w.SetEndTimeStamp(2000) + w.SetEventTimeStamp(1000, 2000) _, err = w.GetBuffer() assert.NotNil(t, err) @@ -553,8 +547,7 @@ func TestDDLBinlog1(t *testing.T) { assert.NotNil(t, err) err = e1.AddDataToPayload([]int64{4, 5, 6}) assert.Nil(t, err) - e1.SetStartTimestamp(100) - e1.SetEndTimestamp(200) + e1.SetEventTimestamp(100, 200) e2, err := w.NextDropCollectionEventWriter() assert.Nil(t, err) @@ -564,11 +557,9 @@ func TestDDLBinlog1(t *testing.T) { assert.NotNil(t, err) err = e2.AddDataToPayload([]int64{10, 11, 12}) assert.Nil(t, err) - e2.SetStartTimestamp(300) - e2.SetEndTimestamp(400) + e2.SetEventTimestamp(300, 400) - w.SetStartTimeStamp(1000) - w.SetEndTimeStamp(2000) + w.SetEventTimeStamp(1000, 2000) _, err = w.GetBuffer() assert.NotNil(t, err) @@ -812,8 +803,7 @@ func TestDDLBinlog2(t *testing.T) { assert.NotNil(t, err) err = e1.AddDataToPayload([]int64{4, 5, 6}) assert.Nil(t, err) - e1.SetStartTimestamp(100) - e1.SetEndTimestamp(200) + e1.SetEventTimestamp(100, 200) e2, err := w.NextDropPartitionEventWriter() assert.Nil(t, err) @@ -823,11 +813,9 @@ func TestDDLBinlog2(t *testing.T) { assert.NotNil(t, err) err = e2.AddDataToPayload([]int64{10, 11, 12}) assert.Nil(t, err) - e2.SetStartTimestamp(300) - e2.SetEndTimestamp(400) + e2.SetEventTimestamp(300, 400) - w.SetStartTimeStamp(1000) - w.SetEndTimeStamp(2000) + w.SetEventTimeStamp(1000, 2000) _, err = w.GetBuffer() assert.NotNil(t, err) @@ -1090,8 +1078,7 @@ func TestNewBinlogReaderError(t *testing.T) { w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40) - w.SetStartTimeStamp(1000) - w.SetEndTimeStamp(2000) + w.SetEventTimeStamp(1000, 2000) e1, err := w.NextInsertEventWriter() assert.Nil(t, err) @@ -1101,8 +1088,7 @@ func TestNewBinlogReaderError(t *testing.T) { assert.NotNil(t, err) err = e1.AddDataToPayload([]int64{4, 5, 6}) assert.Nil(t, err) - e1.SetStartTimestamp(100) - e1.SetEndTimestamp(200) + e1.SetEventTimestamp(100, 200) _, err = w.GetBuffer() assert.NotNil(t, err) @@ -1132,13 +1118,13 @@ func TestNewBinlogWriterTsError(t *testing.T) { err = w.Close() assert.NotNil(t, err) - w.SetStartTimeStamp(1000) + w.SetEventTimeStamp(1000, 0) _, err = w.GetBuffer() assert.NotNil(t, err) err = w.Close() assert.NotNil(t, err) - w.SetEndTimeStamp(2000) + w.SetEventTimeStamp(1000, 2000) _, err = w.GetBuffer() assert.NotNil(t, err) err = w.Close() @@ -1146,7 +1132,6 @@ func TestNewBinlogWriterTsError(t *testing.T) { _, err = w.GetBuffer() assert.Nil(t, err) - } func TestInsertBinlogWriterCloseError(t *testing.T) { @@ -1155,17 +1140,14 @@ func TestInsertBinlogWriterCloseError(t *testing.T) { assert.Nil(t, err) err = e1.AddDataToPayload([]int64{1, 2, 3}) assert.Nil(t, err) - e1.SetStartTimestamp(100) - e1.SetEndTimestamp(200) - insertWriter.SetStartTimeStamp(1000) - insertWriter.SetEndTimeStamp(2000) + e1.SetEventTimestamp(100, 200) + insertWriter.SetEventTimeStamp(1000, 2000) err = insertWriter.Close() assert.Nil(t, err) assert.NotNil(t, insertWriter.buffer) insertEventWriter, err := insertWriter.NextInsertEventWriter() assert.Nil(t, insertEventWriter) assert.NotNil(t, err) - } func TestDeleteBinlogWriteCloseError(t *testing.T) { @@ -1174,10 +1156,8 @@ func TestDeleteBinlogWriteCloseError(t *testing.T) { assert.Nil(t, err) err = e1.AddDataToPayload([]int64{1, 2, 3}) assert.Nil(t, err) - e1.SetStartTimestamp(100) - e1.SetEndTimestamp(200) - deleteWriter.SetStartTimeStamp(1000) - deleteWriter.SetEndTimeStamp(2000) + e1.SetEventTimestamp(100, 200) + deleteWriter.SetEventTimeStamp(1000, 2000) err = deleteWriter.Close() assert.Nil(t, err) assert.NotNil(t, deleteWriter.buffer) @@ -1192,11 +1172,9 @@ func TestDDBinlogWriteCloseError(t *testing.T) { assert.Nil(t, err) err = e1.AddDataToPayload([]int64{1, 2, 3}) assert.Nil(t, err) - e1.SetStartTimestamp(100) - e1.SetEndTimestamp(200) + e1.SetEventTimestamp(100, 200) - ddBinlogWriter.SetStartTimeStamp(1000) - ddBinlogWriter.SetEndTimeStamp(2000) + ddBinlogWriter.SetEventTimeStamp(1000, 2000) err = ddBinlogWriter.Close() assert.Nil(t, err) assert.NotNil(t, ddBinlogWriter.buffer) @@ -1216,7 +1194,6 @@ func TestDDBinlogWriteCloseError(t *testing.T) { dropPartitionEventWriter, err := ddBinlogWriter.NextDropPartitionEventWriter() assert.Nil(t, dropPartitionEventWriter) assert.NotNil(t, err) - } type testEvent struct { @@ -1276,8 +1253,7 @@ func TestWriterListError(t *testing.T) { insertWriter := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40) errorEvent := &testEvent{} insertWriter.eventWriters = append(insertWriter.eventWriters, errorEvent) - insertWriter.SetStartTimeStamp(1000) - insertWriter.SetEndTimeStamp(2000) + insertWriter.SetEventTimeStamp(1000, 2000) errorEvent.releasePayloadError = true err := insertWriter.Close() assert.NotNil(t, err) @@ -1297,5 +1273,4 @@ func TestWriterListError(t *testing.T) { errorEvent.finishError = true err = insertWriter.Close() assert.NotNil(t, err) - } diff --git a/internal/storage/binlog_writer.go b/internal/storage/binlog_writer.go index 217a0d9452..42718902c5 100644 --- a/internal/storage/binlog_writer.go +++ b/internal/storage/binlog_writer.go @@ -14,8 +14,7 @@ package storage import ( "bytes" "encoding/binary" - - "errors" + "fmt" "github.com/milvus-io/milvus/internal/proto/schemapb" ) @@ -79,7 +78,7 @@ func (writer *baseBinlogWriter) GetBinlogType() BinlogType { // GetBuffer get binlog buffer. Return nil if binlog is not finished yet. func (writer *baseBinlogWriter) GetBuffer() ([]byte, error) { if writer.buffer == nil { - return nil, errors.New("please close binlog before get buffer") + return nil, fmt.Errorf("please close binlog before get buffer") } return writer.buffer.Bytes(), nil } @@ -89,22 +88,21 @@ func (writer *baseBinlogWriter) Close() error { if writer.buffer != nil { return nil } - if writer.StartTimestamp == 0 { - return errors.New("hasn't set start time stamp") - } - if writer.EndTimestamp == 0 { - return errors.New("hasn't set end time stamp") + if writer.StartTimestamp == 0 || writer.EndTimestamp == 0 { + return fmt.Errorf("invalid start/end timestamp") } - var offset int32 + var offset int32 = 0 writer.buffer = new(bytes.Buffer) if err := binary.Write(writer.buffer, binary.LittleEndian, int32(MagicNumber)); err != nil { return err } + offset += int32(binary.Size(MagicNumber)) if err := writer.descriptorEvent.Write(writer.buffer); err != nil { return err } - offset = writer.descriptorEvent.GetMemoryUsageInBytes() + int32(binary.Size(MagicNumber)) + offset += writer.descriptorEvent.GetMemoryUsageInBytes() + writer.length = 0 for _, w := range writer.eventWriters { w.SetOffset(offset) @@ -137,7 +135,7 @@ type InsertBinlogWriter struct { func (writer *InsertBinlogWriter) NextInsertEventWriter() (*insertEventWriter, error) { if writer.isClosed() { - return nil, errors.New("binlog has closed") + return nil, fmt.Errorf("binlog has closed") } event, err := newInsertEventWriter(writer.PayloadDataType) if err != nil { @@ -153,7 +151,7 @@ type DeleteBinlogWriter struct { func (writer *DeleteBinlogWriter) NextDeleteEventWriter() (*deleteEventWriter, error) { if writer.isClosed() { - return nil, errors.New("binlog has closed") + return nil, fmt.Errorf("binlog has closed") } event, err := newDeleteEventWriter(writer.PayloadDataType) if err != nil { @@ -169,7 +167,7 @@ type DDLBinlogWriter struct { func (writer *DDLBinlogWriter) NextCreateCollectionEventWriter() (*createCollectionEventWriter, error) { if writer.isClosed() { - return nil, errors.New("binlog has closed") + return nil, fmt.Errorf("binlog has closed") } event, err := newCreateCollectionEventWriter(writer.PayloadDataType) if err != nil { @@ -181,7 +179,7 @@ func (writer *DDLBinlogWriter) NextCreateCollectionEventWriter() (*createCollect func (writer *DDLBinlogWriter) NextDropCollectionEventWriter() (*dropCollectionEventWriter, error) { if writer.isClosed() { - return nil, errors.New("binlog has closed") + return nil, fmt.Errorf("binlog has closed") } event, err := newDropCollectionEventWriter(writer.PayloadDataType) if err != nil { @@ -193,7 +191,7 @@ func (writer *DDLBinlogWriter) NextDropCollectionEventWriter() (*dropCollectionE func (writer *DDLBinlogWriter) NextCreatePartitionEventWriter() (*createPartitionEventWriter, error) { if writer.isClosed() { - return nil, errors.New("binlog has closed") + return nil, fmt.Errorf("binlog has closed") } event, err := newCreatePartitionEventWriter(writer.PayloadDataType) if err != nil { @@ -205,7 +203,7 @@ func (writer *DDLBinlogWriter) NextCreatePartitionEventWriter() (*createPartitio func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEventWriter, error) { if writer.isClosed() { - return nil, errors.New("binlog has closed") + return nil, fmt.Errorf("binlog has closed") } event, err := newDropPartitionEventWriter(writer.PayloadDataType) if err != nil { @@ -232,6 +230,7 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID }, } } + func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) *DeleteBinlogWriter { descriptorEvent := newDescriptorEvent() descriptorEvent.PayloadDataType = dataType @@ -246,6 +245,7 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) *Dele }, } } + func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) *DDLBinlogWriter { descriptorEvent := newDescriptorEvent() descriptorEvent.PayloadDataType = dataType diff --git a/internal/storage/binlog_writer_test.go b/internal/storage/binlog_writer_test.go index d377b8395a..a9463ff144 100644 --- a/internal/storage/binlog_writer_test.go +++ b/internal/storage/binlog_writer_test.go @@ -22,8 +22,7 @@ import ( func TestBinlogWriterReader(t *testing.T) { binlogWriter := NewInsertBinlogWriter(schemapb.DataType_Int32, 10, 20, 30, 40) - binlogWriter.SetStartTimeStamp(1000) - binlogWriter.SetEndTimeStamp(2000) + binlogWriter.SetEventTimeStamp(1000, 2000) defer binlogWriter.Close() eventWriter, err := binlogWriter.NextInsertEventWriter() assert.Nil(t, err) @@ -31,8 +30,7 @@ func TestBinlogWriterReader(t *testing.T) { assert.Nil(t, err) _, err = binlogWriter.GetBuffer() assert.NotNil(t, err) - eventWriter.SetStartTimestamp(1000) - eventWriter.SetEndTimestamp(2000) + eventWriter.SetEventTimestamp(1000, 2000) nums, err := binlogWriter.GetRowNums() assert.Nil(t, err) assert.EqualValues(t, 3, nums) diff --git a/internal/storage/cwrapper/ParquetWrapper.cpp b/internal/storage/cwrapper/ParquetWrapper.cpp index 57fa8ab2d7..2d30ec8e0e 100644 --- a/internal/storage/cwrapper/ParquetWrapper.cpp +++ b/internal/storage/cwrapper/ParquetWrapper.cpp @@ -20,7 +20,8 @@ static const char *ErrorMsg(const std::string &msg) { return ret; } -extern "C" CPayloadWriter NewPayloadWriter(int columnType) { +extern "C" +CPayloadWriter NewPayloadWriter(int columnType) { auto p = new wrapper::PayloadWriter; p->builder = nullptr; p->schema = nullptr; @@ -125,30 +126,43 @@ CStatus AddValuesToPayload(CPayloadWriter payloadWriter, DT *values, int length) return st; } -extern "C" CStatus AddBooleanToPayload(CPayloadWriter payloadWriter, bool *values, int length) { +extern "C" +CStatus AddBooleanToPayload(CPayloadWriter payloadWriter, bool *values, int length) { return AddValuesToPayload(payloadWriter, values, length); } -extern "C" CStatus AddInt8ToPayload(CPayloadWriter payloadWriter, int8_t *values, int length) { +extern "C" +CStatus AddInt8ToPayload(CPayloadWriter payloadWriter, int8_t *values, int length) { return AddValuesToPayload(payloadWriter, values, length); } -extern "C" CStatus AddInt16ToPayload(CPayloadWriter payloadWriter, int16_t *values, int length) { + +extern "C" +CStatus AddInt16ToPayload(CPayloadWriter payloadWriter, int16_t *values, int length) { return AddValuesToPayload(payloadWriter, values, length); } -extern "C" CStatus AddInt32ToPayload(CPayloadWriter payloadWriter, int32_t *values, int length) { + +extern "C" +CStatus AddInt32ToPayload(CPayloadWriter payloadWriter, int32_t *values, int length) { return AddValuesToPayload(payloadWriter, values, length); } -extern "C" CStatus AddInt64ToPayload(CPayloadWriter payloadWriter, int64_t *values, int length) { + +extern "C" +CStatus AddInt64ToPayload(CPayloadWriter payloadWriter, int64_t *values, int length) { return AddValuesToPayload(payloadWriter, values, length); } -extern "C" CStatus AddFloatToPayload(CPayloadWriter payloadWriter, float *values, int length) { + +extern "C" +CStatus AddFloatToPayload(CPayloadWriter payloadWriter, float *values, int length) { return AddValuesToPayload(payloadWriter, values, length); } -extern "C" CStatus AddDoubleToPayload(CPayloadWriter payloadWriter, double *values, int length) { + +extern "C" +CStatus AddDoubleToPayload(CPayloadWriter payloadWriter, double *values, int length) { return AddValuesToPayload(payloadWriter, values, length); } -extern "C" CStatus AddOneStringToPayload(CPayloadWriter payloadWriter, char *cstr, int str_size) { +extern "C" +CStatus AddOneStringToPayload(CPayloadWriter payloadWriter, char *cstr, int str_size) { CStatus st; st.error_code = static_cast(ErrorCode::SUCCESS); st.error_msg = nullptr; @@ -180,7 +194,8 @@ extern "C" CStatus AddOneStringToPayload(CPayloadWriter payloadWriter, char *cst return st; } -extern "C" CStatus AddBinaryVectorToPayload(CPayloadWriter payloadWriter, uint8_t *values, int dimension, int length) { +extern "C" +CStatus AddBinaryVectorToPayload(CPayloadWriter payloadWriter, uint8_t *values, int dimension, int length) { CStatus st; st.error_code = static_cast(ErrorCode::SUCCESS); st.error_msg = nullptr; @@ -227,7 +242,8 @@ extern "C" CStatus AddBinaryVectorToPayload(CPayloadWriter payloadWriter, uint8_ return st; } -extern "C" CStatus AddFloatVectorToPayload(CPayloadWriter payloadWriter, float *values, int dimension, int length) { +extern "C" +CStatus AddFloatVectorToPayload(CPayloadWriter payloadWriter, float *values, int dimension, int length) { CStatus st; st.error_code = static_cast(ErrorCode::SUCCESS); st.error_msg = nullptr; @@ -270,7 +286,8 @@ extern "C" CStatus AddFloatVectorToPayload(CPayloadWriter payloadWriter, float * return st; } -extern "C" CStatus FinishPayloadWriter(CPayloadWriter payloadWriter) { +extern "C" +CStatus FinishPayloadWriter(CPayloadWriter payloadWriter) { CStatus st; st.error_code = static_cast(ErrorCode::SUCCESS); st.error_msg = nullptr; @@ -320,7 +337,8 @@ int GetPayloadLengthFromWriter(CPayloadWriter payloadWriter) { return p->rows; } -extern "C" CStatus ReleasePayloadWriter(CPayloadWriter handler) { +extern "C" +CStatus ReleasePayloadWriter(CPayloadWriter handler) { CStatus st; st.error_code = static_cast(ErrorCode::SUCCESS); st.error_msg = nullptr; @@ -329,7 +347,8 @@ extern "C" CStatus ReleasePayloadWriter(CPayloadWriter handler) { return st; } -extern "C" CPayloadReader NewPayloadReader(int columnType, uint8_t *buffer, int64_t buf_size) { +extern "C" +CPayloadReader NewPayloadReader(int columnType, uint8_t *buffer, int64_t buf_size) { auto p = new wrapper::PayloadReader; p->bValues = nullptr; p->input = std::make_shared(buffer, buf_size); @@ -369,7 +388,8 @@ extern "C" CPayloadReader NewPayloadReader(int columnType, uint8_t *buffer, int6 return reinterpret_cast(p); } -extern "C" CStatus GetBoolFromPayload(CPayloadReader payloadReader, bool **values, int *length) { +extern "C" +CStatus GetBoolFromPayload(CPayloadReader payloadReader, bool **values, int *length) { CStatus st; st.error_code = static_cast(ErrorCode::SUCCESS); st.error_msg = nullptr; @@ -409,25 +429,38 @@ CStatus GetValuesFromPayload(CPayloadReader payloadReader, DT **values, int *len return st; } -extern "C" CStatus GetInt8FromPayload(CPayloadReader payloadReader, int8_t **values, int *length) { +extern "C" +CStatus GetInt8FromPayload(CPayloadReader payloadReader, int8_t **values, int *length) { return GetValuesFromPayload(payloadReader, values, length); } -extern "C" CStatus GetInt16FromPayload(CPayloadReader payloadReader, int16_t **values, int *length) { + +extern "C" +CStatus GetInt16FromPayload(CPayloadReader payloadReader, int16_t **values, int *length) { return GetValuesFromPayload(payloadReader, values, length); } -extern "C" CStatus GetInt32FromPayload(CPayloadReader payloadReader, int32_t **values, int *length) { + +extern "C" +CStatus GetInt32FromPayload(CPayloadReader payloadReader, int32_t **values, int *length) { return GetValuesFromPayload(payloadReader, values, length); } -extern "C" CStatus GetInt64FromPayload(CPayloadReader payloadReader, int64_t **values, int *length) { + +extern "C" +CStatus GetInt64FromPayload(CPayloadReader payloadReader, int64_t **values, int *length) { return GetValuesFromPayload(payloadReader, values, length); } -extern "C" CStatus GetFloatFromPayload(CPayloadReader payloadReader, float **values, int *length) { + +extern "C" +CStatus GetFloatFromPayload(CPayloadReader payloadReader, float **values, int *length) { return GetValuesFromPayload(payloadReader, values, length); } -extern "C" CStatus GetDoubleFromPayload(CPayloadReader payloadReader, double **values, int *length) { + +extern "C" +CStatus GetDoubleFromPayload(CPayloadReader payloadReader, double **values, int *length) { return GetValuesFromPayload(payloadReader, values, length); } -extern "C" CStatus GetOneStringFromPayload(CPayloadReader payloadReader, int idx, char **cstr, int *str_size) { + +extern "C" +CStatus GetOneStringFromPayload(CPayloadReader payloadReader, int idx, char **cstr, int *str_size) { CStatus st; st.error_code = static_cast(ErrorCode::SUCCESS); st.error_msg = nullptr; @@ -448,10 +481,9 @@ extern "C" CStatus GetOneStringFromPayload(CPayloadReader payloadReader, int idx *str_size = length; return st; } -extern "C" CStatus GetBinaryVectorFromPayload(CPayloadReader payloadReader, - uint8_t **values, - int *dimension, - int *length) { + +extern "C" +CStatus GetBinaryVectorFromPayload(CPayloadReader payloadReader, uint8_t **values, int *dimension, int *length) { CStatus st; st.error_code = static_cast(ErrorCode::SUCCESS); st.error_msg = nullptr; @@ -467,10 +499,9 @@ extern "C" CStatus GetBinaryVectorFromPayload(CPayloadReader payloadReader, *values = (uint8_t *) array->raw_values(); return st; } -extern "C" CStatus GetFloatVectorFromPayload(CPayloadReader payloadReader, - float **values, - int *dimension, - int *length) { + +extern "C" +CStatus GetFloatVectorFromPayload(CPayloadReader payloadReader, float **values, int *dimension, int *length) { CStatus st; st.error_code = static_cast(ErrorCode::SUCCESS); st.error_msg = nullptr; @@ -487,13 +518,15 @@ extern "C" CStatus GetFloatVectorFromPayload(CPayloadReader payloadReader, return st; } -extern "C" int GetPayloadLengthFromReader(CPayloadReader payloadReader) { +extern "C" +int GetPayloadLengthFromReader(CPayloadReader payloadReader) { auto p = reinterpret_cast(payloadReader); if (p->array == nullptr) return 0; return p->array->length(); } -extern "C" CStatus ReleasePayloadReader(CPayloadReader payloadReader) { +extern "C" +CStatus ReleasePayloadReader(CPayloadReader payloadReader) { CStatus st; st.error_code = static_cast(ErrorCode::SUCCESS); st.error_msg = nullptr; diff --git a/internal/storage/cwrapper/ParquetWrapper.h b/internal/storage/cwrapper/ParquetWrapper.h index b6a3bc40db..d8266dcde4 100644 --- a/internal/storage/cwrapper/ParquetWrapper.h +++ b/internal/storage/cwrapper/ParquetWrapper.h @@ -18,8 +18,6 @@ extern "C" { #include #include -typedef void *CPayloadWriter; - typedef struct CBuffer { char *data; int length; @@ -30,6 +28,8 @@ typedef struct CStatus { const char *error_msg; } CStatus; +//============= payload writer ====================== +typedef void *CPayloadWriter; CPayloadWriter NewPayloadWriter(int columnType); CStatus AddBooleanToPayload(CPayloadWriter payloadWriter, bool *values, int length); CStatus AddInt8ToPayload(CPayloadWriter payloadWriter, int8_t *values, int length); @@ -48,7 +48,6 @@ int GetPayloadLengthFromWriter(CPayloadWriter payloadWriter); CStatus ReleasePayloadWriter(CPayloadWriter handler); //============= payload reader ====================== - typedef void *CPayloadReader; CPayloadReader NewPayloadReader(int columnType, uint8_t *buffer, int64_t buf_size); CStatus GetBoolFromPayload(CPayloadReader payloadReader, bool **values, int *length); diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index ea023d9e2c..bad980cc2a 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -13,7 +13,6 @@ package storage import ( "encoding/json" - "errors" "fmt" "sort" "strconv" @@ -150,7 +149,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique var writer *InsertBinlogWriter timeFieldData, ok := data.Data[rootcoord.TimeStampField] if !ok { - return nil, nil, errors.New("data doesn't contains timestamp field") + return nil, nil, fmt.Errorf("data doesn't contains timestamp field") } ts := timeFieldData.(*Int64FieldData).Data startTs := ts[0] @@ -172,8 +171,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique return nil, nil, err } - eventWriter.SetStartTimestamp(typeutil.Timestamp(startTs)) - eventWriter.SetEndTimestamp(typeutil.Timestamp(endTs)) + eventWriter.SetEventTimestamp(typeutil.Timestamp(startTs), typeutil.Timestamp(endTs)) switch field.DataType { case schemapb.DataType_Bool: err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data) @@ -206,8 +204,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { return nil, nil, err } - writer.SetStartTimeStamp(typeutil.Timestamp(startTs)) - writer.SetEndTimeStamp(typeutil.Timestamp(endTs)) + writer.SetEventTimeStamp(typeutil.Timestamp(startTs), typeutil.Timestamp(endTs)) err = writer.Close() if err != nil { @@ -242,9 +239,10 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique return blobs, statsBlobs, nil } + func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *InsertData, err error) { if len(blobs) == 0 { - return -1, -1, nil, errors.New("blobs is empty") + return -1, -1, nil, fmt.Errorf("blobs is empty") } readerClose := func(reader *BinlogReader) func() error { return func() error { return reader.Close() } @@ -507,10 +505,8 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ if err != nil { return nil, err } - eventWriter.SetStartTimestamp(ts[0]) - eventWriter.SetEndTimestamp(ts[len(ts)-1]) - writer.SetStartTimeStamp(ts[0]) - writer.SetEndTimeStamp(ts[len(ts)-1]) + eventWriter.SetEventTimestamp(ts[0], ts[len(ts)-1]) + writer.SetEventTimeStamp(ts[0], ts[len(ts)-1]) err = writer.Close() if err != nil { return nil, err @@ -537,45 +533,40 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ if err != nil { return nil, err } - eventWriter.SetStartTimestamp(ts[pos]) - eventWriter.SetEndTimestamp(ts[pos]) + eventWriter.SetEventTimestamp(ts[pos], ts[pos]) case DropCollectionEventType: eventWriter, err := writer.NextDropCollectionEventWriter() if err != nil { return nil, err } err = eventWriter.AddOneStringToPayload(req) - eventWriter.SetStartTimestamp(ts[pos]) - eventWriter.SetEndTimestamp(ts[pos]) if err != nil { return nil, err } + eventWriter.SetEventTimestamp(ts[pos], ts[pos]) case CreatePartitionEventType: eventWriter, err := writer.NextCreatePartitionEventWriter() if err != nil { return nil, err } err = eventWriter.AddOneStringToPayload(req) - eventWriter.SetStartTimestamp(ts[pos]) - eventWriter.SetEndTimestamp(ts[pos]) if err != nil { return nil, err } + eventWriter.SetEventTimestamp(ts[pos], ts[pos]) case DropPartitionEventType: eventWriter, err := writer.NextDropPartitionEventWriter() if err != nil { return nil, err } err = eventWriter.AddOneStringToPayload(req) - eventWriter.SetStartTimestamp(ts[pos]) - eventWriter.SetEndTimestamp(ts[pos]) if err != nil { return nil, err } + eventWriter.SetEventTimestamp(ts[pos], ts[pos]) } } - writer.SetStartTimeStamp(ts[0]) - writer.SetEndTimeStamp(ts[len(ts)-1]) + writer.SetEventTimeStamp(ts[0], ts[len(ts)-1]) err = writer.Close() if err != nil { return nil, err @@ -590,12 +581,11 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ }) return blobs, nil - } func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts []Timestamp, ddRequests []string, err error) { if len(blobs) == 0 { - return nil, nil, errors.New("blobs is empty") + return nil, nil, fmt.Errorf("blobs is empty") } readerClose := func(reader *BinlogReader) func() error { return func() error { return reader.Close() } @@ -707,7 +697,7 @@ func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, map[string]st break } if file == nil { - return nil, nil, "", -1, errors.New("can not find params blob") + return nil, nil, "", -1, fmt.Errorf("can not find params blob") } info := struct { Params map[string]string diff --git a/internal/storage/data_sorter.go b/internal/storage/data_sorter.go index 4e26848d19..be5e0c2a00 100644 --- a/internal/storage/data_sorter.go +++ b/internal/storage/data_sorter.go @@ -13,6 +13,7 @@ package storage import ( "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/milvus-io/milvus/internal/rootcoord" ) type DataSorter struct { @@ -20,17 +21,15 @@ type DataSorter struct { InsertData *InsertData } -func (ds *DataSorter) getIDField() FieldData { - for _, field := range ds.InsertCodec.Schema.Schema.Fields { - if field.FieldID == 0 { - return ds.InsertData.Data[field.FieldID] - } +func (ds *DataSorter) getRowIDFieldData() FieldData { + if data, ok := ds.InsertData.Data[rootcoord.RowIDField]; ok { + return data } return nil } func (ds *DataSorter) Len() int { - return len(ds.getIDField().(*Int64FieldData).Data) + return len(ds.getRowIDFieldData().(*Int64FieldData).Data) } func (ds *DataSorter) Swap(i, j int) { @@ -81,5 +80,6 @@ func (ds *DataSorter) Swap(i, j int) { } func (ds *DataSorter) Less(i, j int) bool { - return ds.getIDField().(*Int64FieldData).Data[i] < ds.getIDField().(*Int64FieldData).Data[j] + ids := ds.getRowIDFieldData().(*Int64FieldData).Data + return ids[i] < ids[j] } diff --git a/internal/storage/event_data.go b/internal/storage/event_data.go index 239888e5ce..92eaedaf85 100644 --- a/internal/storage/event_data.go +++ b/internal/storage/event_data.go @@ -13,9 +13,8 @@ package storage import ( "encoding/binary" - "io" - "errors" + "io" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -40,23 +39,23 @@ type DescriptorEventDataFixPart struct { PayloadDataType schemapb.DataType } -func (data *descriptorEventData) SetStartTimeStamp(ts typeutil.Timestamp) { - data.StartTimestamp = ts +func (data *descriptorEventData) SetEventTimeStamp(start typeutil.Timestamp, end typeutil.Timestamp) { + data.StartTimestamp = start + data.EndTimestamp = end } -func (data *descriptorEventData) SetEndTimeStamp(ts typeutil.Timestamp) { - data.EndTimestamp = ts +func (data *descriptorEventData) GetEventDataFixPartSize() int32 { + return int32(binary.Size(data.DescriptorEventDataFixPart)) } func (data *descriptorEventData) GetMemoryUsageInBytes() int32 { - return int32(binary.Size(data.DescriptorEventDataFixPart) + binary.Size(data.PostHeaderLengths)) + return data.GetEventDataFixPartSize() + int32(binary.Size(data.PostHeaderLengths)) } func (data *descriptorEventData) Write(buffer io.Writer) error { if err := binary.Write(buffer, binary.LittleEndian, data.DescriptorEventDataFixPart); err != nil { return err } - if err := binary.Write(buffer, binary.LittleEndian, data.PostHeaderLengths); err != nil { return err } @@ -65,15 +64,12 @@ func (data *descriptorEventData) Write(buffer io.Writer) error { func readDescriptorEventData(buffer io.Reader) (*descriptorEventData, error) { event := newDescriptorEventData() - if err := binary.Read(buffer, binary.LittleEndian, &event.DescriptorEventDataFixPart); err != nil { return nil, err } - if err := binary.Read(buffer, binary.LittleEndian, &event.PostHeaderLengths); err != nil { return nil, err } - return event, nil } @@ -89,12 +85,9 @@ type insertEventData struct { EndTimestamp typeutil.Timestamp } -func (data *insertEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { - data.StartTimestamp = timestamp -} - -func (data *insertEventData) SetEndTimestamp(timestamp typeutil.Timestamp) { - data.EndTimestamp = timestamp +func (data *insertEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) { + data.StartTimestamp = start + data.EndTimestamp = end } func (data *insertEventData) GetEventDataFixPartSize() int32 { @@ -116,12 +109,9 @@ type deleteEventData struct { EndTimestamp typeutil.Timestamp } -func (data *deleteEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { - data.StartTimestamp = timestamp -} - -func (data *deleteEventData) SetEndTimestamp(timestamp typeutil.Timestamp) { - data.EndTimestamp = timestamp +func (data *deleteEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) { + data.StartTimestamp = start + data.EndTimestamp = end } func (data *deleteEventData) GetEventDataFixPartSize() int32 { @@ -143,12 +133,9 @@ type createCollectionEventData struct { EndTimestamp typeutil.Timestamp } -func (data *createCollectionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { - data.StartTimestamp = timestamp -} - -func (data *createCollectionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) { - data.EndTimestamp = timestamp +func (data *createCollectionEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) { + data.StartTimestamp = start + data.EndTimestamp = end } func (data *createCollectionEventData) GetEventDataFixPartSize() int32 { @@ -170,12 +157,9 @@ type dropCollectionEventData struct { EndTimestamp typeutil.Timestamp } -func (data *dropCollectionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { - data.StartTimestamp = timestamp -} - -func (data *dropCollectionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) { - data.EndTimestamp = timestamp +func (data *dropCollectionEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) { + data.StartTimestamp = start + data.EndTimestamp = end } func (data *dropCollectionEventData) GetEventDataFixPartSize() int32 { @@ -197,12 +181,9 @@ type createPartitionEventData struct { EndTimestamp typeutil.Timestamp } -func (data *createPartitionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { - data.StartTimestamp = timestamp -} - -func (data *createPartitionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) { - data.EndTimestamp = timestamp +func (data *createPartitionEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) { + data.StartTimestamp = start + data.EndTimestamp = end } func (data *createPartitionEventData) GetEventDataFixPartSize() int32 { @@ -224,12 +205,9 @@ type dropPartitionEventData struct { EndTimestamp typeutil.Timestamp } -func (data *dropPartitionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) { - data.StartTimestamp = timestamp -} - -func (data *dropPartitionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) { - data.EndTimestamp = timestamp +func (data *dropPartitionEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) { + data.StartTimestamp = start + data.EndTimestamp = end } func (data *dropPartitionEventData) GetEventDataFixPartSize() int32 { @@ -249,7 +227,7 @@ func (data *dropPartitionEventData) WriteEventData(buffer io.Writer) error { func getEventFixPartSize(code EventTypeCode) int32 { switch code { case DescriptorEventType: - return int32(binary.Size(descriptorEventData{}.DescriptorEventDataFixPart)) + return (&descriptorEventData{}).GetEventDataFixPartSize() case InsertEventType: return (&insertEventData{}).GetEventDataFixPartSize() case DeleteEventType: diff --git a/internal/storage/event_reader.go b/internal/storage/event_reader.go index d1a5efe553..bd7df0832e 100644 --- a/internal/storage/event_reader.go +++ b/internal/storage/event_reader.go @@ -13,7 +13,6 @@ package storage import ( "bytes" - "errors" "fmt" "github.com/milvus-io/milvus/internal/proto/schemapb" @@ -27,28 +26,21 @@ type EventReader struct { isClosed bool } -func (reader *EventReader) checkClose() error { +func (reader *EventReader) readHeader() error { if reader.isClosed { - return errors.New("event reader is closed") - } - return nil -} - -func (reader *EventReader) readHeader() (*eventHeader, error) { - if err := reader.checkClose(); err != nil { - return nil, err + return fmt.Errorf("event reader is closed") } header, err := readEventHeader(reader.buffer) if err != nil { - return nil, err + return err } reader.eventHeader = *header - return &reader.eventHeader, nil + return nil } -func (reader *EventReader) readData() (eventData, error) { - if err := reader.checkClose(); err != nil { - return nil, err +func (reader *EventReader) readData() error { + if reader.isClosed { + return fmt.Errorf("event reader is closed") } var data eventData var err error @@ -66,15 +58,14 @@ func (reader *EventReader) readData() (eventData, error) { case DropPartitionEventType: data, err = readDropPartitionEventDataFixPart(reader.buffer) default: - return nil, fmt.Errorf("unknown header type code: %d", reader.TypeCode) + return fmt.Errorf("unknown header type code: %d", reader.TypeCode) } - if err != nil { - return nil, err + return err } reader.eventData = data - return reader.eventData, nil + return nil } func (reader *EventReader) Close() error { @@ -84,6 +75,7 @@ func (reader *EventReader) Close() error { } return nil } + func newEventReader(datatype schemapb.DataType, buffer *bytes.Buffer) (*EventReader, error) { reader := &EventReader{ eventHeader: eventHeader{ @@ -93,15 +85,15 @@ func newEventReader(datatype schemapb.DataType, buffer *bytes.Buffer) (*EventRea isClosed: false, } - if _, err := reader.readHeader(); err != nil { + if err := reader.readHeader(); err != nil { + return nil, err + } + if err := reader.readData(); err != nil { return nil, err } - if _, err := reader.readData(); err != nil { - return nil, err - } - - payloadBuffer := buffer.Next(int(reader.EventLength - reader.eventHeader.GetMemoryUsageInBytes() - reader.GetEventDataFixPartSize())) + next := int(reader.EventLength - reader.eventHeader.GetMemoryUsageInBytes() - reader.GetEventDataFixPartSize()) + payloadBuffer := buffer.Next(next) payloadReader, err := NewPayloadReader(datatype, payloadBuffer) if err != nil { return nil, err diff --git a/internal/storage/event_test.go b/internal/storage/event_test.go index 1b39679836..d685e51b27 100644 --- a/internal/storage/event_test.go +++ b/internal/storage/event_test.go @@ -177,8 +177,7 @@ func TestInsertEvent(t *testing.T) { ) { w, err := newInsertEventWriter(dt) assert.Nil(t, err) - w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) - w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) + w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0)) err = ir1(w) assert.Nil(t, err) err = iw(w) @@ -350,8 +349,7 @@ func TestInsertEvent(t *testing.T) { t.Run("insert_string", func(t *testing.T) { w, err := newInsertEventWriter(schemapb.DataType_String) assert.Nil(t, err) - w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) - w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) + w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0)) err = w.AddDataToPayload("1234") assert.Nil(t, err) err = w.AddOneStringToPayload("567890") @@ -426,8 +424,7 @@ func TestDeleteEvent(t *testing.T) { ) { w, err := newDeleteEventWriter(dt) assert.Nil(t, err) - w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) - w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) + w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0)) err = ir1(w) assert.Nil(t, err) err = iw(w) @@ -599,8 +596,7 @@ func TestDeleteEvent(t *testing.T) { t.Run("delete_string", func(t *testing.T) { w, err := newDeleteEventWriter(schemapb.DataType_String) assert.Nil(t, err) - w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) - w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) + w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0)) err = w.AddDataToPayload("1234") assert.Nil(t, err) err = w.AddOneStringToPayload("567890") @@ -675,8 +671,7 @@ func TestCreateCollectionEvent(t *testing.T) { t.Run("create_collection_timestamp", func(t *testing.T) { w, err := newCreateCollectionEventWriter(schemapb.DataType_Int64) assert.Nil(t, err) - w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) - w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) + w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0)) err = w.AddDataToPayload([]int64{1, 2, 3}) assert.Nil(t, err) err = w.AddDataToPayload([]int{4, 5, 6}) @@ -722,8 +717,7 @@ func TestCreateCollectionEvent(t *testing.T) { t.Run("create_collection_string", func(t *testing.T) { w, err := newCreateCollectionEventWriter(schemapb.DataType_String) assert.Nil(t, err) - w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) - w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) + w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0)) err = w.AddDataToPayload("1234") assert.Nil(t, err) err = w.AddOneStringToPayload("567890") @@ -798,8 +792,7 @@ func TestDropCollectionEvent(t *testing.T) { t.Run("drop_collection_timestamp", func(t *testing.T) { w, err := newDropCollectionEventWriter(schemapb.DataType_Int64) assert.Nil(t, err) - w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) - w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) + w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0)) err = w.AddDataToPayload([]int64{1, 2, 3}) assert.Nil(t, err) err = w.AddDataToPayload([]int{4, 5, 6}) @@ -845,8 +838,7 @@ func TestDropCollectionEvent(t *testing.T) { t.Run("drop_collection_string", func(t *testing.T) { w, err := newDropCollectionEventWriter(schemapb.DataType_String) assert.Nil(t, err) - w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) - w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) + w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0)) err = w.AddDataToPayload("1234") assert.Nil(t, err) err = w.AddOneStringToPayload("567890") @@ -921,8 +913,7 @@ func TestCreatePartitionEvent(t *testing.T) { t.Run("create_partition_timestamp", func(t *testing.T) { w, err := newCreatePartitionEventWriter(schemapb.DataType_Int64) assert.Nil(t, err) - w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) - w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) + w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0)) err = w.AddDataToPayload([]int64{1, 2, 3}) assert.Nil(t, err) err = w.AddDataToPayload([]int{4, 5, 6}) @@ -968,8 +959,7 @@ func TestCreatePartitionEvent(t *testing.T) { t.Run("create_partition_string", func(t *testing.T) { w, err := newCreatePartitionEventWriter(schemapb.DataType_String) assert.Nil(t, err) - w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) - w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) + w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0)) err = w.AddDataToPayload("1234") assert.Nil(t, err) err = w.AddOneStringToPayload("567890") @@ -1044,8 +1034,7 @@ func TestDropPartitionEvent(t *testing.T) { t.Run("drop_partition_timestamp", func(t *testing.T) { w, err := newDropPartitionEventWriter(schemapb.DataType_Int64) assert.Nil(t, err) - w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) - w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) + w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0)) err = w.AddDataToPayload([]int64{1, 2, 3}) assert.Nil(t, err) err = w.AddDataToPayload([]int{4, 5, 6}) @@ -1091,8 +1080,7 @@ func TestDropPartitionEvent(t *testing.T) { t.Run("drop_partition_string", func(t *testing.T) { w, err := newDropPartitionEventWriter(schemapb.DataType_String) assert.Nil(t, err) - w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) - w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) + w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0)) err = w.AddDataToPayload("1234") assert.Nil(t, err) err = w.AddOneStringToPayload("567890") @@ -1302,8 +1290,7 @@ func TestEventReaderError(t *testing.T) { func TestEventClose(t *testing.T) { w, err := newInsertEventWriter(schemapb.DataType_String) assert.Nil(t, err) - w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) - w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) + w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0)) err = w.AddDataToPayload("1234") assert.Nil(t, err) err = w.Finish() @@ -1324,8 +1311,8 @@ func TestEventClose(t *testing.T) { err = r.Close() assert.Nil(t, err) - _, err = r.readHeader() + err = r.readHeader() assert.NotNil(t, err) - _, err = r.readData() + err = r.readData() assert.NotNil(t, err) } diff --git a/internal/storage/event_writer.go b/internal/storage/event_writer.go index 7814c4789f..ee42c9bad5 100644 --- a/internal/storage/event_writer.go +++ b/internal/storage/event_writer.go @@ -14,9 +14,8 @@ package storage import ( "bytes" "encoding/binary" - "io" - "errors" + "io" "github.com/milvus-io/milvus/internal/proto/schemapb" ) @@ -35,12 +34,19 @@ const ( ) func (code EventTypeCode) String() string { - codes := []string{"DescriptorEventType", "InsertEventType", "DeleteEventType", "CreateCollectionEventType", "DropCollectionEventType", - "CreatePartitionEventType", "DropPartitionEventType"} - if len(codes) < int(code) { - return "" + codes := map[EventTypeCode]string{ + DescriptorEventType: "DescriptorEventType", + InsertEventType: "InsertEventType", + DeleteEventType: "DeleteEventType", + CreateCollectionEventType: "CreateCollectionEventType", + DropCollectionEventType: "DropCollectionEventType", + CreatePartitionEventType: "CreatePartitionEventType", + DropPartitionEventType: "DropPartitionEventType", } - return codes[code] + if eventTypeStr, ok := codes[code]; ok { + return eventTypeStr + } + return "InvalidEventType" } type descriptorEvent struct { @@ -104,8 +110,8 @@ func (writer *baseEventWriter) GetMemoryUsageInBytes() (int32, error) { if err != nil { return -1, err } - return writer.getEventDataSize() + writer.eventHeader.GetMemoryUsageInBytes() + - int32(len(data)), nil + size := writer.getEventDataSize() + writer.eventHeader.GetMemoryUsageInBytes() + int32(len(data)) + return size, nil } func (writer *baseEventWriter) Write(buffer *bytes.Buffer) error { @@ -115,7 +121,6 @@ func (writer *baseEventWriter) Write(buffer *bytes.Buffer) error { if err := writer.writeEventData(buffer); err != nil { return err } - data, err := writer.GetPayloadBufferFromWriter() if err != nil { return err @@ -138,7 +143,6 @@ func (writer *baseEventWriter) Finish() error { } writer.EventLength = eventLength writer.NextPosition = eventLength + writer.offset - } return nil } @@ -229,6 +233,7 @@ func newDeleteEventWriter(dataType schemapb.DataType) (*deleteEventWriter, error } header := newEventHeader(DeleteEventType) data := newDeleteEventData() + writer := &deleteEventWriter{ baseEventWriter: baseEventWriter{ eventHeader: *header, @@ -242,6 +247,7 @@ func newDeleteEventWriter(dataType schemapb.DataType) (*deleteEventWriter, error writer.baseEventWriter.writeEventData = writer.deleteEventData.WriteEventData return writer, nil } + func newCreateCollectionEventWriter(dataType schemapb.DataType) (*createCollectionEventWriter, error) { if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 { return nil, errors.New("incorrect data type") @@ -267,6 +273,7 @@ func newCreateCollectionEventWriter(dataType schemapb.DataType) (*createCollecti writer.baseEventWriter.writeEventData = writer.createCollectionEventData.WriteEventData return writer, nil } + func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEventWriter, error) { if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 { return nil, errors.New("incorrect data type") @@ -278,6 +285,7 @@ func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEv } header := newEventHeader(DropCollectionEventType) data := newDropCollectionEventData() + writer := &dropCollectionEventWriter{ baseEventWriter: baseEventWriter{ eventHeader: *header, @@ -291,6 +299,7 @@ func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEv writer.baseEventWriter.writeEventData = writer.dropCollectionEventData.WriteEventData return writer, nil } + func newCreatePartitionEventWriter(dataType schemapb.DataType) (*createPartitionEventWriter, error) { if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 { return nil, errors.New("incorrect data type") @@ -316,6 +325,7 @@ func newCreatePartitionEventWriter(dataType schemapb.DataType) (*createPartition writer.baseEventWriter.writeEventData = writer.createPartitionEventData.WriteEventData return writer, nil } + func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEventWriter, error) { if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 { return nil, errors.New("incorrect data type") @@ -327,6 +337,7 @@ func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEven } header := newEventHeader(DropPartitionEventType) data := newDropPartitionEventData() + writer := &dropPartitionEventWriter{ baseEventWriter: baseEventWriter{ eventHeader: *header, diff --git a/internal/storage/event_writer_test.go b/internal/storage/event_writer_test.go index 48cfb025ff..90f289794e 100644 --- a/internal/storage/event_writer_test.go +++ b/internal/storage/event_writer_test.go @@ -66,8 +66,7 @@ func TestEventWriter(t *testing.T) { err = insertEvent.AddInt32ToPayload([]int32{1}) assert.NotNil(t, err) buffer := new(bytes.Buffer) - insertEvent.SetStartTimestamp(100) - insertEvent.SetEndTimestamp(200) + insertEvent.SetEventTimestamp(100, 200) err = insertEvent.Write(buffer) assert.Nil(t, err) length, err = insertEvent.GetMemoryUsageInBytes() diff --git a/internal/storage/payload.go b/internal/storage/payload.go index 8c18a245a4..3a43d21802 100644 --- a/internal/storage/payload.go +++ b/internal/storage/payload.go @@ -20,9 +20,8 @@ package storage */ import "C" import ( - "unsafe" - "errors" + "unsafe" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/schemapb" @@ -63,6 +62,7 @@ type PayloadReaderInterface interface { ReleasePayloadReader() error Close() error } + type PayloadWriter struct { payloadWriterPtr C.CPayloadWriter colType schemapb.DataType @@ -91,49 +91,42 @@ func (w *PayloadWriter) AddDataToPayload(msgs interface{}, dim ...int) error { return errors.New("incorrect data type") } return w.AddBoolToPayload(val) - case schemapb.DataType_Int8: val, ok := msgs.([]int8) if !ok { return errors.New("incorrect data type") } return w.AddInt8ToPayload(val) - case schemapb.DataType_Int16: val, ok := msgs.([]int16) if !ok { return errors.New("incorrect data type") } return w.AddInt16ToPayload(val) - case schemapb.DataType_Int32: val, ok := msgs.([]int32) if !ok { return errors.New("incorrect data type") } return w.AddInt32ToPayload(val) - case schemapb.DataType_Int64: val, ok := msgs.([]int64) if !ok { return errors.New("incorrect data type") } return w.AddInt64ToPayload(val) - case schemapb.DataType_Float: val, ok := msgs.([]float32) if !ok { return errors.New("incorrect data type") } return w.AddFloatToPayload(val) - case schemapb.DataType_Double: val, ok := msgs.([]float64) if !ok { return errors.New("incorrect data type") } return w.AddDoubleToPayload(val) - case schemapb.DataType_String: val, ok := msgs.(string) if !ok { @@ -151,7 +144,6 @@ func (w *PayloadWriter) AddDataToPayload(msgs interface{}, dim ...int) error { return errors.New("incorrect data type") } return w.AddBinaryVectorToPayload(val, dim[0]) - case schemapb.DataType_FloatVector: val, ok := msgs.([]float32) if !ok { @@ -161,10 +153,8 @@ func (w *PayloadWriter) AddDataToPayload(msgs interface{}, dim ...int) error { default: return errors.New("incorrect datatype") } - default: return errors.New("incorrect input numbers") - } } @@ -334,7 +324,6 @@ func (w *PayloadWriter) AddBinaryVectorToPayload(binVec []byte, dim int) error { if length <= 0 { return errors.New("can't add empty binVec into payload") } - if dim <= 0 { return errors.New("dimension should be greater than 0") } @@ -359,16 +348,15 @@ func (w *PayloadWriter) AddFloatVectorToPayload(floatVec []float32, dim int) err if length <= 0 { return errors.New("can't add empty floatVec into payload") } - if dim <= 0 { return errors.New("dimension should be greater than 0") } - cBinVec := (*C.float)(&floatVec[0]) + cVec := (*C.float)(&floatVec[0]) cDim := C.int(dim) cLength := C.int(length / dim) - st := C.AddFloatVectorToPayload(w.payloadWriterPtr, cBinVec, cDim, cLength) + st := C.AddFloatVectorToPayload(w.payloadWriterPtr, cVec, cDim, cLength) errCode := commonpb.ErrorCode(st.error_code) if errCode != commonpb.ErrorCode_Success { msg := C.GoString(st.error_msg) @@ -446,45 +434,37 @@ func (r *PayloadReader) GetDataFromPayload(idx ...int) (interface{}, int, error) val, err := r.GetOneStringFromPayload(idx[0]) return val, 0, err default: - return nil, 0, errors.New("Unknown type") + return nil, 0, errors.New("unknown type") } case 0: switch r.colType { case schemapb.DataType_Bool: val, err := r.GetBoolFromPayload() return val, 0, err - case schemapb.DataType_Int8: val, err := r.GetInt8FromPayload() return val, 0, err - case schemapb.DataType_Int16: val, err := r.GetInt16FromPayload() return val, 0, err - case schemapb.DataType_Int32: val, err := r.GetInt32FromPayload() return val, 0, err - case schemapb.DataType_Int64: val, err := r.GetInt64FromPayload() return val, 0, err - case schemapb.DataType_Float: val, err := r.GetFloatFromPayload() return val, 0, err - case schemapb.DataType_Double: val, err := r.GetDoubleFromPayload() return val, 0, err - case schemapb.DataType_BinaryVector: return r.GetBinaryVectorFromPayload() - case schemapb.DataType_FloatVector: return r.GetFloatVectorFromPayload() default: - return nil, 0, errors.New("Unknown type") + return nil, 0, errors.New("unknown type") } default: return nil, 0, errors.New("incorrect number of index") diff --git a/internal/storage/payload_test.go b/internal/storage/payload_test.go index 14f91ca004..ce33162855 100644 --- a/internal/storage/payload_test.go +++ b/internal/storage/payload_test.go @@ -570,7 +570,7 @@ func TestPayload_ReaderandWriter(t *testing.T) { err = w.AddDoubleToPayload([]float64{0.0}) assert.NotNil(t, err) }) - t.Run("TestAddStringAfterFinish", func(t *testing.T) { + t.Run("TestAddOneStringAfterFinish", func(t *testing.T) { w, err := NewPayloadWriter(schemapb.DataType_String) require.Nil(t, err) require.NotNil(t, w) @@ -819,7 +819,7 @@ func TestPayload_ReaderandWriter(t *testing.T) { _, err = r.GetDoubleFromPayload() assert.NotNil(t, err) }) - t.Run("TestGetStringError", func(t *testing.T) { + t.Run("TestGetOneStringError", func(t *testing.T) { w, err := NewPayloadWriter(schemapb.DataType_Bool) require.Nil(t, err) require.NotNil(t, w) diff --git a/internal/storage/print_binglog_test.go b/internal/storage/print_binlog_test.go similarity index 96% rename from internal/storage/print_binglog_test.go rename to internal/storage/print_binlog_test.go index bf5b07d93f..800913c431 100644 --- a/internal/storage/print_binglog_test.go +++ b/internal/storage/print_binlog_test.go @@ -39,8 +39,7 @@ func TestPrintBinlogFilesInt64(t *testing.T) { assert.NotNil(t, err) err = e1.AddDataToPayload([]int64{4, 5, 6}) assert.Nil(t, err) - e1.SetStartTimestamp(tsoutil.ComposeTS(curTS+10*60*1000, 0)) - e1.SetEndTimestamp(tsoutil.ComposeTS(curTS+20*60*1000, 0)) + e1.SetEventTimestamp(tsoutil.ComposeTS(curTS+10*60*1000, 0), tsoutil.ComposeTS(curTS+20*60*1000, 0)) e2, err := w.NextInsertEventWriter() assert.Nil(t, err) @@ -50,11 +49,9 @@ func TestPrintBinlogFilesInt64(t *testing.T) { assert.NotNil(t, err) err = e2.AddDataToPayload([]int64{10, 11, 12}) assert.Nil(t, err) - e2.SetStartTimestamp(tsoutil.ComposeTS(curTS+30*60*1000, 0)) - e2.SetEndTimestamp(tsoutil.ComposeTS(curTS+40*60*1000, 0)) + e2.SetEventTimestamp(tsoutil.ComposeTS(curTS+30*60*1000, 0), tsoutil.ComposeTS(curTS+40*60*1000, 0)) - w.SetStartTimeStamp(tsoutil.ComposeTS(curTS, 0)) - w.SetEndTimeStamp(tsoutil.ComposeTS(curTS+3600*1000, 0)) + w.SetEventTimeStamp(tsoutil.ComposeTS(curTS, 0), tsoutil.ComposeTS(curTS+3600*1000, 0)) _, err = w.GetBuffer() assert.NotNil(t, err)