From 69794fd32d3bd75f0c5c81473ed5fc2042aecd83 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 9 Sep 2021 15:00:00 +0800 Subject: [PATCH] Refactor insertBufferNode and add unit tests (#7621) Signed-off-by: Congqi Xia --- .../datanode/flow_graph_insert_buffer_node.go | 536 +++++++++--------- .../flow_graph_insert_buffer_node_test.go | 138 ++++- 2 files changed, 403 insertions(+), 271 deletions(-) diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index fd3b0becb8..dfe8bd6e45 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -15,6 +15,7 @@ import ( "bytes" "context" "encoding/binary" + "errors" "fmt" "path" "strconv" @@ -217,278 +218,10 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { // iMsg is insertMsg // 1. iMsg -> buffer for _, msg := range iMsg.insertMessages { - if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) { - log.Error("misaligned messages detected") - continue - } - currentSegID := msg.GetSegmentID() - collectionID := msg.GetCollectionID() - - idata, ok := ibNode.insertBuffer.insertData[currentSegID] - if !ok { - idata = &InsertData{ - Data: make(map[UniqueID]storage.FieldData), - } - } - - // 1.1 Get Collection Schema - collSchema, err := ibNode.replica.getCollectionSchema(collectionID, msg.EndTs()) + err := ibNode.bufferInsertMsg(iMsg, msg) if err != nil { - // GOOSE TODO add error handler - log.Error("Get schema wrong:", zap.Error(err)) - continue + log.Warn("msg to buffer failed", zap.Error(err)) } - - // 1.2 Get Fields - var pos int = 0 // Record position of blob - var fieldIDs []int64 - var fieldTypes []schemapb.DataType - for _, field := range collSchema.Fields { - fieldIDs = append(fieldIDs, field.FieldID) - fieldTypes = append(fieldTypes, field.DataType) - } - - for _, field := range collSchema.Fields { - switch field.DataType { - case schemapb.DataType_FloatVector: - var dim int - for _, t := range field.TypeParams { - if t.Key == "dim" { - dim, err = strconv.Atoi(t.Value) - if err != nil { - log.Error("strconv wrong on get dim", zap.Error(err)) - } - break - } - } - if dim <= 0 { - log.Error("invalid dim") - continue - // TODO: add error handling - } - - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.FloatVectorFieldData{ - NumRows: make([]int64, 0, 1), - Data: make([]float32, 0), - Dim: dim, - } - } - - fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData) - - var offset int - for _, blob := range msg.RowData { - offset = 0 - for j := 0; j < dim; j++ { - var v float32 - buf := bytes.NewBuffer(blob.GetValue()[pos+offset:]) - if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { - log.Error("binary.read float32 wrong", zap.Error(err)) - } - fieldData.Data = append(fieldData.Data, v) - offset += int(unsafe.Sizeof(*(&v))) - } - } - pos += offset - fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) - - case schemapb.DataType_BinaryVector: - var dim int - for _, t := range field.TypeParams { - if t.Key == "dim" { - dim, err = strconv.Atoi(t.Value) - if err != nil { - log.Error("strconv wrong") - } - break - } - } - if dim <= 0 { - log.Error("invalid dim") - // TODO: add error handling - } - - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{ - NumRows: make([]int64, 0, 1), - Data: make([]byte, 0), - Dim: dim, - } - } - fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData) - - var offset int - for _, blob := range msg.RowData { - bv := blob.GetValue()[pos : pos+(dim/8)] - fieldData.Data = append(fieldData.Data, bv...) - offset = len(bv) - } - pos += offset - fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) - - case schemapb.DataType_Bool: - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.BoolFieldData{ - NumRows: make([]int64, 0, 1), - Data: make([]bool, 0), - } - } - - fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData) - var v bool - for _, blob := range msg.RowData { - buf := bytes.NewReader(blob.GetValue()[pos:]) - if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { - log.Error("binary.Read bool wrong", zap.Error(err)) - } - fieldData.Data = append(fieldData.Data, v) - - } - pos += int(unsafe.Sizeof(*(&v))) - fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) - - case schemapb.DataType_Int8: - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.Int8FieldData{ - NumRows: make([]int64, 0, 1), - Data: make([]int8, 0), - } - } - - fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData) - var v int8 - for _, blob := range msg.RowData { - buf := bytes.NewReader(blob.GetValue()[pos:]) - if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { - log.Error("binary.Read int8 wrong", zap.Error(err)) - } - fieldData.Data = append(fieldData.Data, v) - } - pos += int(unsafe.Sizeof(*(&v))) - fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) - - case schemapb.DataType_Int16: - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.Int16FieldData{ - NumRows: make([]int64, 0, 1), - Data: make([]int16, 0), - } - } - - fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData) - var v int16 - for _, blob := range msg.RowData { - buf := bytes.NewReader(blob.GetValue()[pos:]) - if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { - log.Error("binary.Read int16 wrong", zap.Error(err)) - } - fieldData.Data = append(fieldData.Data, v) - } - pos += int(unsafe.Sizeof(*(&v))) - fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) - - case schemapb.DataType_Int32: - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.Int32FieldData{ - NumRows: make([]int64, 0, 1), - Data: make([]int32, 0), - } - } - - fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData) - var v int32 - for _, blob := range msg.RowData { - buf := bytes.NewReader(blob.GetValue()[pos:]) - if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { - log.Error("binary.Read int64 wrong", zap.Error(err)) - } - fieldData.Data = append(fieldData.Data, v) - } - pos += int(unsafe.Sizeof(*(&v))) - fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) - - case schemapb.DataType_Int64: - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.Int64FieldData{ - NumRows: make([]int64, 0, 1), - Data: make([]int64, 0), - } - } - - fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData) - switch field.FieldID { - case 0: // rowIDs - fieldData.Data = append(fieldData.Data, msg.RowIDs...) - fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) - case 1: // Timestamps - for _, ts := range msg.Timestamps { - fieldData.Data = append(fieldData.Data, int64(ts)) - } - fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) - default: - var v int64 - for _, blob := range msg.RowData { - buf := bytes.NewBuffer(blob.GetValue()[pos:]) - if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { - log.Error("binary.Read int64 wrong", zap.Error(err)) - } - fieldData.Data = append(fieldData.Data, v) - } - pos += int(unsafe.Sizeof(*(&v))) - fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) - } - - case schemapb.DataType_Float: - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.FloatFieldData{ - NumRows: make([]int64, 0, 1), - Data: make([]float32, 0), - } - } - - fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData) - var v float32 - for _, blob := range msg.RowData { - buf := bytes.NewBuffer(blob.GetValue()[pos:]) - if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { - log.Error("binary.Read float32 wrong", zap.Error(err)) - } - fieldData.Data = append(fieldData.Data, v) - } - pos += int(unsafe.Sizeof(*(&v))) - fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) - - case schemapb.DataType_Double: - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.DoubleFieldData{ - NumRows: make([]int64, 0, 1), - Data: make([]float64, 0), - } - } - - fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData) - var v float64 - for _, blob := range msg.RowData { - buf := bytes.NewBuffer(blob.GetValue()[pos:]) - if err := binary.Read(buf, binary.LittleEndian, &v); err != nil { - log.Error("binary.Read float64 wrong", zap.Error(err)) - } - fieldData.Data = append(fieldData.Data, v) - } - - pos += int(unsafe.Sizeof(*(&v))) - fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) - } - } - - // 1.3 store in buffer - ibNode.insertBuffer.insertData[currentSegID] = idata - - // store current endPositions as Segment->EndPostion - ibNode.replica.updateSegmentEndPosition(currentSegID, iMsg.endPositions[0]) - // update segment pk filter - ibNode.replica.updateSegmentPKRange(currentSegID, msg.GetRowIDs()) } if len(iMsg.insertMessages) > 0 { @@ -627,6 +360,269 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { return nil } +// bufferInsertMsg put InsertMsg into buffer +// 1.1 fetch related schema from replica +// 1.2 Get buffer data and put data into each field buffer +// 1.3 Put back into buffer +// 1.4 Update related statistics +func (ibNode *insertBufferNode) bufferInsertMsg(iMsg *insertMsg, msg *msgstream.InsertMsg) error { + if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) { + return errors.New("misaligned messages detected") + } + currentSegID := msg.GetSegmentID() + collectionID := msg.GetCollectionID() + + idata, ok := ibNode.insertBuffer.insertData[currentSegID] + if !ok { + idata = &InsertData{ + Data: make(map[UniqueID]storage.FieldData), + } + } + + // 1.1 Get Collection Schema + collSchema, err := ibNode.replica.getCollectionSchema(collectionID, msg.EndTs()) + if err != nil { + // GOOSE TODO add error handler + log.Error("Get schema wrong:", zap.Error(err)) + return err + } + + // 1.2 Get Fields + var pos int = 0 // Record position of blob + var fieldIDs []int64 + var fieldTypes []schemapb.DataType + for _, field := range collSchema.Fields { + fieldIDs = append(fieldIDs, field.FieldID) + fieldTypes = append(fieldTypes, field.DataType) + } + + for _, field := range collSchema.Fields { + switch field.DataType { + case schemapb.DataType_FloatVector: + var dim int + for _, t := range field.TypeParams { + if t.Key == "dim" { + dim, err = strconv.Atoi(t.Value) + if err != nil { + log.Error("strconv wrong on get dim", zap.Error(err)) + } + break + } + } + if dim <= 0 { + log.Error("invalid dim") + continue + // TODO: add error handling + } + + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.FloatVectorFieldData{ + NumRows: make([]int64, 0, 1), + Data: make([]float32, 0), + Dim: dim, + } + } + + fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData) + + var offset int + for _, blob := range msg.RowData { + offset = 0 + for j := 0; j < dim; j++ { + var v float32 + readBinary(blob.GetValue()[pos+offset:], &v, field.DataType) + fieldData.Data = append(fieldData.Data, v) + offset += int(unsafe.Sizeof(*(&v))) + } + } + pos += offset + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) + + case schemapb.DataType_BinaryVector: + var dim int + for _, t := range field.TypeParams { + if t.Key == "dim" { + dim, err = strconv.Atoi(t.Value) + if err != nil { + log.Error("strconv wrong") + } + break + } + } + if dim <= 0 { + log.Error("invalid dim") + // TODO: add error handling + } + + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{ + NumRows: make([]int64, 0, 1), + Data: make([]byte, 0), + Dim: dim, + } + } + fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData) + + var offset int + for _, blob := range msg.RowData { + bv := blob.GetValue()[pos : pos+(dim/8)] + fieldData.Data = append(fieldData.Data, bv...) + offset = len(bv) + } + pos += offset + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) + + case schemapb.DataType_Bool: + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.BoolFieldData{ + NumRows: make([]int64, 0, 1), + Data: make([]bool, 0), + } + } + + fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData) + var v bool + for _, blob := range msg.RowData { + readBinary(blob.GetValue()[pos:], &v, field.DataType) + fieldData.Data = append(fieldData.Data, v) + } + pos += int(unsafe.Sizeof(*(&v))) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) + + case schemapb.DataType_Int8: + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.Int8FieldData{ + NumRows: make([]int64, 0, 1), + Data: make([]int8, 0), + } + } + + fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData) + var v int8 + for _, blob := range msg.RowData { + readBinary(blob.GetValue()[pos:], &v, field.DataType) + fieldData.Data = append(fieldData.Data, v) + } + pos += int(unsafe.Sizeof(*(&v))) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) + + case schemapb.DataType_Int16: + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.Int16FieldData{ + NumRows: make([]int64, 0, 1), + Data: make([]int16, 0), + } + } + + fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData) + var v int16 + for _, blob := range msg.RowData { + readBinary(blob.GetValue()[pos:], &v, field.DataType) + fieldData.Data = append(fieldData.Data, v) + } + pos += int(unsafe.Sizeof(*(&v))) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) + + case schemapb.DataType_Int32: + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.Int32FieldData{ + NumRows: make([]int64, 0, 1), + Data: make([]int32, 0), + } + } + + fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData) + var v int32 + for _, blob := range msg.RowData { + readBinary(blob.GetValue()[pos:], &v, field.DataType) + fieldData.Data = append(fieldData.Data, v) + } + pos += int(unsafe.Sizeof(*(&v))) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) + + case schemapb.DataType_Int64: + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.Int64FieldData{ + NumRows: make([]int64, 0, 1), + Data: make([]int64, 0), + } + } + + fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData) + switch field.FieldID { + case 0: // rowIDs + fieldData.Data = append(fieldData.Data, msg.RowIDs...) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) + case 1: // Timestamps + for _, ts := range msg.Timestamps { + fieldData.Data = append(fieldData.Data, int64(ts)) + } + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) + default: + var v int64 + for _, blob := range msg.RowData { + readBinary(blob.GetValue()[pos:], &v, field.DataType) + fieldData.Data = append(fieldData.Data, v) + } + pos += int(unsafe.Sizeof(*(&v))) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) + } + + case schemapb.DataType_Float: + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.FloatFieldData{ + NumRows: make([]int64, 0, 1), + Data: make([]float32, 0), + } + } + + fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData) + var v float32 + for _, blob := range msg.RowData { + readBinary(blob.GetValue()[pos:], &v, field.DataType) + fieldData.Data = append(fieldData.Data, v) + } + pos += int(unsafe.Sizeof(*(&v))) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) + + case schemapb.DataType_Double: + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.DoubleFieldData{ + NumRows: make([]int64, 0, 1), + Data: make([]float64, 0), + } + } + + fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData) + var v float64 + for _, blob := range msg.RowData { + readBinary(blob.GetValue()[pos:], &v, field.DataType) + fieldData.Data = append(fieldData.Data, v) + } + + pos += int(unsafe.Sizeof(*(&v))) + fieldData.NumRows = append(fieldData.NumRows, int64(len(msg.RowData))) + } + } + + // 1.3 store in buffer + ibNode.insertBuffer.insertData[currentSegID] = idata + + // store current endPositions as Segment->EndPostion + ibNode.replica.updateSegmentEndPosition(currentSegID, iMsg.endPositions[0]) + // update segment pk filter + ibNode.replica.updateSegmentPKRange(currentSegID, msg.GetRowIDs()) + return nil +} + +func readBinary(data []byte, receiver interface{}, dataType schemapb.DataType) { + buf := bytes.NewReader(data) + err := binary.Read(buf, binary.LittleEndian, receiver) + if err != nil { + log.Error("binary.Read failed", zap.Any("data type", dataType), zap.Error(err)) + } +} + func flushSegment( collMeta *etcdpb.CollectionMeta, segID, partitionID, collID UniqueID, diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 4c03bbddeb..4832f364ec 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -32,8 +32,10 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/flowgraph" ) @@ -51,7 +53,7 @@ func (f *CDFMsFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, e return f.Factory.NewMsgStream(ctx) } -func TestFLowGraphInsertBufferNodeCreate(t *testing.T) { +func TestFlowGraphInsertBufferNodeCreate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -560,3 +562,137 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { }) } + +// CompactedRootCoord has meta info compacted at ts +type CompactedRootCoord struct { + types.RootCoord + compactTs Timestamp +} + +func (m *CompactedRootCoord) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { + if in.GetTimeStamp() <= m.compactTs { + return &milvuspb.DescribeCollectionResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "meta compacted", + }, + }, nil + } + return m.RootCoord.DescribeCollection(ctx, in) +} + +func TestInsertBufferNode_getCollMetaBySegID(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + insertChannelName := "datanode-01-test-flowgraphinsertbuffernode-operate" + + testPath := "/test/datanode/root/meta" + err := clearEtcd(testPath) + require.NoError(t, err) + Params.MetaRootPath = testPath + + Factory := &MetaFactory{} + collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1") + + rcf := &RootCoordFactory{} + mockRootCoord := &CompactedRootCoord{ + RootCoord: rcf, + compactTs: 100, + } + + replica := newReplica(mockRootCoord, collMeta.ID) + + err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{}) + require.NoError(t, err) + + msFactory := msgstream.NewPmsFactory() + m := map[string]interface{}{ + "receiveBufSize": 1024, + "pulsarAddress": Params.PulsarAddress, + "pulsarBufSize": 1024} + err = msFactory.SetParams(m) + assert.Nil(t, err) + + saveBinlog := func(fu *segmentFlushUnit) error { + t.Log(fu) + return nil + } + + flushChan := make(chan *flushMsg, 100) + iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string") + require.NoError(t, err) + + meta, err := iBNode.getCollMetabySegID(1, 101) + assert.Nil(t, err) + assert.Equal(t, collMeta.ID, meta.ID) + + _, err = iBNode.getCollMetabySegID(2, 101) + assert.NotNil(t, err) + + meta, err = iBNode.getCollMetabySegID(1, 99) + assert.NotNil(t, err) +} + +func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + insertChannelName := "datanode-01-test-flowgraphinsertbuffernode-operate" + + testPath := "/test/datanode/root/meta" + err := clearEtcd(testPath) + require.NoError(t, err) + Params.MetaRootPath = testPath + + Factory := &MetaFactory{} + collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1") + + rcf := &RootCoordFactory{} + mockRootCoord := &CompactedRootCoord{ + RootCoord: rcf, + compactTs: 100, + } + + replica := newReplica(mockRootCoord, collMeta.ID) + + err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{}) + require.NoError(t, err) + + msFactory := msgstream.NewPmsFactory() + m := map[string]interface{}{ + "receiveBufSize": 1024, + "pulsarAddress": Params.PulsarAddress, + "pulsarBufSize": 1024} + err = msFactory.SetParams(m) + assert.Nil(t, err) + + saveBinlog := func(fu *segmentFlushUnit) error { + t.Log(fu) + return nil + } + + flushChan := make(chan *flushMsg, 100) + iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string") + require.NoError(t, err) + + inMsg := genInsertMsg(insertChannelName) + for _, msg := range inMsg.insertMessages { + msg.EndTimestamp = 101 // ts valid + err = iBNode.bufferInsertMsg(&inMsg, msg) + assert.Nil(t, err) + } + + for _, msg := range inMsg.insertMessages { + msg.EndTimestamp = 99 // ts invalid + err = iBNode.bufferInsertMsg(&inMsg, msg) + assert.NotNil(t, err) + } + + for _, msg := range inMsg.insertMessages { + msg.EndTimestamp = 101 // ts valid + msg.RowIDs = []int64{} //misaligned data + err = iBNode.bufferInsertMsg(&inMsg, msg) + assert.NotNil(t, err) + } +}