diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index cf83cedafb..0f1fef4de9 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -126,12 +126,14 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { pos.ChannelName = ibNode.channelName startPositions = append(startPositions, pos) } + fgMsg.startPositions = startPositions endPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.endPositions)) for idx := range fgMsg.endPositions { pos := proto.Clone(fgMsg.endPositions[idx]).(*internalpb.MsgPosition) pos.ChannelName = ibNode.channelName endPositions = append(endPositions, pos) } + fgMsg.endPositions = endPositions if startPositions[0].Timestamp < ibNode.lastTimestamp { // message stream should guarantee that this should not happen