diff --git a/configs/advanced/data_node.yaml b/configs/advanced/data_node.yaml index 621478c437..f8596a88b3 100644 --- a/configs/advanced/data_node.yaml +++ b/configs/advanced/data_node.yaml @@ -18,4 +18,6 @@ dataNode: flush: # max buffer size to flush - insertBufSize: 32000 # number of rows + insertBufSize: 32000 # GOOSE TODO: to delete + + # insertBufSize: 16 # MB GOOSE TODO: to enable diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 6466303c1d..2a2cf41cae 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -54,10 +54,12 @@ type insertBufferNode struct { BaseNode channelName string insertBuffer *insertBuffer - replica Replica - idAllocator allocatorInterface - flushMap sync.Map - flushChan <-chan *flushMsg + // insertBuffer map[UniqueID]*BufferData // SegmentID to BufferData + replica Replica + idAllocator allocatorInterface + + flushMap sync.Map + flushChan <-chan *flushMsg minIOKV kv.BaseKV @@ -81,6 +83,22 @@ type segmentFlushUnit struct { flushed bool } +type BufferData struct { + buffer *InsertData + size int64 + limit int64 // Num of rows +} + +func newBufferData(dimension int64) (*BufferData, error) { + if dimension == 0 { + return nil, errors.New("Invalid dimension") + } + + limit := Params.FlushInsertBufferSize * (1 << 18) / dimension + + return &BufferData{&InsertData{}, 0, limit}, nil +} + type insertBuffer struct { insertData map[UniqueID]*InsertData // SegmentID to InsertData maxSize int64 @@ -199,7 +217,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { // insert messages -> buffer for _, msg := range iMsg.insertMessages { - err := ibNode.bufferInsertMsg(iMsg, msg) + err := ibNode.bufferInsertMsg(msg, endPositions[0]) if err != nil { log.Warn("msg to buffer failed", zap.Error(err)) } @@ -383,7 +401,7 @@ func (ibNode *insertBufferNode) updateSegStatesInReplica(insertMsgs []*msgstream // 1.2 Get buffer data and put data into each field buffer // 1.3 Put back into buffer // 1.4 Update related statistics -func (ibNode *insertBufferNode) bufferInsertMsg(iMsg *insertMsg, msg *msgstream.InsertMsg) error { +func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos *internalpb.MsgPosition) error { if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) { return errors.New("misaligned messages detected") } @@ -625,13 +643,8 @@ func (ibNode *insertBufferNode) bufferInsertMsg(iMsg *insertMsg, msg *msgstream. ibNode.insertBuffer.insertData[currentSegID] = idata // store current endPositions as Segment->EndPostion - endPositions := make([]*internalpb.MsgPosition, 0, len(iMsg.endPositions)) - for idx := range iMsg.endPositions { - pos := proto.Clone(iMsg.endPositions[idx]).(*internalpb.MsgPosition) - pos.ChannelName = ibNode.channelName - endPositions = append(endPositions, pos) - } - ibNode.replica.updateSegmentEndPosition(currentSegID, endPositions[0]) + ibNode.replica.updateSegmentEndPosition(currentSegID, endPos) + // update segment pk filter ibNode.replica.updateSegmentPKRange(currentSegID, msg.GetRowIDs()) return nil diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 48a9a9c9f8..e900d6849b 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -692,20 +692,20 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { inMsg := genInsertMsg(insertChannelName) for _, msg := range inMsg.insertMessages { msg.EndTimestamp = 101 // ts valid - err = iBNode.bufferInsertMsg(&inMsg, msg) + err = iBNode.bufferInsertMsg(msg, &internalpb.MsgPosition{}) assert.Nil(t, err) } for _, msg := range inMsg.insertMessages { msg.EndTimestamp = 99 // ts invalid - err = iBNode.bufferInsertMsg(&inMsg, msg) + err = iBNode.bufferInsertMsg(msg, &internalpb.MsgPosition{}) assert.NotNil(t, err) } for _, msg := range inMsg.insertMessages { msg.EndTimestamp = 101 // ts valid msg.RowIDs = []int64{} //misaligned data - err = iBNode.bufferInsertMsg(&inMsg, msg) + err = iBNode.bufferInsertMsg(msg, &internalpb.MsgPosition{}) assert.NotNil(t, err) } } @@ -743,3 +743,39 @@ func TestInsertBufferNode_updateSegStatesInReplica(te *testing.T) { } } + +func TestInsertBufferNode_BufferData(te *testing.T) { + Params.FlushInsertBufferSize = 16 + + tests := []struct { + isValid bool + + indim int64 + expectedLimit int64 + + description string + }{ + {true, 1, 4194304, "Smallest of the DIM"}, + {true, 128, 32768, "Normal DIM"}, + {true, 32768, 128, "Largest DIM"}, + {false, 0, 0, "Illegal DIM"}, + } + + for _, test := range tests { + te.Run(test.description, func(t *testing.T) { + idata, err := newBufferData(test.indim) + + if test.isValid { + assert.NoError(t, err) + assert.NotNil(t, idata) + + assert.Equal(t, test.expectedLimit, idata.limit) + assert.Zero(t, idata.size) + } else { + assert.Error(t, err) + assert.Nil(t, idata) + } + }) + + } +}