From cfd95d5b8c2e9844aa4e9a34245b73101cdaa4fe Mon Sep 17 00:00:00 2001 From: neza2017 Date: Fri, 11 Sep 2020 13:35:58 +0800 Subject: [PATCH] Check if channel of timesync is closed Signed-off-by: neza2017 --- timesync/readertimesync.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/timesync/readertimesync.go b/timesync/readertimesync.go index ab4e690ac4..60f44c5b8a 100644 --- a/timesync/readertimesync.go +++ b/timesync/readertimesync.go @@ -209,7 +209,10 @@ func (r *readerTimeSyncCfg) readTimeSync(ctx context.Context, ts []*pb.TimeSyncM select { case <-ctx.Done(): return nil, ctx.Err() - case cm := <-r.timeSyncConsumer.Chan(): + case cm, ok := <-r.timeSyncConsumer.Chan(): + if ok == false { + return nil, fmt.Errorf("timesync consumer closed") + } msg := cm.Message var tsm pb.TimeSyncMsg if err := proto.Unmarshal(msg.Payload(), &tsm); err != nil { @@ -274,7 +277,11 @@ func (r *readerTimeSyncCfg) startReadTopics() { select { case <-ctx.Done(): return - case cm := <-r.readerConsumer.Chan(): + case cm, ok := <-r.readerConsumer.Chan(): + if ok == false { + //TODO,log error + log.Printf("reader consumer closed") + } msg := cm.Message var imsg pb.InsertOrDeleteMsg if err := proto.Unmarshal(msg.Payload(), &imsg); err != nil {