diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index c86be7f5c2..d0173388f9 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ b/internal/datanode/flow_graph_dmstream_input_node.go @@ -22,6 +22,9 @@ import ( "github.com/milvus-io/milvus/internal/util/flowgraph" ) +// DmInputNode receives messages from message streams, packs messages between two timeticks, and passes all +// messages between two timeticks to the following flowgraph node. In DataNode, the following flow graph node is +// flowgraph ddNode. func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID UniqueID, chanName string, seekPos *internalpb.MsgPosition) (*flowgraph.InputNode, error) { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -33,12 +36,13 @@ func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID Uniqu return nil, err } + // MsgStream needs a physical channel name, but the channel name in seek position from DataCoord + // is virtual channel name, so we need to convert vchannel name into pchannel neme here. pchannelName := rootcoord.ToPhysicalChannel(chanName) insertStream.AsConsumer([]string{pchannelName}, consumeSubName) log.Debug("datanode AsConsumer physical channel: " + pchannelName + " : " + consumeSubName) if seekPos != nil { - // ChannelName in seek position is virtual channel name. seekPos.ChannelName = pchannelName log.Debug("datanode Seek: " + seekPos.GetChannelName()) insertStream.Seek([]*internalpb.MsgPosition{seekPos})