Make sure that closing the graph is more secure (#19178)

Signed-off-by: SimFG <bang.fu@zilliz.com>

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2022-09-14 10:32:33 +08:00 committed by GitHub
parent e3f7f3023b
commit 65fffa6d7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 9 additions and 0 deletions

View File

@ -27,6 +27,7 @@ type TimeTickedFlowGraph struct {
nodeCtx map[NodeName]*nodeCtx nodeCtx map[NodeName]*nodeCtx
stopOnce sync.Once stopOnce sync.Once
startOnce sync.Once startOnce sync.Once
closeWg *sync.WaitGroup
} }
// AddNode add Node into flowgraph // AddNode add Node into flowgraph
@ -35,6 +36,7 @@ func (fg *TimeTickedFlowGraph) AddNode(node Node) {
node: node, node: node,
downstreamInputChanIdx: make(map[string]int), downstreamInputChanIdx: make(map[string]int),
closeCh: make(chan struct{}), closeCh: make(chan struct{}),
closeWg: fg.closeWg,
} }
fg.nodeCtx[node.Name()] = &nodeCtx fg.nodeCtx[node.Name()] = &nodeCtx
} }
@ -92,6 +94,7 @@ func (fg *TimeTickedFlowGraph) Close() {
v.Close() v.Close()
} }
} }
fg.closeWg.Wait()
}) })
} }
@ -99,6 +102,7 @@ func (fg *TimeTickedFlowGraph) Close() {
func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph { func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph {
flowGraph := TimeTickedFlowGraph{ flowGraph := TimeTickedFlowGraph{
nodeCtx: make(map[string]*nodeCtx), nodeCtx: make(map[string]*nodeCtx),
closeWg: &sync.WaitGroup{},
} }
return &flowGraph return &flowGraph

View File

@ -59,12 +59,14 @@ type nodeCtx struct {
downstreamInputChanIdx map[string]int downstreamInputChanIdx map[string]int
closeCh chan struct{} // notify work to exit closeCh chan struct{} // notify work to exit
closeWg *sync.WaitGroup
} }
// Start invoke Node `Start` method and start a worker goroutine // Start invoke Node `Start` method and start a worker goroutine
func (nodeCtx *nodeCtx) Start() { func (nodeCtx *nodeCtx) Start() {
nodeCtx.node.Start() nodeCtx.node.Start()
nodeCtx.closeWg.Add(1)
go nodeCtx.work() go nodeCtx.work()
} }
@ -114,6 +116,7 @@ func (nodeCtx *nodeCtx) work() {
// the res decide whether the node should be closed. // the res decide whether the node should be closed.
if isCloseMsg(res) { if isCloseMsg(res) {
close(nodeCtx.closeCh) close(nodeCtx.closeCh)
nodeCtx.closeWg.Done()
nodeCtx.node.Close() nodeCtx.node.Close()
} }

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"math" "math"
"os" "os"
"sync"
"testing" "testing"
"time" "time"
@ -80,6 +81,7 @@ func TestNodeCtx_Start(t *testing.T) {
inputChannels: make([]chan Msg, 2), inputChannels: make([]chan Msg, 2),
downstreamInputChanIdx: make(map[string]int), downstreamInputChanIdx: make(map[string]int),
closeCh: make(chan struct{}), closeCh: make(chan struct{}),
closeWg: &sync.WaitGroup{},
} }
for i := 0; i < len(node.inputChannels); i++ { for i := 0; i < len(node.inputChannels); i++ {