From 5bb832e6cc4bc69c2d5b8ef64e4daa7bfcca7a7a Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Wed, 6 Oct 2021 23:02:08 +0800 Subject: [PATCH] [skip ci]Add comment on flowgraph dmstream inputnode (#9359) Signed-off-by: Yang Xuan --- internal/datanode/flow_graph_dmstream_input_node.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index c86be7f5c2..d0173388f9 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ b/internal/datanode/flow_graph_dmstream_input_node.go @@ -22,6 +22,9 @@ import ( "github.com/milvus-io/milvus/internal/util/flowgraph" ) +// DmInputNode receives messages from message streams, packs messages between two timeticks, and passes all +// messages between two timeticks to the following flowgraph node. In DataNode, the following flow graph node is +// flowgraph ddNode. func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID UniqueID, chanName string, seekPos *internalpb.MsgPosition) (*flowgraph.InputNode, error) { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -33,12 +36,13 @@ func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID Uniqu return nil, err } + // MsgStream needs a physical channel name, but the channel name in seek position from DataCoord + // is virtual channel name, so we need to convert vchannel name into pchannel neme here. pchannelName := rootcoord.ToPhysicalChannel(chanName) insertStream.AsConsumer([]string{pchannelName}, consumeSubName) log.Debug("datanode AsConsumer physical channel: " + pchannelName + " : " + consumeSubName) if seekPos != nil { - // ChannelName in seek position is virtual channel name. seekPos.ChannelName = pchannelName log.Debug("datanode Seek: " + seekPos.GetChannelName()) insertStream.Seek([]*internalpb.MsgPosition{seekPos})