diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index 2556098257..1a6d842bbe 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -388,15 +388,19 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { if err != nil { return err } + log.Debug("MsgStream begin to seek", zap.Any("MessageID", messageID)) err = consumer.Seek(messageID) if err != nil { return err } + log.Debug("MsgStream seek finished", zap.Any("MessageID", messageID)) if _, ok := consumer.(*mqclient.RmqConsumer); !ok { + log.Debug("MsgStream begin to read one message after seek") msg, ok := <-consumer.Chan() if !ok { return errors.New("consumer closed") } + log.Debug("MsgStream finish reading one message after seek") consumer.Ack(msg) if !bytes.Equal(msg.ID().Serialize(), messageID.Serialize()) { err = fmt.Errorf("seek msg not correct")