Fix MqttMsgStream skip current msg logic (#11171)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2021-11-03 17:11:47 +08:00 committed by GitHub
parent da56a7215a
commit 9c53375bba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -875,7 +875,14 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
}
ms.addConsumer(consumer, mp.ChannelName)
runLoop := true
// rmq seek behavior (position, ...)
// pulsar seek behavior [position, ...)
// skip one tt for pulsar
_, ok := consumer.(*mqclient.RmqConsumer)
runLoop := false
if !ok {
runLoop = true
}
for runLoop {
select {
case <-ms.ctx.Done():