From 1ee17464e88e79c9b66947ecf0787f7cefdc141d Mon Sep 17 00:00:00 2001 From: Xiangyu Wang Date: Fri, 17 Sep 2021 12:37:50 +0800 Subject: [PATCH] Fix possible datarace in rootcoord (#8120) Signed-off-by: Xiangyu Wang --- internal/rootcoord/root_coord.go | 9 ++++++++- internal/rootcoord/timeticksync.go | 3 ++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 6d49351c1b..982d49b126 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -86,6 +86,7 @@ type Core struct { //inner members ctx context.Context cancel context.CancelFunc + wg sync.WaitGroup etcdCli *clientv3.Client kvBase kv.TxnKV //*etcdkv.EtcdKV @@ -243,6 +244,7 @@ func (c *Core) checkInit() error { } func (c *Core) startTimeTickLoop() { + defer c.wg.Done() ticker := time.NewTicker(time.Duration(Params.TimeTickInterval) * time.Millisecond) for { select { @@ -260,6 +262,7 @@ func (c *Core) startTimeTickLoop() { } func (c *Core) tsLoop() { + defer c.wg.Done() tsoTicker := time.NewTicker(tso.UpdateTimestampStep) defer tsoTicker.Stop() ctx, cancel := context.WithCancel(c.ctx) @@ -284,6 +287,7 @@ func (c *Core) tsLoop() { } func (c *Core) sessionLoop() { + defer c.wg.Done() for { select { case <-c.ctx.Done(): @@ -302,6 +306,7 @@ func (c *Core) sessionLoop() { } func (c *Core) checkFlushedSegmentsLoop() { + defer c.wg.Done() ticker := time.NewTicker(10 * time.Minute) for { select { @@ -1100,10 +1105,11 @@ func (c *Core) Start() error { log.Debug("RootCoord Start reSendDdMsg failed", zap.Error(err)) return } + c.wg.Add(5) go c.startTimeTickLoop() go c.tsLoop() go c.sessionLoop() - go c.chanTimeTick.StartWatch() + go c.chanTimeTick.StartWatch(&c.wg) go c.checkFlushedSegmentsLoop() c.stateCode.Store(internalpb.StateCode_Healthy) }) @@ -1113,6 +1119,7 @@ func (c *Core) Start() error { func (c *Core) Stop() error { c.cancel() + c.wg.Wait() c.stateCode.Store(internalpb.StateCode_Abnormal) return nil } diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index 874b652ac9..8083737cff 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -221,7 +221,8 @@ func (t *timetickSync) GetProxy(sess []*sessionutil.Session) { } // StartWatch watch proxy node change and process all channels' timetick msg -func (t *timetickSync) StartWatch() { +func (t *timetickSync) StartWatch(wg *sync.WaitGroup) { + defer wg.Done() for { select { case <-t.core.ctx.Done():