From 57c472e41fa155ce99cda69d608f1f675370ea80 Mon Sep 17 00:00:00 2001 From: SimFG Date: Tue, 6 May 2025 10:16:52 +0800 Subject: [PATCH] enhance: [2.5] add timeout for message reception in mqMsgStream (#41603) - pr: #41602 Signed-off-by: SimFG --- pkg/mq/msgstream/mq_msgstream.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index 80406b9c90..a721e28216 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -830,12 +830,18 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() { func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqwrapper.Consumer) { log := log.Ctx(ms.ctx) defer ms.chanWaitGroup.Done() + msgTick := time.NewTimer(3 * time.Second) + defer msgTick.Stop() for { + msgTick.Reset(3 * time.Second) select { case <-ms.ctx.Done(): return case <-ms.chanStopChan[consumer]: return + case <-msgTick.C: + log.Info("stop consumer, because no msg received in 3s", zap.Strings("channel", ms.consumerChannels)) + return case msg, ok := <-consumer.Chan(): if !ok { log.Debug("consumer closed!")