diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index 80406b9c90..a721e28216 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -830,12 +830,18 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() { func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqwrapper.Consumer) { log := log.Ctx(ms.ctx) defer ms.chanWaitGroup.Done() + msgTick := time.NewTimer(3 * time.Second) + defer msgTick.Stop() for { + msgTick.Reset(3 * time.Second) select { case <-ms.ctx.Done(): return case <-ms.chanStopChan[consumer]: return + case <-msgTick.C: + log.Info("stop consumer, because no msg received in 3s", zap.Strings("channel", ms.consumerChannels)) + return case msg, ok := <-consumer.Chan(): if !ok { log.Debug("consumer closed!")