diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index ae28faa69e..8584a883b8 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -60,7 +60,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { msg.SetTraceCtx(ctx) } - var iMsg = insertMsg{ + var fgMsg = flowGraphMsg{ insertMessages: make([]*msgstream.InsertMsg, 0), timeRange: TimeRange{ timestampMin: msMsg.TimestampMin(), @@ -96,14 +96,14 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { continue } } - iMsg.insertMessages = append(iMsg.insertMessages, imsg) + fgMsg.insertMessages = append(fgMsg.insertMessages, imsg) } } - iMsg.startPositions = append(iMsg.startPositions, msMsg.StartPositions()...) - iMsg.endPositions = append(iMsg.endPositions, msMsg.EndPositions()...) + fgMsg.startPositions = append(fgMsg.startPositions, msMsg.StartPositions()...) + fgMsg.endPositions = append(fgMsg.endPositions, msMsg.EndPositions()...) - var res Msg = &iMsg + var res Msg = &fgMsg for _, sp := range spans { sp.Finish() diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index cedb7f20f7..3121fa8327 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -83,9 +83,9 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { }{ {[]Msg{}, "Invalid input length == 0"}, - {[]Msg{&insertMsg{}, &insertMsg{}, &insertMsg{}}, + {[]Msg{&flowGraphMsg{}, &flowGraphMsg{}, &flowGraphMsg{}}, "Invalid input length == 3"}, - {[]Msg{&insertMsg{}}, + {[]Msg{&flowGraphMsg{}}, "Invalid input length == 1 but input message is not msgStreamMsg"}, } @@ -190,7 +190,7 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { // Test rt := ddn.Operate([]Msg{msgStreamMsg}) - assert.Equal(t, test.expectedRtLen, len(rt[0].(*insertMsg).insertMessages)) + assert.Equal(t, test.expectedRtLen, len(rt[0].(*flowGraphMsg).insertMessages)) }) } }) diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index 0781c815de..d23fe613d7 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -49,9 +49,9 @@ func TestFlowGraphDeleteNode_Operate(te *testing.T) { }{ {[]Msg{}, nil, "Invalid input length == 0"}, - {[]Msg{&insertMsg{}, &insertMsg{}, &insertMsg{}}, nil, + {[]Msg{&flowGraphMsg{}, &flowGraphMsg{}, &flowGraphMsg{}}, nil, "Invalid input length == 3"}, - {[]Msg{&insertMsg{}}, nil, + {[]Msg{&flowGraphMsg{}}, nil, "Invalid input length == 1 but input message is not msgStreamMsg"}, {nil, []Msg{&MsgStreamMsg{}}, "valid input"}, diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 03ebb897b0..2a535aa1e5 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -165,7 +165,6 @@ func (ibNode *insertBufferNode) Close() { } func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { - // log.Debug("InsertBufferNode Operating") if len(in) != 1 { @@ -173,36 +172,36 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { return []Msg{} } - iMsg, ok := in[0].(*insertMsg) + fgMsg, ok := in[0].(*flowGraphMsg) if !ok { - log.Error("type assertion failed for insertMsg") + log.Error("type assertion failed for flowGraphMsg") ibNode.Close() return []Msg{} } var spans []opentracing.Span - for _, msg := range iMsg.insertMessages { + for _, msg := range fgMsg.insertMessages { sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) spans = append(spans, sp) msg.SetTraceCtx(ctx) } // replace pchannel with vchannel - startPositions := make([]*internalpb.MsgPosition, 0, len(iMsg.startPositions)) - for idx := range iMsg.startPositions { - pos := proto.Clone(iMsg.startPositions[idx]).(*internalpb.MsgPosition) + startPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.startPositions)) + for idx := range fgMsg.startPositions { + pos := proto.Clone(fgMsg.startPositions[idx]).(*internalpb.MsgPosition) pos.ChannelName = ibNode.channelName startPositions = append(startPositions, pos) } - endPositions := make([]*internalpb.MsgPosition, 0, len(iMsg.endPositions)) - for idx := range iMsg.endPositions { - pos := proto.Clone(iMsg.endPositions[idx]).(*internalpb.MsgPosition) + endPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.endPositions)) + for idx := range fgMsg.endPositions { + pos := proto.Clone(fgMsg.endPositions[idx]).(*internalpb.MsgPosition) pos.ChannelName = ibNode.channelName endPositions = append(endPositions, pos) } // Updating segment statistics in replica - seg2Upload, err := ibNode.updateSegStatesInReplica(iMsg.insertMessages, startPositions[0], endPositions[0]) + seg2Upload, err := ibNode.updateSegStatesInReplica(fgMsg.insertMessages, startPositions[0], endPositions[0]) if err != nil { log.Warn("update segment states in Replica wrong", zap.Error(err)) return []Msg{} @@ -216,7 +215,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { } // insert messages -> buffer - for _, msg := range iMsg.insertMessages { + for _, msg := range fgMsg.insertMessages { err := ibNode.bufferInsertMsg(msg, endPositions[0]) if err != nil { log.Warn("msg to buffer failed", zap.Error(err)) @@ -224,7 +223,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { } // TODO GOOSE: log updated segments' states - if len(iMsg.insertMessages) > 0 { + if len(fgMsg.insertMessages) > 0 { log.Debug("---insert buffer status---") var stopSign int = 0 for k := range ibNode.insertBuffer.insertData { @@ -246,7 +245,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { log.Debug(". Insert Buffer full, auto flushing ", zap.Int64("num of rows", ibNode.insertBuffer.size(segToFlush))) - collMeta, err := ibNode.getCollMetabySegID(segToFlush, iMsg.timeRange.timestampMax) + collMeta, err := ibNode.getCollMetabySegID(segToFlush, fgMsg.timeRange.timestampMax) if err != nil { log.Error("Auto flush failed .. cannot get collection meta ..", zap.Error(err)) continue @@ -320,7 +319,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { // TODO add error handling } - collMeta, err := ibNode.getCollMetabySegID(currentSegID, iMsg.timeRange.timestampMax) + collMeta, err := ibNode.getCollMetabySegID(currentSegID, fgMsg.timeRange.timestampMax) if err != nil { log.Error("Flush failed .. cannot get collection schema ..", zap.Error(err)) clearFn() @@ -347,7 +346,7 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { default: } - if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil { + if err := ibNode.writeHardTimeTick(fgMsg.timeRange.timestampMax); err != nil { log.Error("send hard time tick into pulsar channel failed", zap.Error(err)) } diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 5f43bb2599..bf730614f9 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -123,10 +123,10 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { }{ {[]Msg{}, "Invalid input length == 0"}, - {[]Msg{&insertMsg{}, &insertMsg{}, &insertMsg{}}, + {[]Msg{&flowGraphMsg{}, &flowGraphMsg{}, &flowGraphMsg{}}, "Invalid input length == 3"}, {[]Msg{&mockMsg{}}, - "Invalid input length == 1 but input message is not insertMsg"}, + "Invalid input length == 1 but input message is not flowGraphMsg"}, } for _, test := range invalidInTests { @@ -181,12 +181,12 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { collectionID: UniqueID(1), } - inMsg := genInsertMsg(insertChannelName) - var iMsg flowgraph.Msg = &inMsg - iBNode.Operate([]flowgraph.Msg{iMsg}) + inMsg := genFlowGraphMsg(insertChannelName) + var fgMsg flowgraph.Msg = &inMsg + iBNode.Operate([]flowgraph.Msg{fgMsg}) } -func genInsertMsg(insertChannelName string) insertMsg { +func genFlowGraphMsg(insertChannelName string) flowGraphMsg { timeRange := TimeRange{ timestampMin: 0, @@ -201,7 +201,7 @@ func genInsertMsg(insertChannelName string) insertMsg { }, } - var iMsg = &insertMsg{ + var iMsg = &flowGraphMsg{ insertMessages: make([]*msgstream.InsertMsg, 0), timeRange: TimeRange{ timestampMin: timeRange.timestampMin, @@ -391,7 +391,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { // Auto flush number of rows set to 2 - inMsg := genInsertMsg("datanode-03-test-autoflush") + inMsg := genFlowGraphMsg("datanode-03-test-autoflush") inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(2) var iMsg flowgraph.Msg = &inMsg @@ -667,7 +667,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { iBNode, err := newInsertBufferNode(ctx, replica, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string", newCache()) require.NoError(t, err) - inMsg := genInsertMsg(insertChannelName) + inMsg := genFlowGraphMsg(insertChannelName) for _, msg := range inMsg.insertMessages { msg.EndTimestamp = 101 // ts valid err = iBNode.bufferInsertMsg(msg, &internalpb.MsgPosition{}) diff --git a/internal/datanode/flow_graph_message.go b/internal/datanode/flow_graph_message.go index a5b6c16e39..eb4b1078df 100644 --- a/internal/datanode/flow_graph_message.go +++ b/internal/datanode/flow_graph_message.go @@ -25,20 +25,20 @@ type ( MsgStreamMsg = flowgraph.MsgStreamMsg ) -type insertMsg struct { +type flowGraphMsg struct { insertMessages []*msgstream.InsertMsg timeRange TimeRange startPositions []*internalpb.MsgPosition endPositions []*internalpb.MsgPosition } +func (fgMsg *flowGraphMsg) TimeTick() Timestamp { + return fgMsg.timeRange.timestampMax +} + type flushMsg struct { msgID UniqueID timestamp Timestamp segmentID UniqueID collectionID UniqueID } - -func (iMsg *insertMsg) TimeTick() Timestamp { - return iMsg.timeRange.timestampMax -} diff --git a/internal/datanode/flow_graph_message_test.go b/internal/datanode/flow_graph_message_test.go index d0488a7a33..1a6c71ccb4 100644 --- a/internal/datanode/flow_graph_message_test.go +++ b/internal/datanode/flow_graph_message_test.go @@ -29,9 +29,8 @@ func TestInsertMsg_TimeTick(te *testing.T) { for _, test := range tests { te.Run(test.description, func(t *testing.T) { - im := &insertMsg{timeRange: TimeRange{timestampMax: test.timeTimestanpMax}} - - assert.Equal(t, test.timeTimestanpMax, im.TimeTick()) + fgMsg := &flowGraphMsg{timeRange: TimeRange{timestampMax: test.timeTimestanpMax}} + assert.Equal(t, test.timeTimestanpMax, fgMsg.TimeTick()) }) }