From 9c53375bbab7fd5d0997f8ab8da0f70d9e83c072 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 3 Nov 2021 17:11:47 +0800 Subject: [PATCH] Fix MqttMsgStream skip current msg logic (#11171) Signed-off-by: Congqi Xia --- internal/msgstream/mq_msgstream.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index db18306690..99f9872d4d 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -875,7 +875,14 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { } ms.addConsumer(consumer, mp.ChannelName) - runLoop := true + // rmq seek behavior (position, ...) + // pulsar seek behavior [position, ...) + // skip one tt for pulsar + _, ok := consumer.(*mqclient.RmqConsumer) + runLoop := false + if !ok { + runLoop = true + } for runLoop { select { case <-ms.ctx.Done():