From e36fce1fd480eeed60334fb6155f2ad369332fb1 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Mon, 20 Jun 2022 15:18:13 +0800 Subject: [PATCH] Add log for mqtt msgstream seek (#17640) Signed-off-by: xiaofan-luan --- internal/mq/msgstream/mq_msgstream.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/internal/mq/msgstream/mq_msgstream.go b/internal/mq/msgstream/mq_msgstream.go index 444f9dc55e..89971801f7 100644 --- a/internal/mq/msgstream/mq_msgstream.go +++ b/internal/mq/msgstream/mq_msgstream.go @@ -548,13 +548,13 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { return err } - log.Debug("MsgStream begin to seek start msg: ", zap.Any("MessageID", messageID)) + log.Info("MsgStream seek begin", zap.String("channel", mp.ChannelName), zap.Any("MessageID", messageID)) err = consumer.Seek(messageID, false) if err != nil { - log.Debug("Failed to seek", zap.Error(err)) + log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err)) return err } - log.Debug("MsgStream seek finished", zap.Any("MessageID", messageID)) + log.Info("MsgStream seek finished", zap.String("channel", mp.ChannelName)) } return nil } @@ -673,7 +673,7 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() { // block here until addConsumer if _, ok := <-ms.syncConsumer; !ok { - log.Debug("consumer closed!") + log.Warn("consumer closed!") return } @@ -859,10 +859,13 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { if err != nil { return err } + log.Info("MsgStream begin to seek start msg: ", zap.String("channel", mp.ChannelName), zap.Any("MessageID", seekMsgID)) err = consumer.Seek(seekMsgID, true) if err != nil { + log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err)) return err } + log.Info("MsgStream seek finished", zap.String("channel", mp.ChannelName)) return nil }