From 8465d03dec9811b346341b1542014be04aa85297 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Sun, 26 Sep 2021 21:20:10 +0800 Subject: [PATCH] Make tsafe close not elegant (#8582) Signed-off-by: xiaofan-luan --- internal/querynode/tsafe.go | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/internal/querynode/tsafe.go b/internal/querynode/tsafe.go index 32c12a6fa1..53347188f6 100644 --- a/internal/querynode/tsafe.go +++ b/internal/querynode/tsafe.go @@ -34,7 +34,6 @@ func newTSafeWatcher() *tSafeWatcher { func (watcher *tSafeWatcher) notify() { if len(watcher.notifyChan) == 0 { watcher.notifyChan <- true - //log.Debug("tSafe watcher notify done") } } @@ -70,8 +69,7 @@ type tSafe struct { watcherList []*tSafeWatcher tSafeChan chan tSafeMsg tSafeRecord map[UniqueID]Timestamp - // waitgroup for closing control - closeWg sync.WaitGroup + isClose bool } func newTSafe(ctx context.Context, channel Channel) tSafer { @@ -90,17 +88,26 @@ func newTSafe(ctx context.Context, channel Channel) tSafer { } func (ts *tSafe) start() { - ts.closeWg.Add(1) go func() { - defer ts.closeWg.Done() for { select { case <-ts.ctx.Done(): + ts.tSafeMu.Lock() + ts.isClose = true log.Debug("tSafe context done", zap.Any("channel", ts.channel), ) + for _, watcher := range ts.watcherList { + close(watcher.notifyChan) + } + close(ts.tSafeChan) + ts.tSafeMu.Unlock() return - case m := <-ts.tSafeChan: + case m, ok := <-ts.tSafeChan: + if !ok { + // should not happen!! + return + } ts.tSafeMu.Lock() ts.tSafeRecord[m.id] = m.t var tmpT Timestamp = math.MaxUint64 @@ -155,7 +162,13 @@ func (ts *tSafe) get() Timestamp { func (ts *tSafe) set(id UniqueID, t Timestamp) { ts.tSafeMu.Lock() defer ts.tSafeMu.Unlock() - + if ts.isClose { + // should not happen if tsafe_replica guard correctly + log.Warn("Try to set id with ts close ", + zap.Any("channel", ts.channel), + zap.Any("it", id)) + return + } msg := tSafeMsg{ t: t, id: id, @@ -165,13 +178,4 @@ func (ts *tSafe) set(id UniqueID, t Timestamp) { func (ts *tSafe) close() { ts.cancel() - // wait for all job done - ts.closeWg.Wait() - - ts.tSafeMu.Lock() - defer ts.tSafeMu.Unlock() - - for _, watcher := range ts.watcherList { - close(watcher.notifyChan) - } }