mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
add log when seek in msgstream (#6244)
Signed-off-by: yefu.chen <yefu.chen@zilliz.com>
This commit is contained in:
parent
d7a8c9f552
commit
a518e408b1
@ -388,15 +388,19 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
log.Debug("MsgStream begin to seek", zap.Any("MessageID", messageID))
|
||||||
err = consumer.Seek(messageID)
|
err = consumer.Seek(messageID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
log.Debug("MsgStream seek finished", zap.Any("MessageID", messageID))
|
||||||
if _, ok := consumer.(*mqclient.RmqConsumer); !ok {
|
if _, ok := consumer.(*mqclient.RmqConsumer); !ok {
|
||||||
|
log.Debug("MsgStream begin to read one message after seek")
|
||||||
msg, ok := <-consumer.Chan()
|
msg, ok := <-consumer.Chan()
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("consumer closed")
|
return errors.New("consumer closed")
|
||||||
}
|
}
|
||||||
|
log.Debug("MsgStream finish reading one message after seek")
|
||||||
consumer.Ack(msg)
|
consumer.Ack(msg)
|
||||||
if !bytes.Equal(msg.ID().Serialize(), messageID.Serialize()) {
|
if !bytes.Equal(msg.ID().Serialize(), messageID.Serialize()) {
|
||||||
err = fmt.Errorf("seek msg not correct")
|
err = fmt.Errorf("seek msg not correct")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user