diff --git a/internal/util/flowgraph/flow_graph.go b/internal/util/flowgraph/flow_graph.go index 43f10c6fa0..852703fe35 100644 --- a/internal/util/flowgraph/flow_graph.go +++ b/internal/util/flowgraph/flow_graph.go @@ -79,12 +79,9 @@ func (fg *TimeTickedFlowGraph) SetEdges(nodeName string, in []string, out []stri // Start starts all nodes in timetick flowgragh func (fg *TimeTickedFlowGraph) Start() { fg.startOnce.Do(func() { - wg := sync.WaitGroup{} for _, v := range fg.nodeCtx { - wg.Add(1) - v.Start(&wg) + v.Start() } - wg.Wait() }) } @@ -92,8 +89,16 @@ func (fg *TimeTickedFlowGraph) Start() { func (fg *TimeTickedFlowGraph) Close() { fg.stopOnce.Do(func() { for _, v := range fg.nodeCtx { - // maybe need to stop in order - v.Close() + if v.node.IsInputNode() { + // close inputNode first + v.Close() + } + } + for _, v := range fg.nodeCtx { + if !v.node.IsInputNode() { + // close other nodes + v.Close() + } } }) } diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go index 6be3335363..861e163d4d 100644 --- a/internal/util/flowgraph/input_node.go +++ b/internal/util/flowgraph/input_node.go @@ -64,7 +64,7 @@ func (inNode *InputNode) InStream() msgstream.MsgStream { func (inNode *InputNode) Operate(in []Msg) []Msg { msgPack, ok := <-inNode.inStream.Chan() if !ok { - log.Warn("Receive Msg failed from upstream node", zap.Any("input node", inNode.Name())) + log.Warn("MsgStream closed", zap.Any("input node", inNode.Name())) return []Msg{} } diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go index 9fc860b10e..da95c3d00a 100644 --- a/internal/util/flowgraph/node.go +++ b/internal/util/flowgraph/node.go @@ -58,15 +58,16 @@ type nodeCtx struct { downstream []*nodeCtx downstreamInputChanIdx map[string]int - closeCh chan struct{} + closeCh chan struct{} // notify work to exit + closeWg sync.WaitGroup // block Close until work exit } // Start invoke Node `Start` method and start a worker goroutine -func (nodeCtx *nodeCtx) Start(wg *sync.WaitGroup) { +func (nodeCtx *nodeCtx) Start() { nodeCtx.node.Start() + nodeCtx.closeWg.Add(1) go nodeCtx.work() - wg.Done() } // work handles node work spinning @@ -74,6 +75,7 @@ func (nodeCtx *nodeCtx) Start(wg *sync.WaitGroup) { // 2. invoke node.Operate // 3. deliver the Operate result to downstream nodes func (nodeCtx *nodeCtx) work() { + defer nodeCtx.closeWg.Done() name := fmt.Sprintf("nodeCtxTtChecker-%s", nodeCtx.node.Name()) var checker *timerecord.GroupChecker if enableTtChecker { @@ -87,10 +89,10 @@ func (nodeCtx *nodeCtx) work() { for { select { case <-nodeCtx.closeCh: + log.Debug("flow graph node closed", zap.String("nodeName", nodeCtx.node.Name())) return default: // inputs from inputsMessages for Operate - var inputs, res []Msg if !nodeCtx.node.IsInputNode() { nodeCtx.collectInputMessages() @@ -124,10 +126,15 @@ func (nodeCtx *nodeCtx) work() { // Close handles cleanup logic and notify worker to quit func (nodeCtx *nodeCtx) Close() { - // close Node - nodeCtx.node.Close() - // notify worker - close(nodeCtx.closeCh) + if nodeCtx.node.IsInputNode() { + nodeCtx.node.Close() // close input msgStream + close(nodeCtx.closeCh) + nodeCtx.closeWg.Wait() + } else { + close(nodeCtx.closeCh) + nodeCtx.closeWg.Wait() + nodeCtx.node.Close() // close output msgStream, and etc... + } } // deliverMsg tries to put the Msg to specified downstream channel diff --git a/internal/util/flowgraph/node_test.go b/internal/util/flowgraph/node_test.go index 1124ad2b36..1fcf2b81cc 100644 --- a/internal/util/flowgraph/node_test.go +++ b/internal/util/flowgraph/node_test.go @@ -20,7 +20,6 @@ import ( "context" "math" "os" - "sync" "testing" "time" @@ -90,9 +89,9 @@ func TestNodeCtx_Start(t *testing.T) { node.inputChannels[i] = make(chan Msg) } - var waitGroup sync.WaitGroup - waitGroup.Add(1) - node.Start(&waitGroup) + assert.NotPanics(t, func() { + node.Start() + }) node.Close() }