From ffd05eb140f29abec5181d1a9b1cb51707227812 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 3 Dec 2021 16:39:33 +0800 Subject: [PATCH] Fix deadlock in mergedTimeTickSender (#12697) Signed-off-by: Congqi Xia --- internal/datanode/flow_graph_time_ticker.go | 25 ++++++++++++----- .../datanode/flow_graph_time_ticker_test.go | 27 +++++++++++++++++++ 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/internal/datanode/flow_graph_time_ticker.go b/internal/datanode/flow_graph_time_ticker.go index 018a19ba99..bf9c629676 100644 --- a/internal/datanode/flow_graph_time_ticker.go +++ b/internal/datanode/flow_graph_time_ticker.go @@ -61,6 +61,8 @@ func (mt *mergedTimeTickerSender) bufferTs(ts Timestamp) { defer mt.lastMut.RUnlock() if !mt.lastSent.IsZero() && time.Since(mt.lastSent) > time.Millisecond*100 { + mt.cond.L.Lock() + defer mt.cond.L.Unlock() mt.cond.Signal() } } @@ -72,24 +74,33 @@ func (mt *mergedTimeTickerSender) tick() { for { select { case <-t: + mt.cond.L.Lock() mt.cond.Signal() // allow worker to check every 0.1s + mt.cond.L.Unlock() case <-mt.closeCh: return } } } +func (mt *mergedTimeTickerSender) isClosed() bool { + select { + case <-mt.closeCh: + return true + default: + return false + } +} + func (mt *mergedTimeTickerSender) work() { defer mt.wg.Done() ts, lastTs := uint64(0), uint64(0) for { - select { - case <-mt.closeCh: - return - default: - } - mt.cond.L.Lock() + if mt.isClosed() { + mt.cond.L.Unlock() + return + } mt.cond.Wait() ts = mt.ts.Load() mt.cond.L.Unlock() @@ -105,8 +116,10 @@ func (mt *mergedTimeTickerSender) work() { func (mt *mergedTimeTickerSender) close() { mt.closeOnce.Do(func() { + mt.cond.L.Lock() close(mt.closeCh) mt.cond.Broadcast() + mt.cond.L.Unlock() mt.wg.Wait() }) } diff --git a/internal/datanode/flow_graph_time_ticker_test.go b/internal/datanode/flow_graph_time_ticker_test.go index e344f18051..ce54e87f2c 100644 --- a/internal/datanode/flow_graph_time_ticker_test.go +++ b/internal/datanode/flow_graph_time_ticker_test.go @@ -29,3 +29,30 @@ func TestMergedTimeTicker(t *testing.T) { assert.Less(t, len(ticks), 20) mut.Unlock() } + +func TestMergedTimeTicker_close10000(t *testing.T) { + var wg sync.WaitGroup + batchSize := 10000 + wg.Add(batchSize) + for i := 0; i < batchSize; i++ { + mt := newMergedTimeTickerSender(func(ts Timestamp) error { + return nil + }) + go func(mt *mergedTimeTickerSender) { + defer wg.Done() + time.Sleep(10 * time.Millisecond) + mt.close() + }(mt) + } + tm := time.NewTimer(time.Millisecond * 20) + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-tm.C: + t.FailNow() + case <-done: + } +}