enhance: [2.5] add timeout for message reception in mqMsgStream (#41603)

- pr: #41602

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2025-05-06 10:16:52 +08:00 committed by GitHub
parent ecf3841ae9
commit 57c472e41f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -830,12 +830,18 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqwrapper.Consumer) {
log := log.Ctx(ms.ctx)
defer ms.chanWaitGroup.Done()
msgTick := time.NewTimer(3 * time.Second)
defer msgTick.Stop()
for {
msgTick.Reset(3 * time.Second)
select {
case <-ms.ctx.Done():
return
case <-ms.chanStopChan[consumer]:
return
case <-msgTick.C:
log.Info("stop consumer, because no msg received in 3s", zap.Strings("channel", ms.consumerChannels))
return
case msg, ok := <-consumer.Chan():
if !ok {
log.Debug("consumer closed!")