From 97682a413f0a6e497eda5dd68cc6083393b080e2 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 17 Feb 2023 10:12:35 +0800 Subject: [PATCH] Convert msg pchannel to vchannel before check IsCloseMsg (#22182) (#22197) Signed-off-by: Congqi Xia --- .../datanode/flow_graph_insert_buffer_node.go | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index f88632936d..12b7debd82 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -117,6 +117,23 @@ func (ibNode *insertBufferNode) IsValidInMsg(in []Msg) bool { func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { fgMsg := in[0].(*flowGraphMsg) + + // replace pchannel with vchannel + startPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.startPositions)) + for idx := range fgMsg.startPositions { + pos := proto.Clone(fgMsg.startPositions[idx]).(*internalpb.MsgPosition) + pos.ChannelName = ibNode.channelName + startPositions = append(startPositions, pos) + } + fgMsg.startPositions = startPositions + endPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.endPositions)) + for idx := range fgMsg.endPositions { + pos := proto.Clone(fgMsg.endPositions[idx]).(*internalpb.MsgPosition) + pos.ChannelName = ibNode.channelName + endPositions = append(endPositions, pos) + } + fgMsg.endPositions = endPositions + if fgMsg.IsCloseMsg() { if len(fgMsg.endPositions) != 0 { // try to sync all segments @@ -152,22 +169,6 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { } }() - // replace pchannel with vchannel - startPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.startPositions)) - for idx := range fgMsg.startPositions { - pos := proto.Clone(fgMsg.startPositions[idx]).(*internalpb.MsgPosition) - pos.ChannelName = ibNode.channelName - startPositions = append(startPositions, pos) - } - fgMsg.startPositions = startPositions - endPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.endPositions)) - for idx := range fgMsg.endPositions { - pos := proto.Clone(fgMsg.endPositions[idx]).(*internalpb.MsgPosition) - pos.ChannelName = ibNode.channelName - endPositions = append(endPositions, pos) - } - fgMsg.endPositions = endPositions - if startPositions[0].Timestamp < ibNode.lastTimestamp { // message stream should guarantee that this should not happen err := fmt.Errorf("insert buffer node consumed old messages, channel = %s, timestamp = %d, lastTimestamp = %d",