diff --git a/internal/querynode/tsafe.go b/internal/querynode/tsafe.go index 801d714ddc..0b974aed7b 100644 --- a/internal/querynode/tsafe.go +++ b/internal/querynode/tsafe.go @@ -70,6 +70,8 @@ type tSafe struct { watcherList []*tSafeWatcher tSafeChan chan tSafeMsg tSafeRecord map[UniqueID]Timestamp + // waitgroup for closing control + closeWg sync.WaitGroup } func newTSafe(ctx context.Context, channel Channel) tSafer { @@ -88,7 +90,9 @@ func newTSafe(ctx context.Context, channel Channel) tSafer { } func (ts *tSafe) start() { + ts.closeWg.Add(1) go func() { + defer ts.closeWg.Done() for { select { case <-ts.ctx.Done(): @@ -164,6 +168,8 @@ func (ts *tSafe) close() { defer ts.tSafeMu.Unlock() ts.cancel() + // wait for all job done + ts.closeWg.Wait() for _, watcher := range ts.watcherList { close(watcher.notifyChan) }