milvus/internal/reader/flow_graph_msg_stream_input_nodes.go
sunby 9e4ce8454e Implement segment management in master
Signed-off-by: sunby <bingyi.sun@zilliz.com>
2020-11-18 15:04:17 +08:00

33 lines
889 B
Go

package reader
import (
"context"
"log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
receiveBufSize := Params.dmMsgStreamReceiveBufSize()
pulsarBufSize := Params.dmPulsarBufSize()
msgStreamURL, err := Params.PulsarAddress()
if err != nil {
log.Fatal(err)
}
consumeChannels := []string{"insert"}
consumeSubName := "insertSub"
insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
insertStream.SetPulsarClient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
var stream msgstream.MsgStream = insertStream
node := flowgraph.NewInputNode(&stream, "dmInputNode")
return node
}