diff --git a/internal/datanode/pipeline/flow_graph_dd_node.go b/internal/datanode/pipeline/flow_graph_dd_node.go index 5cc8acd7b8..f5264161d2 100644 --- a/internal/datanode/pipeline/flow_graph_dd_node.go +++ b/internal/datanode/pipeline/flow_graph_dd_node.go @@ -76,7 +76,7 @@ type ddNode struct { // Name returns node name, implementing flowgraph.Node func (ddn *ddNode) Name() string { - return fmt.Sprintf("ddNode-%d-%s", ddn.collectionID, ddn.vChannelName) + return fmt.Sprintf("ddNode-%s", ddn.vChannelName) } func (ddn *ddNode) IsValidInMsg(in []Msg) bool { diff --git a/internal/datanode/pipeline/flow_graph_dd_node_test.go b/internal/datanode/pipeline/flow_graph_dd_node_test.go index 6f392f9c55..ee05b72d80 100644 --- a/internal/datanode/pipeline/flow_graph_dd_node_test.go +++ b/internal/datanode/pipeline/flow_graph_dd_node_test.go @@ -83,7 +83,7 @@ func TestFlowGraph_DDNode_newDDNode(t *testing.T) { require.NoError(t, err) require.NotNil(t, ddNode) - assert.Equal(t, fmt.Sprintf("ddNode-%d-%s", ddNode.collectionID, ddNode.vChannelName), ddNode.Name()) + assert.Equal(t, fmt.Sprintf("ddNode-%s", ddNode.vChannelName), ddNode.Name()) assert.Equal(t, len(test.inSealedSegs), len(ddNode.sealedSegInfo)) assert.Equal(t, len(test.inGrowingSegs), len(ddNode.growingSegInfo)) diff --git a/internal/datanode/pipeline/flow_graph_dmstream_input_node.go b/internal/datanode/pipeline/flow_graph_dmstream_input_node.go index aa08e7b6e2..6207c8ad49 100644 --- a/internal/datanode/pipeline/flow_graph_dmstream_input_node.go +++ b/internal/datanode/pipeline/flow_graph_dmstream_input_node.go @@ -62,7 +62,7 @@ func newDmInputNode(initCtx context.Context, dispatcherClient msgdispatcher.Clie log.Info("datanode consume successfully when register to msgDispatcher") } - name := fmt.Sprintf("dmInputNode-data-%d-%s", dmNodeConfig.collectionID, dmNodeConfig.vChannelName) + name := fmt.Sprintf("dmInputNode-data-%s", dmNodeConfig.vChannelName) node := flowgraph.NewInputNode( input, name, diff --git a/internal/datanode/pipeline/flow_graph_write_node.go b/internal/datanode/pipeline/flow_graph_write_node.go index 27f53617a6..504d911ccd 100644 --- a/internal/datanode/pipeline/flow_graph_write_node.go +++ b/internal/datanode/pipeline/flow_graph_write_node.go @@ -2,6 +2,7 @@ package pipeline import ( "context" + "fmt" "github.com/golang/protobuf/proto" "github.com/samber/lo" @@ -27,6 +28,11 @@ type writeNode struct { metacache metacache.MetaCache } +// Name returns node name, implementing flowgraph.Node +func (wNode *writeNode) Name() string { + return fmt.Sprintf("writeNode-%s", wNode.channelName) +} + func (wNode *writeNode) Operate(in []Msg) []Msg { fgMsg := in[0].(*FlowGraphMsg) diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go index f38a65aea4..7bfa3bfaeb 100644 --- a/internal/util/flowgraph/node.go +++ b/internal/util/flowgraph/node.go @@ -75,27 +75,23 @@ func (nodeCtxManager *nodeCtxManager) Start() { // in dmInputNode, message from mq to channel, alloc goroutines // limit the goroutines in other node to prevent huge goroutines numbers nodeCtxManager.closeWg.Add(1) - go nodeCtxManager.workNodeStart() -} - -func (nodeCtxManager *nodeCtxManager) workNodeStart() { - defer nodeCtxManager.closeWg.Done() - inputNode := nodeCtxManager.inputNodeCtx - curNode := inputNode + curNode := nodeCtxManager.inputNodeCtx // tt checker start - var checker *timerecord.Checker if enableTtChecker { manager := timerecord.GetCheckerManger("fgNode", nodeCtxTtInterval, func(list []string) { log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", nodeCtxTtInterval)) }) for curNode != nil { name := fmt.Sprintf("nodeCtxTtChecker-%s", curNode.node.Name()) - checker = timerecord.NewChecker(name, manager) + curNode.checker = timerecord.NewChecker(name, manager) curNode = curNode.downstream - defer checker.Close() } } + go nodeCtxManager.workNodeStart() +} +func (nodeCtxManager *nodeCtxManager) workNodeStart() { + defer nodeCtxManager.closeWg.Done() for { select { case <-nodeCtxManager.closeCh: @@ -105,7 +101,8 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() { // 2. invoke node.Operate // 3. deliver the Operate result to downstream nodes default: - curNode = inputNode + inputNode := nodeCtxManager.inputNodeCtx + curNode := inputNode for curNode != nil { // inputs from inputsMessages for Operate var input, output []Msg @@ -137,8 +134,8 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() { if curNode.downstream != nil { curNode.downstream.inputChannel <- output } - if enableTtChecker { - checker.Check() + if enableTtChecker && curNode.checker != nil { + curNode.checker.Check() } curNode = curNode.downstream } @@ -157,6 +154,7 @@ type nodeCtx struct { node Node inputChannel chan []Msg downstream *nodeCtx + checker *timerecord.Checker blockMutex sync.RWMutex } @@ -192,6 +190,9 @@ func (nodeCtx *nodeCtx) Close() { if nodeCtx.node.IsInputNode() { for nodeCtx != nil { nodeCtx.node.Close() + if nodeCtx.checker != nil { + nodeCtx.checker.Close() + } log.Debug("flow graph node closed", zap.String("nodeName", nodeCtx.node.Name())) nodeCtx = nodeCtx.downstream }