From c72f0740093b5404b26efee9e15374d7d3a3ef1a Mon Sep 17 00:00:00 2001 From: congqixia Date: Sun, 26 Sep 2021 11:07:56 +0800 Subject: [PATCH] Add waitgroup for tsafe close logic (#8545) Signed-off-by: Congqi Xia --- internal/querynode/tsafe.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/querynode/tsafe.go b/internal/querynode/tsafe.go index 801d714ddc..0b974aed7b 100644 --- a/internal/querynode/tsafe.go +++ b/internal/querynode/tsafe.go @@ -70,6 +70,8 @@ type tSafe struct { watcherList []*tSafeWatcher tSafeChan chan tSafeMsg tSafeRecord map[UniqueID]Timestamp + // waitgroup for closing control + closeWg sync.WaitGroup } func newTSafe(ctx context.Context, channel Channel) tSafer { @@ -88,7 +90,9 @@ func newTSafe(ctx context.Context, channel Channel) tSafer { } func (ts *tSafe) start() { + ts.closeWg.Add(1) go func() { + defer ts.closeWg.Done() for { select { case <-ts.ctx.Done(): @@ -164,6 +168,8 @@ func (ts *tSafe) close() { defer ts.tSafeMu.Unlock() ts.cancel() + // wait for all job done + ts.closeWg.Wait() for _, watcher := range ts.watcherList { close(watcher.notifyChan) }