diff --git a/timesync/readertimesync.go b/timesync/readertimesync.go index ab4e690ac4..60f44c5b8a 100644 --- a/timesync/readertimesync.go +++ b/timesync/readertimesync.go @@ -209,7 +209,10 @@ func (r *readerTimeSyncCfg) readTimeSync(ctx context.Context, ts []*pb.TimeSyncM select { case <-ctx.Done(): return nil, ctx.Err() - case cm := <-r.timeSyncConsumer.Chan(): + case cm, ok := <-r.timeSyncConsumer.Chan(): + if ok == false { + return nil, fmt.Errorf("timesync consumer closed") + } msg := cm.Message var tsm pb.TimeSyncMsg if err := proto.Unmarshal(msg.Payload(), &tsm); err != nil { @@ -274,7 +277,11 @@ func (r *readerTimeSyncCfg) startReadTopics() { select { case <-ctx.Done(): return - case cm := <-r.readerConsumer.Chan(): + case cm, ok := <-r.readerConsumer.Chan(): + if ok == false { + //TODO,log error + log.Printf("reader consumer closed") + } msg := cm.Message var imsg pb.InsertOrDeleteMsg if err := proto.Unmarshal(msg.Payload(), &imsg); err != nil {