From 585d3f983115a464254d7432bda0b57471a6800a Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Fri, 11 Dec 2020 17:20:14 +0800 Subject: [PATCH] Refactor param table, and add ddNode Signed-off-by: bigsheeper --- configs/advanced/write_node.yaml | 14 +- internal/querynode/flow_graph_dd_node.go | 5 - .../querynode/flow_graph_filter_dm_node.go | 2 +- internal/querynode/flow_graph_message.go | 5 + internal/querynode/query_node_test.go | 2 +- internal/querynode/search_service.go | 6 +- internal/storage/binlog_reader.go | 33 +- internal/storage/binlog_test.go | 808 ------------------ internal/storage/binlog_writer.go | 6 +- internal/storage/binlog_writer_test.go | 1 + internal/storage/data_codec.go | 4 +- internal/writenode/data_sync_service.go | 46 +- internal/writenode/data_sync_service_test.go | 17 +- internal/writenode/dd_buffer.go | 47 + internal/writenode/flow_graph_dd_node.go | 176 ++++ .../writenode/flow_graph_filter_dm_node.go | 64 +- .../flow_graph_insert_buffer_node.go | 13 +- internal/writenode/flow_graph_message.go | 54 +- .../flow_graph_msg_stream_input_node.go | 41 +- .../writenode/flow_graph_service_time_node.go | 46 - internal/writenode/param_table.go | 237 ++--- internal/writenode/param_table_test.go | 67 +- internal/writenode/write_node_test.go | 32 + 23 files changed, 578 insertions(+), 1148 deletions(-) create mode 100644 internal/writenode/dd_buffer.go create mode 100644 internal/writenode/flow_graph_dd_node.go delete mode 100644 internal/writenode/flow_graph_service_time_node.go create mode 100644 internal/writenode/write_node_test.go diff --git a/configs/advanced/write_node.yaml b/configs/advanced/write_node.yaml index 6bf4289c46..4ea1770b7d 100644 --- a/configs/advanced/write_node.yaml +++ b/configs/advanced/write_node.yaml @@ -20,6 +20,10 @@ writeNode: maxParallelism: 1024 msgStream: + dataDefinition: + recvBufSize: 64 # msgPack chan buffer size + pulsarBufSize: 64 # pulsar chan buffer size + insert: #streamBufSize: 1024 # msgPack chan buffer size recvBufSize: 1024 # msgPack chan buffer size @@ -29,13 +33,3 @@ writeNode: #streamBufSize: 1024 # msgPack chan buffer size recvBufSize: 1024 # msgPack chan buffer size pulsarBufSize: 1024 # pulsar chan buffer size - - search: - recvBufSize: 512 - pulsarBufSize: 512 - - searchResult: - recvBufSize: 64 - - stats: - recvBufSize: 64 diff --git a/internal/querynode/flow_graph_dd_node.go b/internal/querynode/flow_graph_dd_node.go index d9ec9a96a8..f4a5e11369 100644 --- a/internal/querynode/flow_graph_dd_node.go +++ b/internal/querynode/flow_graph_dd_node.go @@ -17,11 +17,6 @@ type ddNode struct { replica collectionReplica } -type metaOperateRecord struct { - createOrDrop bool // create: true, drop: false - timestamp Timestamp -} - func (ddNode *ddNode) Name() string { return "ddNode" } diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index a12c271656..08288285f8 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -9,8 +9,8 @@ import ( ) type filterDmNode struct { - ddMsg *ddMsg BaseNode + ddMsg *ddMsg } func (fdmNode *filterDmNode) Name() string { diff --git a/internal/querynode/flow_graph_message.go b/internal/querynode/flow_graph_message.go index 28468746bd..88a133fab3 100644 --- a/internal/querynode/flow_graph_message.go +++ b/internal/querynode/flow_graph_message.go @@ -19,6 +19,11 @@ type ddMsg struct { timeRange TimeRange } +type metaOperateRecord struct { + createOrDrop bool // create: true, drop: 
false
+	timestamp    Timestamp
+}
+
 type insertMsg struct {
 	insertMessages []*msgstream.InsertMsg
 	timeRange      TimeRange
diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go
index f738cd8452..49ee58138a 100644
--- a/internal/querynode/query_node_test.go
+++ b/internal/querynode/query_node_test.go
@@ -118,7 +118,7 @@ func makeNewChannelNames(names []string, suffix string) []string {
 }
 
 func refreshChannelNames() {
-	suffix := "_test_query_node" + strconv.FormatInt(rand.Int63n(100), 10)
+	suffix := "-test-query-node" + strconv.FormatInt(rand.Int63n(100), 10)
 	Params.DDChannelNames = makeNewChannelNames(Params.DDChannelNames, suffix)
 	Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix)
 	Params.SearchChannelNames = makeNewChannelNames(Params.SearchChannelNames, suffix)
diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go
index 6642eed1ff..4503449189 100644
--- a/internal/querynode/search_service.go
+++ b/internal/querynode/search_service.go
@@ -291,9 +291,9 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
 	inReduced := make([]bool, len(searchResults))
 	numSegment := int64(len(searchResults))
-	err = reduceSearchResults(searchResults, numSegment, inReduced)
-	if err != nil {
-		return err
+	err2 := reduceSearchResults(searchResults, numSegment, inReduced)
+	if err2 != nil {
+		return err2
 	}
 	err = fillTargetEntry(plan, searchResults, matchedSegments, inReduced)
 	if err != nil {
diff --git a/internal/storage/binlog_reader.go b/internal/storage/binlog_reader.go
index 3bdfa884ec..f51943c9eb 100644
--- a/internal/storage/binlog_reader.go
+++ b/internal/storage/binlog_reader.go
@@ -11,15 +11,24 @@ import (
 type BinlogReader struct {
 	magicNumber int32
 	descriptorEvent
-	buffer    *bytes.Buffer
-	eventList []*EventReader
-	isClose   bool
+	currentEventReader *EventReader
+	buffer             *bytes.Buffer
+	bufferLength       int
+	currentOffset      int32
+	isClose            bool
 }
 
 func (reader *BinlogReader) NextEventReader() (*EventReader, error) {
 	if reader.isClose {
 		return nil, errors.New("bin log reader is closed")
 	}
+	if reader.currentEventReader != nil {
+		reader.currentOffset = reader.currentEventReader.NextPosition
+		if err := reader.currentEventReader.Close(); err != nil {
+			return nil, err
+		}
+		reader.currentEventReader = nil
+	}
 	if reader.buffer.Len() <= 0 {
 		return nil, nil
 	}
@@ -27,14 +36,15 @@ func (reader *BinlogReader) NextEventReader() (*EventReader, error) {
 	if err != nil {
 		return nil, err
 	}
-	reader.eventList = append(reader.eventList, eventReader)
-	return eventReader, nil
+	reader.currentEventReader = eventReader
+	return reader.currentEventReader, nil
 }
 
 func (reader *BinlogReader) readMagicNumber() (int32, error) {
 	if err := binary.Read(reader.buffer, binary.LittleEndian, &reader.magicNumber); err != nil {
 		return -1, err
 	}
+	reader.currentOffset = 4 // past the int32 magic number
 	if reader.magicNumber != MagicNumber {
 		return -1, errors.New("parse magic number failed, expected: " + strconv.Itoa(int(MagicNumber)) +
 			", actual: " + strconv.Itoa(int(reader.magicNumber)))
@@ -45,6 +55,7 @@
 
 func (reader *BinlogReader) readDescriptorEvent() (*descriptorEvent, error) {
 	event, err := ReadDescriptorEvent(reader.buffer)
 	if err != nil {
 		return nil, err
 	}
+	reader.currentOffset = event.NextPosition
@@ -56,20 +67,20 @@ func (reader *BinlogReader) Close() error {
 	if reader.isClose {
 		return nil
 	}
-	for _, e := range reader.eventList {
-		if err := e.Close(); err != nil {
+	reader.isClose = true
+	if 
reader.currentEventReader != nil { + if err := reader.currentEventReader.Close(); err != nil { return err } } - reader.isClose = true return nil } func NewBinlogReader(data []byte) (*BinlogReader, error) { reader := &BinlogReader{ - buffer: bytes.NewBuffer(data), - eventList: []*EventReader{}, - isClose: false, + buffer: bytes.NewBuffer(data), + bufferLength: len(data), + isClose: false, } if _, err := reader.readMagicNumber(); err != nil { diff --git a/internal/storage/binlog_test.go b/internal/storage/binlog_test.go index 0ca7dd47f7..75ff432ab2 100644 --- a/internal/storage/binlog_test.go +++ b/internal/storage/binlog_test.go @@ -241,812 +241,4 @@ func TestInsertBinlog(t *testing.T) { assert.Equal(t, int(e2NxtPos), len(buf)) - //read binlog - r, err := NewBinlogReader(buf) - assert.Nil(t, err) - event1, err := r.NextEventReader() - assert.Nil(t, err) - assert.NotNil(t, event1) - p1, err := event1.GetInt64FromPayload() - assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6}) - assert.Nil(t, err) - assert.Equal(t, event1.TypeCode, InsertEventType) - ed1, ok := (event1.eventData).(*insertEventData) - assert.True(t, ok) - assert.Equal(t, ed1.StartTimestamp, Timestamp(100)) - assert.Equal(t, ed1.EndTimestamp, Timestamp(200)) - - event2, err := r.NextEventReader() - assert.Nil(t, err) - assert.NotNil(t, event2) - p2, err := event2.GetInt64FromPayload() - assert.Nil(t, err) - assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12}) - assert.Equal(t, event2.TypeCode, InsertEventType) - ed2, ok := (event2.eventData).(*insertEventData) - assert.True(t, ok) - _, ok = (event2.eventData).(*deleteEventData) - assert.False(t, ok) - assert.Equal(t, ed2.StartTimestamp, Timestamp(300)) - assert.Equal(t, ed2.EndTimestamp, Timestamp(400)) -} - -func TestDeleteBinlog(t *testing.T) { - w, err := NewDeleteBinlogWriter(schemapb.DataType_INT64, 50) - assert.Nil(t, err) - - e1, err := w.NextDeleteEventWriter() - assert.Nil(t, err) - err = e1.AddDataToPayload([]int64{1, 2, 3}) - assert.Nil(t, err) - err = e1.AddDataToPayload([]int32{4, 5, 6}) - assert.NotNil(t, err) - err = e1.AddDataToPayload([]int64{4, 5, 6}) - assert.Nil(t, err) - e1.SetStartTimestamp(100) - e1.SetEndTimestamp(200) - - e2, err := w.NextDeleteEventWriter() - assert.Nil(t, err) - err = e2.AddDataToPayload([]int64{7, 8, 9}) - assert.Nil(t, err) - err = e2.AddDataToPayload([]bool{true, false, true}) - assert.NotNil(t, err) - err = e2.AddDataToPayload([]int64{10, 11, 12}) - assert.Nil(t, err) - e2.SetStartTimestamp(300) - e2.SetEndTimestamp(400) - - w.SetStartTimeStamp(1000) - w.SetEndTimeStamp(2000) - - _, err = w.GetBuffer() - assert.NotNil(t, err) - err = w.Close() - assert.Nil(t, err) - buf, err := w.GetBuffer() - assert.Nil(t, err) - - //magic number - magicNum := UnsafeReadInt32(buf, 0) - assert.Equal(t, magicNum, MagicNumber) - pos := int(unsafe.Sizeof(MagicNumber)) - - //descriptor header, timestamp - ts := UnsafeReadInt64(buf, pos) - assert.Greater(t, ts, int64(0)) - curts := time.Now().UnixNano() / int64(time.Millisecond) - curts = int64(tsoutil.ComposeTS(curts, 0)) - diffts := curts - ts - maxdiff := int64(tsoutil.ComposeTS(1000, 0)) - assert.LessOrEqual(t, diffts, maxdiff) - pos += int(unsafe.Sizeof(ts)) - - //descriptor header, type code - tc := UnsafeReadInt8(buf, pos) - assert.Equal(t, EventTypeCode(tc), DescriptorEventType) - pos += int(unsafe.Sizeof(tc)) - - //descriptor header, server id - svrID := UnsafeReadInt32(buf, pos) - assert.Equal(t, svrID, int32(ServerID)) - pos += int(unsafe.Sizeof(svrID)) - - //descriptor header, event length - 
descEventLen := UnsafeReadInt32(buf, pos) - pos += int(unsafe.Sizeof(descEventLen)) - - //descriptor header, next position - descNxtPos := UnsafeReadInt32(buf, pos) - assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos) - pos += int(unsafe.Sizeof(descNxtPos)) - - //descriptor data fix, binlog version - binLogVer := UnsafeReadInt16(buf, pos) - assert.Equal(t, binLogVer, int16(BinlogVersion)) - pos += int(unsafe.Sizeof(binLogVer)) - - //descriptor data fix, server version - svrVer := UnsafeReadInt64(buf, pos) - assert.Equal(t, svrVer, int64(ServerVersion)) - pos += int(unsafe.Sizeof(svrVer)) - - //descriptor data fix, commit id - cmitID := UnsafeReadInt64(buf, pos) - assert.Equal(t, cmitID, int64(CommitID)) - pos += int(unsafe.Sizeof(cmitID)) - - //descriptor data fix, header length - headLen := UnsafeReadInt8(buf, pos) - assert.Equal(t, headLen, int8(binary.Size(eventHeader{}))) - pos += int(unsafe.Sizeof(headLen)) - - //descriptor data fix, collection id - collID := UnsafeReadInt64(buf, pos) - assert.Equal(t, collID, int64(50)) - pos += int(unsafe.Sizeof(collID)) - - //descriptor data fix, partition id - partID := UnsafeReadInt64(buf, pos) - assert.Equal(t, partID, int64(-1)) - pos += int(unsafe.Sizeof(partID)) - - //descriptor data fix, segment id - segID := UnsafeReadInt64(buf, pos) - assert.Equal(t, segID, int64(-1)) - pos += int(unsafe.Sizeof(segID)) - - //descriptor data fix, field id - fieldID := UnsafeReadInt64(buf, pos) - assert.Equal(t, fieldID, int64(-1)) - pos += int(unsafe.Sizeof(fieldID)) - - //descriptor data fix, start time stamp - startts := UnsafeReadInt64(buf, pos) - assert.Equal(t, startts, int64(1000)) - pos += int(unsafe.Sizeof(startts)) - - //descriptor data fix, end time stamp - endts := UnsafeReadInt64(buf, pos) - assert.Equal(t, endts, int64(2000)) - pos += int(unsafe.Sizeof(endts)) - - //descriptor data fix, payload type - colType := UnsafeReadInt32(buf, pos) - assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_INT64) - pos += int(unsafe.Sizeof(colType)) - - //descriptor data, post header lengths - for i := DescriptorEventType; i < EventTypeEnd; i++ { - size := getEventFixPartSize(i) - assert.Equal(t, uint8(size), buf[pos]) - pos++ - } - - //start of e1 - assert.Equal(t, pos, int(descNxtPos)) - - //insert e1 header, Timestamp - e1ts := UnsafeReadInt64(buf, pos) - diffts = curts - e1ts - assert.LessOrEqual(t, diffts, maxdiff) - pos += int(unsafe.Sizeof(e1ts)) - - //insert e1 header, type code - e1tc := UnsafeReadInt8(buf, pos) - assert.Equal(t, EventTypeCode(e1tc), DeleteEventType) - pos += int(unsafe.Sizeof(e1tc)) - - //insert e1 header, Server id - e1svrID := UnsafeReadInt32(buf, pos) - assert.Equal(t, e1svrID, int32(ServerID)) - pos += int(unsafe.Sizeof(e1svrID)) - - //insert e1 header, event length - e1EventLen := UnsafeReadInt32(buf, pos) - pos += int(unsafe.Sizeof(e1EventLen)) - - //insert e1 header, next position - e1NxtPos := UnsafeReadInt32(buf, pos) - assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos) - pos += int(unsafe.Sizeof(descNxtPos)) - - //insert e1 data, start time stamp - e1st := UnsafeReadInt64(buf, pos) - assert.Equal(t, e1st, int64(100)) - pos += int(unsafe.Sizeof(e1st)) - - //insert e1 data, end time stamp - e1et := UnsafeReadInt64(buf, pos) - assert.Equal(t, e1et, int64(200)) - pos += int(unsafe.Sizeof(e1et)) - - //insert e1, payload - e1Payload := buf[pos:e1NxtPos] - e1r, err := NewPayloadReader(schemapb.DataType_INT64, e1Payload) - assert.Nil(t, err) - e1a, err := e1r.GetInt64FromPayload() - assert.Nil(t, 
err) - assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6}) - err = e1r.Close() - assert.Nil(t, err) - - //start of e2 - pos = int(e1NxtPos) - - //insert e2 header, Timestamp - e2ts := UnsafeReadInt64(buf, pos) - diffts = curts - e2ts - assert.LessOrEqual(t, diffts, maxdiff) - pos += int(unsafe.Sizeof(e2ts)) - - //insert e2 header, type code - e2tc := UnsafeReadInt8(buf, pos) - assert.Equal(t, EventTypeCode(e2tc), DeleteEventType) - pos += int(unsafe.Sizeof(e2tc)) - - //insert e2 header, Server id - e2svrID := UnsafeReadInt32(buf, pos) - assert.Equal(t, e2svrID, int32(ServerID)) - pos += int(unsafe.Sizeof(e2svrID)) - - //insert e2 header, event length - e2EventLen := UnsafeReadInt32(buf, pos) - pos += int(unsafe.Sizeof(e2EventLen)) - - //insert e2 header, next position - e2NxtPos := UnsafeReadInt32(buf, pos) - assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos) - pos += int(unsafe.Sizeof(descNxtPos)) - - //insert e2 data, start time stamp - e2st := UnsafeReadInt64(buf, pos) - assert.Equal(t, e2st, int64(300)) - pos += int(unsafe.Sizeof(e2st)) - - //insert e2 data, end time stamp - e2et := UnsafeReadInt64(buf, pos) - assert.Equal(t, e2et, int64(400)) - pos += int(unsafe.Sizeof(e2et)) - - //insert e2, payload - e2Payload := buf[pos:] - e2r, err := NewPayloadReader(schemapb.DataType_INT64, e2Payload) - assert.Nil(t, err) - e2a, err := e2r.GetInt64FromPayload() - assert.Nil(t, err) - assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12}) - err = e2r.Close() - assert.Nil(t, err) - - assert.Equal(t, int(e2NxtPos), len(buf)) - - //read binlog - r, err := NewBinlogReader(buf) - assert.Nil(t, err) - event1, err := r.NextEventReader() - assert.Nil(t, err) - assert.NotNil(t, event1) - p1, err := event1.GetInt64FromPayload() - assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6}) - assert.Nil(t, err) - assert.Equal(t, event1.TypeCode, DeleteEventType) - ed1, ok := (event1.eventData).(*deleteEventData) - assert.True(t, ok) - assert.Equal(t, ed1.StartTimestamp, Timestamp(100)) - assert.Equal(t, ed1.EndTimestamp, Timestamp(200)) - - event2, err := r.NextEventReader() - assert.Nil(t, err) - assert.NotNil(t, event2) - p2, err := event2.GetInt64FromPayload() - assert.Nil(t, err) - assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12}) - assert.Equal(t, event2.TypeCode, DeleteEventType) - ed2, ok := (event2.eventData).(*deleteEventData) - assert.True(t, ok) - _, ok = (event2.eventData).(*insertEventData) - assert.False(t, ok) - assert.Equal(t, ed2.StartTimestamp, Timestamp(300)) - assert.Equal(t, ed2.EndTimestamp, Timestamp(400)) -} - -func TestDDLBinlog1(t *testing.T) { - w, err := NewDDLBinlogWriter(schemapb.DataType_INT64, 50) - assert.Nil(t, err) - - e1, err := w.NextCreateCollectionEventWriter() - assert.Nil(t, err) - err = e1.AddDataToPayload([]int64{1, 2, 3}) - assert.Nil(t, err) - err = e1.AddDataToPayload([]int32{4, 5, 6}) - assert.NotNil(t, err) - err = e1.AddDataToPayload([]int64{4, 5, 6}) - assert.Nil(t, err) - e1.SetStartTimestamp(100) - e1.SetEndTimestamp(200) - - e2, err := w.NextDropCollectionEventWriter() - assert.Nil(t, err) - err = e2.AddDataToPayload([]int64{7, 8, 9}) - assert.Nil(t, err) - err = e2.AddDataToPayload([]bool{true, false, true}) - assert.NotNil(t, err) - err = e2.AddDataToPayload([]int64{10, 11, 12}) - assert.Nil(t, err) - e2.SetStartTimestamp(300) - e2.SetEndTimestamp(400) - - w.SetStartTimeStamp(1000) - w.SetEndTimeStamp(2000) - - _, err = w.GetBuffer() - assert.NotNil(t, err) - err = w.Close() - assert.Nil(t, err) - buf, err := w.GetBuffer() - assert.Nil(t, err) - - //magic number - magicNum := 
UnsafeReadInt32(buf, 0) - assert.Equal(t, magicNum, MagicNumber) - pos := int(unsafe.Sizeof(MagicNumber)) - - //descriptor header, timestamp - ts := UnsafeReadInt64(buf, pos) - assert.Greater(t, ts, int64(0)) - curts := time.Now().UnixNano() / int64(time.Millisecond) - curts = int64(tsoutil.ComposeTS(curts, 0)) - diffts := curts - ts - maxdiff := int64(tsoutil.ComposeTS(1000, 0)) - assert.LessOrEqual(t, diffts, maxdiff) - pos += int(unsafe.Sizeof(ts)) - - //descriptor header, type code - tc := UnsafeReadInt8(buf, pos) - assert.Equal(t, EventTypeCode(tc), DescriptorEventType) - pos += int(unsafe.Sizeof(tc)) - - //descriptor header, server id - svrID := UnsafeReadInt32(buf, pos) - assert.Equal(t, svrID, int32(ServerID)) - pos += int(unsafe.Sizeof(svrID)) - - //descriptor header, event length - descEventLen := UnsafeReadInt32(buf, pos) - pos += int(unsafe.Sizeof(descEventLen)) - - //descriptor header, next position - descNxtPos := UnsafeReadInt32(buf, pos) - assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos) - pos += int(unsafe.Sizeof(descNxtPos)) - - //descriptor data fix, binlog version - binLogVer := UnsafeReadInt16(buf, pos) - assert.Equal(t, binLogVer, int16(BinlogVersion)) - pos += int(unsafe.Sizeof(binLogVer)) - - //descriptor data fix, server version - svrVer := UnsafeReadInt64(buf, pos) - assert.Equal(t, svrVer, int64(ServerVersion)) - pos += int(unsafe.Sizeof(svrVer)) - - //descriptor data fix, commit id - cmitID := UnsafeReadInt64(buf, pos) - assert.Equal(t, cmitID, int64(CommitID)) - pos += int(unsafe.Sizeof(cmitID)) - - //descriptor data fix, header length - headLen := UnsafeReadInt8(buf, pos) - assert.Equal(t, headLen, int8(binary.Size(eventHeader{}))) - pos += int(unsafe.Sizeof(headLen)) - - //descriptor data fix, collection id - collID := UnsafeReadInt64(buf, pos) - assert.Equal(t, collID, int64(50)) - pos += int(unsafe.Sizeof(collID)) - - //descriptor data fix, partition id - partID := UnsafeReadInt64(buf, pos) - assert.Equal(t, partID, int64(-1)) - pos += int(unsafe.Sizeof(partID)) - - //descriptor data fix, segment id - segID := UnsafeReadInt64(buf, pos) - assert.Equal(t, segID, int64(-1)) - pos += int(unsafe.Sizeof(segID)) - - //descriptor data fix, field id - fieldID := UnsafeReadInt64(buf, pos) - assert.Equal(t, fieldID, int64(-1)) - pos += int(unsafe.Sizeof(fieldID)) - - //descriptor data fix, start time stamp - startts := UnsafeReadInt64(buf, pos) - assert.Equal(t, startts, int64(1000)) - pos += int(unsafe.Sizeof(startts)) - - //descriptor data fix, end time stamp - endts := UnsafeReadInt64(buf, pos) - assert.Equal(t, endts, int64(2000)) - pos += int(unsafe.Sizeof(endts)) - - //descriptor data fix, payload type - colType := UnsafeReadInt32(buf, pos) - assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_INT64) - pos += int(unsafe.Sizeof(colType)) - - //descriptor data, post header lengths - for i := DescriptorEventType; i < EventTypeEnd; i++ { - size := getEventFixPartSize(i) - assert.Equal(t, uint8(size), buf[pos]) - pos++ - } - - //start of e1 - assert.Equal(t, pos, int(descNxtPos)) - - //insert e1 header, Timestamp - e1ts := UnsafeReadInt64(buf, pos) - diffts = curts - e1ts - assert.LessOrEqual(t, diffts, maxdiff) - pos += int(unsafe.Sizeof(e1ts)) - - //insert e1 header, type code - e1tc := UnsafeReadInt8(buf, pos) - assert.Equal(t, EventTypeCode(e1tc), CreateCollectionEventType) - pos += int(unsafe.Sizeof(e1tc)) - - //insert e1 header, Server id - e1svrID := UnsafeReadInt32(buf, pos) - assert.Equal(t, e1svrID, int32(ServerID)) - 
pos += int(unsafe.Sizeof(e1svrID)) - - //insert e1 header, event length - e1EventLen := UnsafeReadInt32(buf, pos) - pos += int(unsafe.Sizeof(e1EventLen)) - - //insert e1 header, next position - e1NxtPos := UnsafeReadInt32(buf, pos) - assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos) - pos += int(unsafe.Sizeof(descNxtPos)) - - //insert e1 data, start time stamp - e1st := UnsafeReadInt64(buf, pos) - assert.Equal(t, e1st, int64(100)) - pos += int(unsafe.Sizeof(e1st)) - - //insert e1 data, end time stamp - e1et := UnsafeReadInt64(buf, pos) - assert.Equal(t, e1et, int64(200)) - pos += int(unsafe.Sizeof(e1et)) - - //insert e1, payload - e1Payload := buf[pos:e1NxtPos] - e1r, err := NewPayloadReader(schemapb.DataType_INT64, e1Payload) - assert.Nil(t, err) - e1a, err := e1r.GetInt64FromPayload() - assert.Nil(t, err) - assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6}) - err = e1r.Close() - assert.Nil(t, err) - - //start of e2 - pos = int(e1NxtPos) - - //insert e2 header, Timestamp - e2ts := UnsafeReadInt64(buf, pos) - diffts = curts - e2ts - assert.LessOrEqual(t, diffts, maxdiff) - pos += int(unsafe.Sizeof(e2ts)) - - //insert e2 header, type code - e2tc := UnsafeReadInt8(buf, pos) - assert.Equal(t, EventTypeCode(e2tc), DropCollectionEventType) - pos += int(unsafe.Sizeof(e2tc)) - - //insert e2 header, Server id - e2svrID := UnsafeReadInt32(buf, pos) - assert.Equal(t, e2svrID, int32(ServerID)) - pos += int(unsafe.Sizeof(e2svrID)) - - //insert e2 header, event length - e2EventLen := UnsafeReadInt32(buf, pos) - pos += int(unsafe.Sizeof(e2EventLen)) - - //insert e2 header, next position - e2NxtPos := UnsafeReadInt32(buf, pos) - assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos) - pos += int(unsafe.Sizeof(descNxtPos)) - - //insert e2 data, start time stamp - e2st := UnsafeReadInt64(buf, pos) - assert.Equal(t, e2st, int64(300)) - pos += int(unsafe.Sizeof(e2st)) - - //insert e2 data, end time stamp - e2et := UnsafeReadInt64(buf, pos) - assert.Equal(t, e2et, int64(400)) - pos += int(unsafe.Sizeof(e2et)) - - //insert e2, payload - e2Payload := buf[pos:] - e2r, err := NewPayloadReader(schemapb.DataType_INT64, e2Payload) - assert.Nil(t, err) - e2a, err := e2r.GetInt64FromPayload() - assert.Nil(t, err) - assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12}) - err = e2r.Close() - assert.Nil(t, err) - - assert.Equal(t, int(e2NxtPos), len(buf)) - - //read binlog - r, err := NewBinlogReader(buf) - assert.Nil(t, err) - event1, err := r.NextEventReader() - assert.Nil(t, err) - assert.NotNil(t, event1) - p1, err := event1.GetInt64FromPayload() - assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6}) - assert.Nil(t, err) - assert.Equal(t, event1.TypeCode, CreateCollectionEventType) - ed1, ok := (event1.eventData).(*createCollectionEventData) - assert.True(t, ok) - assert.Equal(t, ed1.StartTimestamp, Timestamp(100)) - assert.Equal(t, ed1.EndTimestamp, Timestamp(200)) - - event2, err := r.NextEventReader() - assert.Nil(t, err) - assert.NotNil(t, event2) - p2, err := event2.GetInt64FromPayload() - assert.Nil(t, err) - assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12}) - assert.Equal(t, event2.TypeCode, DropCollectionEventType) - ed2, ok := (event2.eventData).(*dropCollectionEventData) - assert.True(t, ok) - _, ok = (event2.eventData).(*insertEventData) - assert.False(t, ok) - assert.Equal(t, ed2.StartTimestamp, Timestamp(300)) - assert.Equal(t, ed2.EndTimestamp, Timestamp(400)) -} - -func TestDDLBinlog2(t *testing.T) { - w, err := NewDDLBinlogWriter(schemapb.DataType_INT64, 50) - assert.Nil(t, err) - - e1, err := 
w.NextCreatePartitionEventWriter() - assert.Nil(t, err) - err = e1.AddDataToPayload([]int64{1, 2, 3}) - assert.Nil(t, err) - err = e1.AddDataToPayload([]int32{4, 5, 6}) - assert.NotNil(t, err) - err = e1.AddDataToPayload([]int64{4, 5, 6}) - assert.Nil(t, err) - e1.SetStartTimestamp(100) - e1.SetEndTimestamp(200) - - e2, err := w.NextDropPartitionEventWriter() - assert.Nil(t, err) - err = e2.AddDataToPayload([]int64{7, 8, 9}) - assert.Nil(t, err) - err = e2.AddDataToPayload([]bool{true, false, true}) - assert.NotNil(t, err) - err = e2.AddDataToPayload([]int64{10, 11, 12}) - assert.Nil(t, err) - e2.SetStartTimestamp(300) - e2.SetEndTimestamp(400) - - w.SetStartTimeStamp(1000) - w.SetEndTimeStamp(2000) - - _, err = w.GetBuffer() - assert.NotNil(t, err) - err = w.Close() - assert.Nil(t, err) - buf, err := w.GetBuffer() - assert.Nil(t, err) - - //magic number - magicNum := UnsafeReadInt32(buf, 0) - assert.Equal(t, magicNum, MagicNumber) - pos := int(unsafe.Sizeof(MagicNumber)) - - //descriptor header, timestamp - ts := UnsafeReadInt64(buf, pos) - assert.Greater(t, ts, int64(0)) - curts := time.Now().UnixNano() / int64(time.Millisecond) - curts = int64(tsoutil.ComposeTS(curts, 0)) - diffts := curts - ts - maxdiff := int64(tsoutil.ComposeTS(1000, 0)) - assert.LessOrEqual(t, diffts, maxdiff) - pos += int(unsafe.Sizeof(ts)) - - //descriptor header, type code - tc := UnsafeReadInt8(buf, pos) - assert.Equal(t, EventTypeCode(tc), DescriptorEventType) - pos += int(unsafe.Sizeof(tc)) - - //descriptor header, server id - svrID := UnsafeReadInt32(buf, pos) - assert.Equal(t, svrID, int32(ServerID)) - pos += int(unsafe.Sizeof(svrID)) - - //descriptor header, event length - descEventLen := UnsafeReadInt32(buf, pos) - pos += int(unsafe.Sizeof(descEventLen)) - - //descriptor header, next position - descNxtPos := UnsafeReadInt32(buf, pos) - assert.Equal(t, descEventLen+int32(unsafe.Sizeof(MagicNumber)), descNxtPos) - pos += int(unsafe.Sizeof(descNxtPos)) - - //descriptor data fix, binlog version - binLogVer := UnsafeReadInt16(buf, pos) - assert.Equal(t, binLogVer, int16(BinlogVersion)) - pos += int(unsafe.Sizeof(binLogVer)) - - //descriptor data fix, server version - svrVer := UnsafeReadInt64(buf, pos) - assert.Equal(t, svrVer, int64(ServerVersion)) - pos += int(unsafe.Sizeof(svrVer)) - - //descriptor data fix, commit id - cmitID := UnsafeReadInt64(buf, pos) - assert.Equal(t, cmitID, int64(CommitID)) - pos += int(unsafe.Sizeof(cmitID)) - - //descriptor data fix, header length - headLen := UnsafeReadInt8(buf, pos) - assert.Equal(t, headLen, int8(binary.Size(eventHeader{}))) - pos += int(unsafe.Sizeof(headLen)) - - //descriptor data fix, collection id - collID := UnsafeReadInt64(buf, pos) - assert.Equal(t, collID, int64(50)) - pos += int(unsafe.Sizeof(collID)) - - //descriptor data fix, partition id - partID := UnsafeReadInt64(buf, pos) - assert.Equal(t, partID, int64(-1)) - pos += int(unsafe.Sizeof(partID)) - - //descriptor data fix, segment id - segID := UnsafeReadInt64(buf, pos) - assert.Equal(t, segID, int64(-1)) - pos += int(unsafe.Sizeof(segID)) - - //descriptor data fix, field id - fieldID := UnsafeReadInt64(buf, pos) - assert.Equal(t, fieldID, int64(-1)) - pos += int(unsafe.Sizeof(fieldID)) - - //descriptor data fix, start time stamp - startts := UnsafeReadInt64(buf, pos) - assert.Equal(t, startts, int64(1000)) - pos += int(unsafe.Sizeof(startts)) - - //descriptor data fix, end time stamp - endts := UnsafeReadInt64(buf, pos) - assert.Equal(t, endts, int64(2000)) - pos += int(unsafe.Sizeof(endts)) - - 
//descriptor data fix, payload type - colType := UnsafeReadInt32(buf, pos) - assert.Equal(t, schemapb.DataType(colType), schemapb.DataType_INT64) - pos += int(unsafe.Sizeof(colType)) - - //descriptor data, post header lengths - for i := DescriptorEventType; i < EventTypeEnd; i++ { - size := getEventFixPartSize(i) - assert.Equal(t, uint8(size), buf[pos]) - pos++ - } - - //start of e1 - assert.Equal(t, pos, int(descNxtPos)) - - //insert e1 header, Timestamp - e1ts := UnsafeReadInt64(buf, pos) - diffts = curts - e1ts - assert.LessOrEqual(t, diffts, maxdiff) - pos += int(unsafe.Sizeof(e1ts)) - - //insert e1 header, type code - e1tc := UnsafeReadInt8(buf, pos) - assert.Equal(t, EventTypeCode(e1tc), CreatePartitionEventType) - pos += int(unsafe.Sizeof(e1tc)) - - //insert e1 header, Server id - e1svrID := UnsafeReadInt32(buf, pos) - assert.Equal(t, e1svrID, int32(ServerID)) - pos += int(unsafe.Sizeof(e1svrID)) - - //insert e1 header, event length - e1EventLen := UnsafeReadInt32(buf, pos) - pos += int(unsafe.Sizeof(e1EventLen)) - - //insert e1 header, next position - e1NxtPos := UnsafeReadInt32(buf, pos) - assert.Equal(t, descNxtPos+e1EventLen, e1NxtPos) - pos += int(unsafe.Sizeof(descNxtPos)) - - //insert e1 data, start time stamp - e1st := UnsafeReadInt64(buf, pos) - assert.Equal(t, e1st, int64(100)) - pos += int(unsafe.Sizeof(e1st)) - - //insert e1 data, end time stamp - e1et := UnsafeReadInt64(buf, pos) - assert.Equal(t, e1et, int64(200)) - pos += int(unsafe.Sizeof(e1et)) - - //insert e1, payload - e1Payload := buf[pos:e1NxtPos] - e1r, err := NewPayloadReader(schemapb.DataType_INT64, e1Payload) - assert.Nil(t, err) - e1a, err := e1r.GetInt64FromPayload() - assert.Nil(t, err) - assert.Equal(t, e1a, []int64{1, 2, 3, 4, 5, 6}) - err = e1r.Close() - assert.Nil(t, err) - - //start of e2 - pos = int(e1NxtPos) - - //insert e2 header, Timestamp - e2ts := UnsafeReadInt64(buf, pos) - diffts = curts - e2ts - assert.LessOrEqual(t, diffts, maxdiff) - pos += int(unsafe.Sizeof(e2ts)) - - //insert e2 header, type code - e2tc := UnsafeReadInt8(buf, pos) - assert.Equal(t, EventTypeCode(e2tc), DropPartitionEventType) - pos += int(unsafe.Sizeof(e2tc)) - - //insert e2 header, Server id - e2svrID := UnsafeReadInt32(buf, pos) - assert.Equal(t, e2svrID, int32(ServerID)) - pos += int(unsafe.Sizeof(e2svrID)) - - //insert e2 header, event length - e2EventLen := UnsafeReadInt32(buf, pos) - pos += int(unsafe.Sizeof(e2EventLen)) - - //insert e2 header, next position - e2NxtPos := UnsafeReadInt32(buf, pos) - assert.Equal(t, e1NxtPos+e2EventLen, e2NxtPos) - pos += int(unsafe.Sizeof(descNxtPos)) - - //insert e2 data, start time stamp - e2st := UnsafeReadInt64(buf, pos) - assert.Equal(t, e2st, int64(300)) - pos += int(unsafe.Sizeof(e2st)) - - //insert e2 data, end time stamp - e2et := UnsafeReadInt64(buf, pos) - assert.Equal(t, e2et, int64(400)) - pos += int(unsafe.Sizeof(e2et)) - - //insert e2, payload - e2Payload := buf[pos:] - e2r, err := NewPayloadReader(schemapb.DataType_INT64, e2Payload) - assert.Nil(t, err) - e2a, err := e2r.GetInt64FromPayload() - assert.Nil(t, err) - assert.Equal(t, e2a, []int64{7, 8, 9, 10, 11, 12}) - err = e2r.Close() - assert.Nil(t, err) - - assert.Equal(t, int(e2NxtPos), len(buf)) - - //read binlog - r, err := NewBinlogReader(buf) - assert.Nil(t, err) - event1, err := r.NextEventReader() - assert.Nil(t, err) - assert.NotNil(t, event1) - p1, err := event1.GetInt64FromPayload() - assert.Equal(t, p1, []int64{1, 2, 3, 4, 5, 6}) - assert.Nil(t, err) - assert.Equal(t, event1.TypeCode, 
CreatePartitionEventType) - ed1, ok := (event1.eventData).(*createPartitionEventData) - assert.True(t, ok) - assert.Equal(t, ed1.StartTimestamp, Timestamp(100)) - assert.Equal(t, ed1.EndTimestamp, Timestamp(200)) - - event2, err := r.NextEventReader() - assert.Nil(t, err) - assert.NotNil(t, event2) - p2, err := event2.GetInt64FromPayload() - assert.Nil(t, err) - assert.Equal(t, p2, []int64{7, 8, 9, 10, 11, 12}) - assert.Equal(t, event2.TypeCode, DropPartitionEventType) - ed2, ok := (event2.eventData).(*dropPartitionEventData) - assert.True(t, ok) - _, ok = (event2.eventData).(*insertEventData) - assert.False(t, ok) - assert.Equal(t, ed2.StartTimestamp, Timestamp(300)) - assert.Equal(t, ed2.EndTimestamp, Timestamp(400)) } diff --git a/internal/storage/binlog_writer.go b/internal/storage/binlog_writer.go index 3cd12c2426..a84faed97d 100644 --- a/internal/storage/binlog_writer.go +++ b/internal/storage/binlog_writer.go @@ -223,13 +223,12 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID }, }, nil } -func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) (*DeleteBinlogWriter, error) { +func NewDeleteBinlogWriter(dataType schemapb.DataType) (*DeleteBinlogWriter, error) { descriptorEvent, err := newDescriptorEvent() if err != nil { return nil, err } descriptorEvent.PayloadDataType = dataType - descriptorEvent.CollectionID = collectionID return &DeleteBinlogWriter{ baseBinlogWriter: baseBinlogWriter{ descriptorEvent: *descriptorEvent, @@ -240,13 +239,12 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) (*Del }, }, nil } -func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) (*DDLBinlogWriter, error) { +func NewDDLBinlogWriter(dataType schemapb.DataType) (*DDLBinlogWriter, error) { descriptorEvent, err := newDescriptorEvent() if err != nil { return nil, err } descriptorEvent.PayloadDataType = dataType - descriptorEvent.CollectionID = collectionID return &DDLBinlogWriter{ baseBinlogWriter: baseBinlogWriter{ descriptorEvent: *descriptorEvent, diff --git a/internal/storage/binlog_writer_test.go b/internal/storage/binlog_writer_test.go index f88bd6bc33..fd313b0df7 100644 --- a/internal/storage/binlog_writer_test.go +++ b/internal/storage/binlog_writer_test.go @@ -54,5 +54,6 @@ func TestBinlogWriterReader(t *testing.T) { reader, err := binlogReader.NextEventReader() assert.Nil(t, err) + fmt.Println("reader offset : " + strconv.Itoa(int(binlogReader.currentOffset))) assert.Nil(t, reader) } diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 0537646d12..942f48b5a1 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -354,7 +354,7 @@ type DataDefinitionCodec struct { } func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) { - writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING, dataDefinitionCodec.Schema.ID) + writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING) if err != nil { return nil, err } @@ -426,7 +426,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ value: buffer, }) - writer, err = NewDDLBinlogWriter(schemapb.DataType_INT64, dataDefinitionCodec.Schema.ID) + writer, err = NewDDLBinlogWriter(schemapb.DataType_INT64) if err != nil { return nil, err } diff --git a/internal/writenode/data_sync_service.go b/internal/writenode/data_sync_service.go index a58c3afd33..ad156d46ec 100644 --- 
a/internal/writenode/data_sync_service.go
+++ b/internal/writenode/data_sync_service.go
@@ -37,15 +37,21 @@ func (dsService *dataSyncService) initNodes() {
 	dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
 
 	var dmStreamNode Node = newDmInputNode(dsService.ctx)
+	var ddStreamNode Node = newDDInputNode(dsService.ctx)
+
+	var ddNode Node = newDDNode()
 	var filterDmNode Node = newFilteredDmNode()
 
 	var insertBufferNode Node = newInsertBufferNode()
-	var serviceTimeNode Node = newServiceTimeNode()
 
 	dsService.fg.AddNode(&dmStreamNode)
-	dsService.fg.AddNode(&filterDmNode)
-	dsService.fg.AddNode(&insertBufferNode)
-	dsService.fg.AddNode(&serviceTimeNode)
+	dsService.fg.AddNode(&ddStreamNode)
+
+	dsService.fg.AddNode(&filterDmNode)
+	dsService.fg.AddNode(&ddNode)
+
+	dsService.fg.AddNode(&insertBufferNode)
+
+	// dmStreamNode
 	var err = dsService.fg.SetEdges(dmStreamNode.Name(),
 		[]string{},
 		[]string{filterDmNode.Name()},
@@ -54,27 +60,39 @@ func (dsService *dataSyncService) initNodes() {
 		log.Fatal("set edges failed in node:", dmStreamNode.Name())
 	}
 
+	// ddStreamNode
+	err = dsService.fg.SetEdges(ddStreamNode.Name(),
+		[]string{},
+		[]string{ddNode.Name()},
+	)
+	if err != nil {
+		log.Fatal("set edges failed in node:", ddStreamNode.Name())
+	}
+
+	// filterDmNode
 	err = dsService.fg.SetEdges(filterDmNode.Name(),
-		[]string{dmStreamNode.Name()},
+		[]string{dmStreamNode.Name(), ddNode.Name()},
 		[]string{insertBufferNode.Name()},
 	)
 	if err != nil {
 		log.Fatal("set edges failed in node:", filterDmNode.Name())
 	}
 
+	// ddNode
+	err = dsService.fg.SetEdges(ddNode.Name(),
+		[]string{ddStreamNode.Name()},
+		[]string{filterDmNode.Name()},
+	)
+	if err != nil {
+		log.Fatal("set edges failed in node:", ddNode.Name())
+	}
+
+	// insertBufferNode
 	err = dsService.fg.SetEdges(insertBufferNode.Name(),
 		[]string{filterDmNode.Name()},
-		[]string{serviceTimeNode.Name()},
+		[]string{},
 	)
 	if err != nil {
 		log.Fatal("set edges failed in node:", insertBufferNode.Name())
 	}
-
-	err = dsService.fg.SetEdges(serviceTimeNode.Name(),
-		[]string{insertBufferNode.Name()},
-		[]string{},
-	)
-	if err != nil {
-		log.Fatal("set edges failed in node:", serviceTimeNode.Name())
-	}
 }
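The data sync pipeline is no longer linear: dm and dd messages arrive on separate input streams and meet at filterDmNode, which may only fire once ddNode has produced the dd records for the same time tick. A minimal, self-contained sketch of the same topology (plain Go, string node names standing in for the real flowgraph nodes; this is not the flowgraph package API):

package main

import "fmt"

func main() {
	// Downstream edges exactly as wired in data_sync_service.go above.
	nodes := []string{"dmStreamNode", "ddStreamNode", "ddNode", "filterDmNode", "insertBufferNode"}
	edges := map[string][]string{
		"dmStreamNode": {"filterDmNode"},
		"ddStreamNode": {"ddNode"},
		"ddNode":       {"filterDmNode"},
		"filterDmNode": {"insertBufferNode"},
	}

	// Kahn's algorithm: a node may fire only after all of its inputs have.
	inDeg := map[string]int{}
	for _, outs := range edges {
		for _, to := range outs {
			inDeg[to]++
		}
	}
	queue := []string{}
	for _, n := range nodes {
		if inDeg[n] == 0 {
			queue = append(queue, n)
		}
	}
	order := []string{}
	for len(queue) > 0 {
		n := queue[0]
		queue = queue[1:]
		order = append(order, n)
		for _, to := range edges[n] {
			inDeg[to]--
			if inDeg[to] == 0 {
				queue = append(queue, to)
			}
		}
	}
	fmt.Println(order) // [dmStreamNode ddStreamNode ddNode filterDmNode insertBufferNode]
}

Any valid order places ddNode before filterDmNode, which is what lets filterDmNode trim inserts against the dd records of the same time tick.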
diff --git a/internal/writenode/data_sync_service_test.go b/internal/writenode/data_sync_service_test.go
index 646fb64ad3..3797468aca 100644
--- a/internal/writenode/data_sync_service_test.go
+++ b/internal/writenode/data_sync_service_test.go
@@ -16,7 +16,6 @@ import (
 
 // NOTE: start pulsar before test
 func TestDataSyncService_Start(t *testing.T) {
-	Params.Init()
 	const ctxTimeInMillisecond = 200
 	const closeWithDeadline = true
 	var ctx context.Context
@@ -31,7 +30,7 @@ func TestDataSyncService_Start(t *testing.T) {
 	}
 
 	// init write node
-	pulsarURL, _ := Params.pulsarAddress()
+	pulsarURL := Params.PulsarAddress
 	node := NewWriteNode(ctx, 0)
 
 	// test data generate
@@ -116,20 +115,30 @@ func TestDataSyncService_Start(t *testing.T) {
 
 	// pulsar produce
 	const receiveBufSize = 1024
-	producerChannels := Params.insertChannelNames()
+	insertChannels := Params.InsertChannelNames
+	ddChannels := Params.DDChannelNames
 
 	insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
 	insertStream.SetPulsarClient(pulsarURL)
-	insertStream.CreatePulsarProducers(producerChannels)
+	insertStream.CreatePulsarProducers(insertChannels)
+
+	ddStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
+	ddStream.SetPulsarClient(pulsarURL)
+	ddStream.CreatePulsarProducers(ddChannels)
 
 	var insertMsgStream msgstream.MsgStream = insertStream
 	insertMsgStream.Start()
 
+	var ddMsgStream msgstream.MsgStream = ddStream
+	ddMsgStream.Start()
+
 	err := insertMsgStream.Produce(&msgPack)
 	assert.NoError(t, err)
 
 	err = insertMsgStream.Broadcast(&timeTickMsgPack)
 	assert.NoError(t, err)
+	err = ddMsgStream.Broadcast(&timeTickMsgPack)
+	assert.NoError(t, err)
 
 	// dataSync
 	node.dataSyncService = newDataSyncService(node.ctx)
diff --git a/internal/writenode/dd_buffer.go b/internal/writenode/dd_buffer.go
new file mode 100644
index 0000000000..b312f856ef
--- /dev/null
+++ b/internal/writenode/dd_buffer.go
@@ -0,0 +1,47 @@
+package writenode
+
+import (
+	"errors"
+	"strconv"
+)
+
+type ddBuffer struct {
+	collectionBuffer map[UniqueID]interface{}
+	partitionBuffer  map[UniqueID]interface{}
+}
+
+func (d *ddBuffer) addCollection(collectionID UniqueID) error {
+	if _, ok := d.collectionBuffer[collectionID]; ok {
+		return errors.New("collection " + strconv.FormatInt(collectionID, 10) + " already exists")
+	}
+
+	d.collectionBuffer[collectionID] = nil
+	return nil
+}
+
+func (d *ddBuffer) removeCollection(collectionID UniqueID) error {
+	if _, ok := d.collectionBuffer[collectionID]; !ok {
+		return errors.New("cannot find collection " + strconv.FormatInt(collectionID, 10))
+	}
+
+	delete(d.collectionBuffer, collectionID)
+	return nil
+}
+
+func (d *ddBuffer) addPartition(partitionID UniqueID) error {
+	if _, ok := d.partitionBuffer[partitionID]; ok {
+		return errors.New("partition " + strconv.FormatInt(partitionID, 10) + " already exists")
+	}
+
+	d.partitionBuffer[partitionID] = nil
+	return nil
+}
+
+func (d *ddBuffer) removePartition(partitionID UniqueID) error {
+	if _, ok := d.partitionBuffer[partitionID]; !ok {
+		return errors.New("cannot find partition " + strconv.FormatInt(partitionID, 10))
+	}
+
+	delete(d.partitionBuffer, partitionID)
+	return nil
+}
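ddBuffer is a pair of existence sets keyed by ID; the stored values are always nil because only membership matters. A standalone sketch of the intended create/drop bookkeeping (UniqueID aliased to int64 here for illustration; the real alias lives elsewhere in the package):

package main

import (
	"errors"
	"fmt"
	"strconv"
)

type UniqueID = int64

type ddBuffer struct {
	collectionBuffer map[UniqueID]interface{}
}

// addCollection records a collection as live, rejecting duplicate creates.
func (d *ddBuffer) addCollection(collectionID UniqueID) error {
	if _, ok := d.collectionBuffer[collectionID]; ok {
		return errors.New("collection " + strconv.FormatInt(collectionID, 10) + " already exists")
	}
	d.collectionBuffer[collectionID] = nil
	return nil
}

func main() {
	buf := &ddBuffer{collectionBuffer: make(map[UniqueID]interface{})}
	fmt.Println(buf.addCollection(1)) // <nil>: first create succeeds
	fmt.Println(buf.addCollection(1)) // error: duplicate create is rejected
}

The remove side is symmetric: dropping an ID that is not in the set is an error, so ddNode simply logs and skips out-of-order create/drop requests.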
diff --git a/internal/writenode/flow_graph_dd_node.go b/internal/writenode/flow_graph_dd_node.go
new file mode 100644
index 0000000000..4b319e53cb
--- /dev/null
+++ b/internal/writenode/flow_graph_dd_node.go
@@ -0,0 +1,176 @@
+package writenode
+
+import (
+	"log"
+	"sort"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
+)
+
+type ddNode struct {
+	BaseNode
+	ddMsg    *ddMsg
+	ddBuffer *ddBuffer
+}
+
+func (ddNode *ddNode) Name() string {
+	return "ddNode"
+}
+
+func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
+	//fmt.Println("Do ddNode operation")
+
+	if len(in) != 1 {
+		log.Println("Invalid operate message input in ddNode, input length = ", len(in))
+		// TODO: add error handling
+	}
+
+	msMsg, ok := (*in[0]).(*MsgStreamMsg)
+	if !ok {
+		log.Println("type assertion failed for MsgStreamMsg")
+		// TODO: add error handling
+	}
+
+	var ddMsg = ddMsg{
+		collectionRecords: make(map[string][]metaOperateRecord),
+		partitionRecords:  make(map[string][]metaOperateRecord),
+		timeRange: TimeRange{
+			timestampMin: msMsg.TimestampMin(),
+			timestampMax: msMsg.TimestampMax(),
+		},
+	}
+	ddNode.ddMsg = &ddMsg
+
+	// sort tsMessages
+	tsMessages := msMsg.TsMessages()
+	sort.Slice(tsMessages,
+		func(i, j int) bool {
+			return tsMessages[i].BeginTs() < tsMessages[j].BeginTs()
+		})
+
+	// do dd tasks
+	for _, msg := range tsMessages {
+		switch msg.Type() {
+		case internalPb.MsgType_kCreateCollection:
+			ddNode.createCollection(msg.(*msgstream.CreateCollectionMsg))
+		case internalPb.MsgType_kDropCollection:
+			ddNode.dropCollection(msg.(*msgstream.DropCollectionMsg))
+		case internalPb.MsgType_kCreatePartition:
+			ddNode.createPartition(msg.(*msgstream.CreatePartitionMsg))
+		case internalPb.MsgType_kDropPartition:
+			ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg))
+		default:
+			log.Println("Unsupported message type:", msg.Type())
+		}
+	}
+
+	var res Msg = ddNode.ddMsg
+	return []*Msg{&res}
+}
+
+func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
+	collectionID := msg.CollectionID
+
+	err := ddNode.ddBuffer.addCollection(collectionID)
+	if err != nil {
+		log.Println(err)
+		return
+	}
+
+	// TODO: add default partition?
+
+	var schema schemapb.CollectionSchema
+	err = proto.Unmarshal((*msg.Schema).Value, &schema)
+	if err != nil {
+		log.Println(err)
+		return
+	}
+	collectionName := schema.Name
+	ddNode.ddMsg.collectionRecords[collectionName] = append(ddNode.ddMsg.collectionRecords[collectionName],
+		metaOperateRecord{
+			createOrDrop: true,
+			timestamp:    msg.Timestamp,
+		})
+
+	// TODO: write dd binlog
+}
+
+func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) {
+	collectionID := msg.CollectionID
+
+	err := ddNode.ddBuffer.removeCollection(collectionID)
+	if err != nil {
+		log.Println(err)
+		return
+	}
+
+	collectionName := msg.CollectionName.CollectionName
+	ddNode.ddMsg.collectionRecords[collectionName] = append(ddNode.ddMsg.collectionRecords[collectionName],
+		metaOperateRecord{
+			createOrDrop: false,
+			timestamp:    msg.Timestamp,
+		})
+
+	// TODO: write dd binlog
+}
+
+func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
+	partitionID := msg.PartitionID
+
+	err := ddNode.ddBuffer.addPartition(partitionID)
+	if err != nil {
+		log.Println(err)
+		return
+	}
+
+	partitionTag := msg.PartitionName.Tag
+	ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag],
+		metaOperateRecord{
+			createOrDrop: true,
+			timestamp:    msg.Timestamp,
+		})
+
+	// TODO: write dd binlog
+}
+
+func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
+	partitionID := msg.PartitionID
+
+	err := ddNode.ddBuffer.removePartition(partitionID)
+	if err != nil {
+		log.Println(err)
+		return
+	}
+
+	partitionTag := msg.PartitionName.Tag
+	ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag],
+		metaOperateRecord{
+			createOrDrop: false,
+			timestamp:    msg.Timestamp,
+		})
+
+	// TODO: write dd binlog
+}
+
+func newDDNode() *ddNode {
+	maxQueueLength := Params.FlowGraphMaxQueueLength
+	maxParallelism := Params.FlowGraphMaxParallelism
+
+	baseNode := BaseNode{}
+	baseNode.SetMaxQueueLength(maxQueueLength)
+	baseNode.SetMaxParallelism(maxParallelism)
+
+	ddBuffer := &ddBuffer{
+		collectionBuffer: make(map[UniqueID]interface{}),
+		partitionBuffer:  make(map[UniqueID]interface{}),
+	}
+
+	return &ddNode{
+		BaseNode: baseNode,
+		ddBuffer: ddBuffer,
+	}
+}
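ddNode accumulates these metaOperateRecord entries per collection and partition, and filterDmNode (below) trims insert batches against them: a row is kept only if its timestamp is at or after the collection's most recent create record. A reduced sketch of that trimming rule on plain slices (standalone types assumed here, not the real msgstream structs):

package main

import "fmt"

type Timestamp = uint64

// keepFrom drops every row whose timestamp precedes the last create
// record, mirroring filterInvalidInsertMessage's trimming loop.
func keepFrom(timestamps []Timestamp, rowIDs []int64, target Timestamp) ([]Timestamp, []int64) {
	outTs := make([]Timestamp, 0, len(timestamps))
	outIDs := make([]int64, 0, len(rowIDs))
	for i, t := range timestamps {
		if t >= target {
			outTs = append(outTs, t)
			outIDs = append(outIDs, rowIDs[i])
		}
	}
	return outTs, outIDs
}

func main() {
	ts, ids := keepFrom([]Timestamp{5, 9, 12, 20}, []int64{1, 2, 3, 4}, 10)
	fmt.Println(ts, ids) // [12 20] [3 4]
}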
diff --git a/internal/writenode/flow_graph_filter_dm_node.go b/internal/writenode/flow_graph_filter_dm_node.go
index b01ecfcb56..8180627268 100644
--- a/internal/writenode/flow_graph_filter_dm_node.go
+++ b/internal/writenode/flow_graph_filter_dm_node.go
@@ -4,11 +4,13 @@ import (
 	"log"
 
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
 )
 
 type filterDmNode struct {
 	BaseNode
+	ddMsg *ddMsg
 }
 
 func (fdmNode *filterDmNode) Name() string {
@@ -16,29 +18,40 @@ func (fdmNode *filterDmNode) Name() string {
 }
 
 func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
+	//fmt.Println("Do filterDmNode operation")
 
-	if len(in) != 1 {
+	if len(in) != 2 {
 		log.Println("Invalid operate message input in filterDmNode, input length = ", len(in))
 		// TODO: add error handling
 	}
 
-	msMsg, ok := (*in[0]).(*MsgStreamMsg)
+	msgStreamMsg, ok := (*in[0]).(*MsgStreamMsg)
 	if !ok {
 		log.Println("type assertion failed for MsgStreamMsg")
 		// TODO: add error handling
 	}
 
+	ddMsg, ok := (*in[1]).(*ddMsg)
+	if !ok {
+		log.Println("type assertion failed for ddMsg")
+		// TODO: add error handling
+	}
+	fdmNode.ddMsg = ddMsg
+
 	var iMsg = insertMsg{
 		insertMessages: make([]*msgstream.InsertMsg, 0),
 		timeRange: TimeRange{
-			timestampMin: msMsg.TimestampMin(),
-			timestampMax: msMsg.TimestampMax(),
+			timestampMin: msgStreamMsg.TimestampMin(),
+			timestampMax: msgStreamMsg.TimestampMax(),
 		},
 	}
 
-	for _, msg := range msMsg.TsMessages() {
+	for _, msg := range msgStreamMsg.TsMessages() {
 		switch msg.Type() {
 		case internalPb.MsgType_kInsert:
-			iMsg.insertMessages = append(iMsg.insertMessages, msg.(*msgstream.InsertMsg))
+			resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
+			if resMsg != nil {
+				iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
+			}
 		case internalPb.MsgType_kFlush:
 			iMsg.insertMessages = append(iMsg.insertMessages, msg.(*msgstream.InsertMsg))
 		// case internalPb.MsgType_kDelete:
@@ -52,9 +65,44 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
 	return []*Msg{&res}
 }
 
+func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) *msgstream.InsertMsg {
+	// No dd record yet; pass all insert requests through.
+	records, ok := fdmNode.ddMsg.collectionRecords[msg.CollectionName]
+	if !ok {
+		return msg
+	}
+
+	// If the last record is a drop, all insert requests are invalid.
+	if !records[len(records)-1].createOrDrop {
+		return nil
+	}
+
+	// Drop insert requests that precede the last record.
+	if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
+		// TODO: what if the messages are misaligned? 
Here, we ignore those messages and print error + log.Println("Error, misaligned messages detected") + return nil + } + tmpTimestamps := make([]Timestamp, 0) + tmpRowIDs := make([]int64, 0) + tmpRowData := make([]*commonpb.Blob, 0) + targetTimestamp := records[len(records)-1].timestamp + for i, t := range msg.Timestamps { + if t >= targetTimestamp { + tmpTimestamps = append(tmpTimestamps, t) + tmpRowIDs = append(tmpRowIDs, msg.RowIDs[i]) + tmpRowData = append(tmpRowData, msg.RowData[i]) + } + } + msg.Timestamps = tmpTimestamps + msg.RowIDs = tmpRowIDs + msg.RowData = tmpRowData + return msg +} + func newFilteredDmNode() *filterDmNode { - maxQueueLength := Params.flowGraphMaxQueueLength() - maxParallelism := Params.flowGraphMaxParallelism() + maxQueueLength := Params.FlowGraphMaxQueueLength + maxParallelism := Params.FlowGraphMaxParallelism baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) diff --git a/internal/writenode/flow_graph_insert_buffer_node.go b/internal/writenode/flow_graph_insert_buffer_node.go index d027f6a94b..4bf0c13103 100644 --- a/internal/writenode/flow_graph_insert_buffer_node.go +++ b/internal/writenode/flow_graph_insert_buffer_node.go @@ -38,7 +38,7 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { // TODO: add error handling } - iMsg, ok := (*in[0]).(*insertMsg) + _, ok := (*in[0]).(*insertMsg) if !ok { log.Println("type assertion failed for insertMsg") // TODO: add error handling @@ -64,18 +64,13 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { // log.Printf("t(%d) : %v ", task.Timestamps[0], task.RowData[0]) // } - var res Msg = &serviceTimeMsg{ - timeRange: iMsg.timeRange, - } - // TODO - return []*Msg{&res} - + return nil } func newInsertBufferNode() *insertBufferNode { - maxQueueLength := Params.flowGraphMaxQueueLength() - maxParallelism := Params.flowGraphMaxParallelism() + maxQueueLength := Params.FlowGraphMaxQueueLength + maxParallelism := Params.FlowGraphMaxParallelism baseNode := BaseNode{} baseNode.SetMaxQueueLength(maxQueueLength) diff --git a/internal/writenode/flow_graph_message.go b/internal/writenode/flow_graph_message.go index 5b387f36eb..49be13bd28 100644 --- a/internal/writenode/flow_graph_message.go +++ b/internal/writenode/flow_graph_message.go @@ -2,7 +2,6 @@ package writenode import ( "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) @@ -18,8 +17,17 @@ type ( timeRange TimeRange } - schemaUpdateMsg struct { - timeRange TimeRange + ddMsg struct { + // TODO: use collection id + collectionRecords map[string][]metaOperateRecord + // TODO: use partition id + partitionRecords map[string][]metaOperateRecord + timeRange TimeRange + } + + metaOperateRecord struct { + createOrDrop bool // create: true, drop: false + timestamp Timestamp } insertMsg struct { @@ -32,34 +40,6 @@ type ( deleteMessages []*msgstream.DeleteMsg timeRange TimeRange } - - serviceTimeMsg struct { - timeRange TimeRange - } - - InsertData struct { - insertIDs map[SegmentID][]UniqueID - insertTimestamps map[SegmentID][]Timestamp - insertRecords map[SegmentID][]*commonpb.Blob - insertOffset map[SegmentID]int64 - } - - DeleteData struct { - deleteIDs map[SegmentID][]UniqueID - deleteTimestamps map[SegmentID][]Timestamp - deleteOffset map[SegmentID]int64 - } - - DeleteRecord struct { - entityID UniqueID - timestamp Timestamp - segmentID UniqueID - } - - DeletePreprocessData struct { - deleteRecords 
[]*DeleteRecord - count int32 - } ) func (ksMsg *key2SegMsg) TimeTick() Timestamp { @@ -70,11 +50,11 @@ func (ksMsg *key2SegMsg) DownStreamNodeIdx() int { return 0 } -func (suMsg *schemaUpdateMsg) TimeTick() Timestamp { +func (suMsg *ddMsg) TimeTick() Timestamp { return suMsg.timeRange.timestampMax } -func (suMsg *schemaUpdateMsg) DownStreamNodeIdx() int { +func (suMsg *ddMsg) DownStreamNodeIdx() int { return 0 } @@ -93,11 +73,3 @@ func (dMsg *deleteMsg) TimeTick() Timestamp { func (dMsg *deleteMsg) DownStreamNodeIdx() int { return 0 } - -func (stMsg *serviceTimeMsg) TimeTick() Timestamp { - return stMsg.timeRange.timestampMax -} - -func (stMsg *serviceTimeMsg) DownStreamNodeIdx() int { - return 0 -} diff --git a/internal/writenode/flow_graph_msg_stream_input_node.go b/internal/writenode/flow_graph_msg_stream_input_node.go index 0ec6980367..f6cdf2bc36 100644 --- a/internal/writenode/flow_graph_msg_stream_input_node.go +++ b/internal/writenode/flow_graph_msg_stream_input_node.go @@ -2,23 +2,19 @@ package writenode import ( "context" - "log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) func newDmInputNode(ctx context.Context) *flowgraph.InputNode { - receiveBufSize := Params.insertReceiveBufSize() - pulsarBufSize := Params.insertPulsarBufSize() + receiveBufSize := Params.InsertReceiveBufSize + pulsarBufSize := Params.InsertPulsarBufSize - msgStreamURL, err := Params.pulsarAddress() - if err != nil { - log.Fatal(err) - } + msgStreamURL := Params.PulsarAddress - consumeChannels := Params.insertChannelNames() - consumeSubName := Params.msgChannelSubName() + consumeChannels := Params.InsertChannelNames + consumeSubName := Params.MsgChannelSubName insertStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize) @@ -31,9 +27,32 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode { var stream msgstream.MsgStream = insertStream - maxQueueLength := Params.flowGraphMaxQueueLength() - maxParallelism := Params.flowGraphMaxParallelism() + maxQueueLength := Params.FlowGraphMaxQueueLength + maxParallelism := Params.FlowGraphMaxParallelism node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism) return node } + +func newDDInputNode(ctx context.Context) *flowgraph.InputNode { + receiveBufSize := Params.DDReceiveBufSize + pulsarBufSize := Params.DDPulsarBufSize + + msgStreamURL := Params.PulsarAddress + + consumeChannels := Params.DDChannelNames + consumeSubName := Params.MsgChannelSubName + + ddStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize) + ddStream.SetPulsarClient(msgStreamURL) + unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() + ddStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) + + var stream msgstream.MsgStream = ddStream + + maxQueueLength := Params.FlowGraphMaxQueueLength + maxParallelism := Params.FlowGraphMaxParallelism + + node := flowgraph.NewInputNode(&stream, "ddInputNode", maxQueueLength, maxParallelism) + return node +} diff --git a/internal/writenode/flow_graph_service_time_node.go b/internal/writenode/flow_graph_service_time_node.go deleted file mode 100644 index ee6b1869a2..0000000000 --- a/internal/writenode/flow_graph_service_time_node.go +++ /dev/null @@ -1,46 +0,0 @@ -package writenode - -import ( - "log" -) - -type serviceTimeNode struct { - BaseNode -} - -func (stNode *serviceTimeNode) Name() string { - return "stNode" -} - -func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg { 
- - if len(in) != 1 { - log.Println("Invalid operate message input in serviceTimeNode, input length = ", len(in)) - // TODO: add error handling - } - - // serviceTimeMsg, ok := (*in[0]).(*serviceTimeMsg) - _, ok := (*in[0]).(*serviceTimeMsg) - if !ok { - log.Println("type assertion failed for serviceTimeMsg") - // TODO: add error handling - } - - // update service time - // (*(*stNode.replica).getTSafe()).set(serviceTimeMsg.timeRange.timestampMax) - // fmt.Println("update tSafe to:", getPhysicalTime(serviceTimeMsg.timeRange.timestampMax)) - return nil -} - -func newServiceTimeNode() *serviceTimeNode { - maxQueueLength := Params.flowGraphMaxQueueLength() - maxParallelism := Params.flowGraphMaxParallelism() - - baseNode := BaseNode{} - baseNode.SetMaxQueueLength(maxQueueLength) - baseNode.SetMaxParallelism(maxParallelism) - - return &serviceTimeNode{ - BaseNode: baseNode, - } -} diff --git a/internal/writenode/param_table.go b/internal/writenode/param_table.go index e88f5931c5..41a727bce2 100644 --- a/internal/writenode/param_table.go +++ b/internal/writenode/param_table.go @@ -10,6 +10,30 @@ import ( type ParamTable struct { paramtable.BaseTable + + PulsarAddress string + + WriteNodeID UniqueID + WriteNodeNum int + WriteNodeTimeTickChannelName string + + FlowGraphMaxQueueLength int32 + FlowGraphMaxParallelism int32 + + // dm + InsertChannelNames []string + InsertChannelRange []int + InsertReceiveBufSize int64 + InsertPulsarBufSize int64 + + // dd + DDChannelNames []string + DDReceiveBufSize int64 + DDPulsarBufSize int64 + + MsgChannelSubName string + DefaultPartitionTag string + SliceIndex int } var Params ParamTable @@ -30,18 +54,35 @@ func (p *ParamTable) Init() { writeNodeIDStr = strconv.Itoa(int(writeNodeIDList[0])) } } - p.Save("_writeNodeID", writeNodeIDStr) -} - -func (p *ParamTable) pulsarAddress() (string, error) { - url, err := p.Load("_PulsarAddress") + err = p.Save("_writeNodeID", writeNodeIDStr) if err != nil { panic(err) } - return url, nil + + p.initPulsarAddress() + + p.initWriteNodeID() + p.initWriteNodeNum() + p.initWriteNodeTimeTickChannelName() + + p.initMsgChannelSubName() + p.initDefaultPartitionTag() + p.initSliceIndex() + + p.initFlowGraphMaxQueueLength() + p.initFlowGraphMaxParallelism() + + p.initInsertChannelNames() + p.initInsertChannelRange() + p.initInsertReceiveBufSize() + p.initInsertPulsarBufSize() + + p.initDDChannelNames() + p.initDDReceiveBufSize() + p.initDDPulsarBufSize() } -func (p *ParamTable) WriteNodeID() UniqueID { +func (p *ParamTable) initWriteNodeID() { writeNodeID, err := p.Load("_writeNodeID") if err != nil { panic(err) @@ -50,187 +91,153 @@ func (p *ParamTable) WriteNodeID() UniqueID { if err != nil { panic(err) } - return UniqueID(id) + p.WriteNodeID = UniqueID(id) } -func (p *ParamTable) insertChannelRange() []int { +func (p *ParamTable) initPulsarAddress() { + url, err := p.Load("_PulsarAddress") + if err != nil { + panic(err) + } + p.PulsarAddress = url +} + +func (p *ParamTable) initInsertChannelRange() { insertChannelRange, err := p.Load("msgChannel.channelRange.insert") if err != nil { panic(err) } - - return paramtable.ConvertRangeToIntRange(insertChannelRange, ",") + p.InsertChannelRange = paramtable.ConvertRangeToIntRange(insertChannelRange, ",") } // advanced params -// stats -func (p *ParamTable) statsPublishInterval() int { - return p.ParseInt("writeNode.stats.publishInterval") -} - // dataSync: -func (p *ParamTable) flowGraphMaxQueueLength() int32 { - return p.ParseInt32("writeNode.dataSync.flowGraph.maxQueueLength") +func (p 
*ParamTable) initFlowGraphMaxQueueLength() {
+	p.FlowGraphMaxQueueLength = p.ParseInt32("writeNode.dataSync.flowGraph.maxQueueLength")
 }
 
-func (p *ParamTable) flowGraphMaxParallelism() int32 {
-	return p.ParseInt32("writeNode.dataSync.flowGraph.maxParallelism")
+func (p *ParamTable) initFlowGraphMaxParallelism() {
+	p.FlowGraphMaxParallelism = p.ParseInt32("writeNode.dataSync.flowGraph.maxParallelism")
 }
 
 // msgStream
-func (p *ParamTable) insertReceiveBufSize() int64 {
-	return p.ParseInt64("writeNode.msgStream.insert.recvBufSize")
+func (p *ParamTable) initInsertReceiveBufSize() {
+	p.InsertReceiveBufSize = p.ParseInt64("writeNode.msgStream.insert.recvBufSize")
 }
 
-func (p *ParamTable) insertPulsarBufSize() int64 {
-	return p.ParseInt64("writeNode.msgStream.insert.pulsarBufSize")
+func (p *ParamTable) initInsertPulsarBufSize() {
+	p.InsertPulsarBufSize = p.ParseInt64("writeNode.msgStream.insert.pulsarBufSize")
 }
 
-func (p *ParamTable) searchReceiveBufSize() int64 {
-	return p.ParseInt64("writeNode.msgStream.search.recvBufSize")
-}
-
-func (p *ParamTable) searchPulsarBufSize() int64 {
-	return p.ParseInt64("writeNode.msgStream.search.pulsarBufSize")
-}
-
-func (p *ParamTable) searchResultReceiveBufSize() int64 {
-	return p.ParseInt64("writeNode.msgStream.searchResult.recvBufSize")
-}
-
-func (p *ParamTable) statsReceiveBufSize() int64 {
-	return p.ParseInt64("writeNode.msgStream.stats.recvBufSize")
-}
-
-func (p *ParamTable) etcdAddress() string {
-	etcdAddress, err := p.Load("_EtcdAddress")
+func (p *ParamTable) initDDReceiveBufSize() {
+	recvBufSize, err := p.Load("writeNode.msgStream.dataDefinition.recvBufSize")
 	if err != nil {
 		panic(err)
 	}
-	return etcdAddress
+	bufSize, err := strconv.Atoi(recvBufSize)
+	if err != nil {
+		panic(err)
+	}
+	p.DDReceiveBufSize = int64(bufSize)
 }
 
-func (p *ParamTable) metaRootPath() string {
-	rootPath, err := p.Load("etcd.rootPath")
+func (p *ParamTable) initDDPulsarBufSize() {
+	pulsarBufSize, err := p.Load("writeNode.msgStream.dataDefinition.pulsarBufSize")
 	if err != nil {
 		panic(err)
 	}
-	subPath, err := p.Load("etcd.metaSubPath")
+	bufSize, err := strconv.Atoi(pulsarBufSize)
 	if err != nil {
 		panic(err)
 	}
-	return rootPath + "/" + subPath
+	p.DDPulsarBufSize = int64(bufSize)
 }
 
-func (p *ParamTable) gracefulTime() int64 {
-	gracefulTime, err := p.Load("writeNode.gracefulTime")
-	if err != nil {
-		panic(err)
-	}
-	time, err := strconv.Atoi(gracefulTime)
-	if err != nil {
-		panic(err)
-	}
-	return int64(time)
-}
 
-func (p *ParamTable) insertChannelNames() []string {
+func (p *ParamTable) initInsertChannelNames() {
 	prefix, err := p.Load("msgChannel.chanNamePrefix.insert")
 	if err != nil {
 		log.Fatal(err)
 	}
+	prefix += "-"
 	channelRange, err := p.Load("msgChannel.channelRange.insert")
 	if err != nil {
 		panic(err)
 	}
-
 	channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",")
 
 	var ret []string
 	for _, ID := range channelIDs {
 		ret = append(ret, prefix+strconv.Itoa(ID))
 	}
-	sep := len(channelIDs) / p.writeNodeNum()
-	index := p.sliceIndex()
+	sep := len(channelIDs) / p.WriteNodeNum
+	index := p.SliceIndex
 	if index == -1 {
-		panic("writeNodeID not Match with Config")
+		panic("writeNodeID does not match config")
 	}
 	start := index * sep
-	return ret[start : start+sep]
+	p.InsertChannelNames = ret[start : start+sep]
 }
 
-func (p *ParamTable) searchChannelNames() []string {
-	prefix, err := p.Load("msgChannel.chanNamePrefix.search")
-	if err != nil {
-		log.Fatal(err)
-	}
-	channelRange, err := p.Load("msgChannel.channelRange.search")
-	if err != nil {
-		panic(err)
-	}
-
-	channelIDs := 
paramtable.ConvertRangeToIntSlice(channelRange, ",") - - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - return ret -} - -func (p *ParamTable) searchResultChannelNames() []string { - prefix, err := p.Load("msgChannel.chanNamePrefix.searchResult") - if err != nil { - log.Fatal(err) - } - - prefix += "-" - channelRange, err := p.Load("msgChannel.channelRange.searchResult") - if err != nil { - panic(err) - } - - channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",") - - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - return ret -} - -func (p *ParamTable) msgChannelSubName() string { - // TODO: subName = namePrefix + "-" + writeNodeID, writeNodeID is assigned by master +func (p *ParamTable) initMsgChannelSubName() { name, err := p.Load("msgChannel.subNamePrefix.writeNodeSubNamePrefix") if err != nil { log.Panic(err) } - writeNodeIDStr, err := p.Load("_WriteNodeID") + writeNodeIDStr, err := p.Load("_writeNodeID") if err != nil { panic(err) } - return name + "-" + writeNodeIDStr + p.MsgChannelSubName = name + "-" + writeNodeIDStr } -func (p *ParamTable) writeNodeTimeTickChannelName() string { +func (p *ParamTable) initDDChannelNames() { + prefix, err := p.Load("msgChannel.chanNamePrefix.dataDefinition") + if err != nil { + panic(err) + } + prefix += "-" + iRangeStr, err := p.Load("msgChannel.channelRange.dataDefinition") + if err != nil { + panic(err) + } + channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",") + var ret []string + for _, ID := range channelIDs { + ret = append(ret, prefix+strconv.Itoa(ID)) + } + p.DDChannelNames = ret +} + +func (p *ParamTable) initDefaultPartitionTag() { + defaultTag, err := p.Load("common.defaultPartitionTag") + if err != nil { + panic(err) + } + + p.DefaultPartitionTag = defaultTag +} + +func (p *ParamTable) initWriteNodeTimeTickChannelName() { channels, err := p.Load("msgChannel.chanNamePrefix.writeNodeTimeTick") if err != nil { panic(err) } - return channels + p.WriteNodeTimeTickChannelName = channels } -func (p *ParamTable) sliceIndex() int { - writeNodeID := p.WriteNodeID() +func (p *ParamTable) initSliceIndex() { + writeNodeID := p.WriteNodeID writeNodeIDList := p.WriteNodeIDList() for i := 0; i < len(writeNodeIDList); i++ { if writeNodeID == writeNodeIDList[i] { - return i + p.SliceIndex = i + return } } - return -1 + p.SliceIndex = -1 } -func (p *ParamTable) writeNodeNum() int { - return len(p.WriteNodeIDList()) +func (p *ParamTable) initWriteNodeNum() { + p.WriteNodeNum = len(p.WriteNodeIDList()) } diff --git a/internal/writenode/param_table_test.go b/internal/writenode/param_table_test.go index 0702ec1a32..d29c1442b0 100644 --- a/internal/writenode/param_table_test.go +++ b/internal/writenode/param_table_test.go @@ -12,101 +12,58 @@ func TestParamTable_WriteNode(t *testing.T) { Params.Init() t.Run("Test PulsarAddress", func(t *testing.T) { - address, err := Params.pulsarAddress() - assert.NoError(t, err) + address := Params.PulsarAddress split := strings.Split(address, ":") assert.Equal(t, split[0], "pulsar") assert.Equal(t, split[len(split)-1], "6650") }) t.Run("Test WriteNodeID", func(t *testing.T) { - id := Params.WriteNodeID() + id := Params.WriteNodeID assert.Equal(t, id, UniqueID(3)) }) t.Run("Test insertChannelRange", func(t *testing.T) { - channelRange := Params.insertChannelRange() + channelRange := Params.InsertChannelRange assert.Equal(t, len(channelRange), 2) assert.Equal(t, channelRange[0], 0) assert.Equal(t, 
channelRange[1], 2) }) - t.Run("Test statsServiceTimeInterval", func(t *testing.T) { - interval := Params.statsPublishInterval() - assert.Equal(t, interval, 1000) - }) - - t.Run("Test statsMsgStreamReceiveBufSize", func(t *testing.T) { - bufSize := Params.statsReceiveBufSize() - assert.Equal(t, bufSize, int64(64)) - }) - t.Run("Test insertMsgStreamReceiveBufSize", func(t *testing.T) { - bufSize := Params.insertReceiveBufSize() + bufSize := Params.InsertReceiveBufSize assert.Equal(t, bufSize, int64(1024)) }) - t.Run("Test searchMsgStreamReceiveBufSize", func(t *testing.T) { - bufSize := Params.searchReceiveBufSize() - assert.Equal(t, bufSize, int64(512)) - }) - - t.Run("Test searchResultMsgStreamReceiveBufSize", func(t *testing.T) { - bufSize := Params.searchResultReceiveBufSize() - assert.Equal(t, bufSize, int64(64)) - }) - - t.Run("Test searchPulsarBufSize", func(t *testing.T) { - bufSize := Params.searchPulsarBufSize() - assert.Equal(t, bufSize, int64(512)) - }) - t.Run("Test insertPulsarBufSize", func(t *testing.T) { - bufSize := Params.insertPulsarBufSize() + bufSize := Params.InsertPulsarBufSize assert.Equal(t, bufSize, int64(1024)) }) t.Run("Test flowGraphMaxQueueLength", func(t *testing.T) { - length := Params.flowGraphMaxQueueLength() + length := Params.FlowGraphMaxQueueLength assert.Equal(t, length, int32(1024)) }) t.Run("Test flowGraphMaxParallelism", func(t *testing.T) { - maxParallelism := Params.flowGraphMaxParallelism() + maxParallelism := Params.FlowGraphMaxParallelism assert.Equal(t, maxParallelism, int32(1024)) }) t.Run("Test insertChannelNames", func(t *testing.T) { - names := Params.insertChannelNames() + names := Params.InsertChannelNames assert.Equal(t, len(names), 2) - assert.Equal(t, names[0], "insert0") - assert.Equal(t, names[1], "insert1") - }) - - t.Run("Test searchChannelNames", func(t *testing.T) { - names := Params.searchChannelNames() - assert.Equal(t, len(names), 1) - assert.Equal(t, names[0], "search0") - }) - - t.Run("Test searchResultChannelName", func(t *testing.T) { - names := Params.searchResultChannelNames() - assert.Equal(t, len(names), 1) - assert.Equal(t, names[0], "searchResult-0") + assert.Equal(t, names[0], "insert-0") + assert.Equal(t, names[1], "insert-1") }) t.Run("Test msgChannelSubName", func(t *testing.T) { - name := Params.msgChannelSubName() + name := Params.MsgChannelSubName assert.Equal(t, name, "writeNode-3") }) t.Run("Test timeTickChannelName", func(t *testing.T) { - name := Params.writeNodeTimeTickChannelName() + name := Params.WriteNodeTimeTickChannelName assert.Equal(t, name, "writeNodeTimeTick") }) - - t.Run("Test metaRootPath", func(t *testing.T) { - path := Params.metaRootPath() - assert.Equal(t, path, "by-dev/meta") - }) } diff --git a/internal/writenode/write_node_test.go b/internal/writenode/write_node_test.go new file mode 100644 index 0000000000..3905b3ab30 --- /dev/null +++ b/internal/writenode/write_node_test.go @@ -0,0 +1,32 @@ +package writenode + +import ( + "fmt" + "math/rand" + "os" + "strconv" + "testing" +) + +func makeNewChannelNames(names []string, suffix string) []string { + var ret []string + for _, name := range names { + ret = append(ret, name+suffix) + } + return ret +} + +func refreshChannelNames() { + suffix := "-test-write-node" + strconv.FormatInt(rand.Int63n(100), 10) + Params.DDChannelNames = makeNewChannelNames(Params.DDChannelNames, suffix) + Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix) +} + +func TestMain(m *testing.M) { + Params.Init() + refreshChannelNames() 
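+	// Dump the parameter table so the test log records the effective configuration.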
+	fmt.Println(Params)
+	exitCode := m.Run()
+	os.Exit(exitCode)
+}
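
Illustrative sketch (not part of the patch): with the refactored param table,
writenode components read pre-initialized fields such as
Params.FlowGraphMaxQueueLength instead of calling getter methods. A minimal
node constructor under the new scheme, modeled on the deleted
newServiceTimeNode above (exampleNode is a hypothetical name, not a type
added by this patch):

	package writenode

	// exampleNode embeds BaseNode like the other flow-graph nodes.
	type exampleNode struct {
		BaseNode
	}

	func newExampleNode() *exampleNode {
		// Field reads replace the old getter calls; Params.Init() must
		// run before any node is constructed.
		baseNode := BaseNode{}
		baseNode.SetMaxQueueLength(Params.FlowGraphMaxQueueLength)
		baseNode.SetMaxParallelism(Params.FlowGraphMaxParallelism)
		return &exampleNode{BaseNode: baseNode}
	}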