mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
[skip ci]Add comment on flowgraph dmstream inputnode (#9359)
Signed-off-by: Yang Xuan <xuan.yang@zilliz.com>
This commit is contained in:
parent
27143c6095
commit
5bb832e6cc
@ -22,6 +22,9 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/util/flowgraph"
|
||||
)
|
||||
|
||||
// DmInputNode receives messages from message streams, packs messages between two timeticks, and passes all
|
||||
// messages between two timeticks to the following flowgraph node. In DataNode, the following flow graph node is
|
||||
// flowgraph ddNode.
|
||||
func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID UniqueID, chanName string, seekPos *internalpb.MsgPosition) (*flowgraph.InputNode, error) {
|
||||
maxQueueLength := Params.FlowGraphMaxQueueLength
|
||||
maxParallelism := Params.FlowGraphMaxParallelism
|
||||
@ -33,12 +36,13 @@ func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID Uniqu
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// MsgStream needs a physical channel name, but the channel name in seek position from DataCoord
|
||||
// is virtual channel name, so we need to convert vchannel name into pchannel neme here.
|
||||
pchannelName := rootcoord.ToPhysicalChannel(chanName)
|
||||
insertStream.AsConsumer([]string{pchannelName}, consumeSubName)
|
||||
log.Debug("datanode AsConsumer physical channel: " + pchannelName + " : " + consumeSubName)
|
||||
|
||||
if seekPos != nil {
|
||||
// ChannelName in seek position is virtual channel name.
|
||||
seekPos.ChannelName = pchannelName
|
||||
log.Debug("datanode Seek: " + seekPos.GetChannelName())
|
||||
insertStream.Seek([]*internalpb.MsgPosition{seekPos})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user