From 304bbd3e71ec09dee4e744af33d2d246ada3c62b Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 15 Nov 2021 18:17:10 +0800 Subject: [PATCH] Add ConsumeAfterSeek interface (#11818) Signed-off-by: Congqi Xia --- internal/msgstream/mq_msgstream.go | 6 +++--- internal/util/mqclient/consumer.go | 3 +++ internal/util/mqclient/pulsar_consumer.go | 5 +++++ internal/util/mqclient/rmq_consumer.go | 5 +++++ 4 files changed, 16 insertions(+), 3 deletions(-) diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index 2546462268..586f54f229 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -529,7 +529,7 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { return err } log.Debug("MsgStream seek finished", zap.Any("MessageID", messageID)) - if _, ok := consumer.(*mqclient.PulsarConsumer); ok { + if consumer.ConsumeAfterSeek() { log.Debug("MsgStream start to pop one message after seek") msg, ok := <-consumer.Chan() if !ok { @@ -885,9 +885,9 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { // rmq seek behavior (position, ...) // pulsar seek behavior [position, ...) // skip one tt for pulsar - _, ok := consumer.(*mqclient.RmqConsumer) + runLoop := false - if !ok { + if consumer.ConsumeAfterSeek() { runLoop = true } for runLoop { diff --git a/internal/util/mqclient/consumer.go b/internal/util/mqclient/consumer.go index 2c5a176764..26315d88cf 100644 --- a/internal/util/mqclient/consumer.go +++ b/internal/util/mqclient/consumer.go @@ -85,6 +85,9 @@ type Consumer interface { // Make sure that msg is received. Only used in pulsar Ack(ConsumerMessage) + // ConsumeAfterSeek defines the behavior whether to consume after seeking is done + ConsumeAfterSeek() bool + // Close consumer Close() } diff --git a/internal/util/mqclient/pulsar_consumer.go b/internal/util/mqclient/pulsar_consumer.go index 3630a33911..1374f805d6 100644 --- a/internal/util/mqclient/pulsar_consumer.go +++ b/internal/util/mqclient/pulsar_consumer.go @@ -80,6 +80,11 @@ func (pc *PulsarConsumer) Seek(id MessageID) error { return err } +// ConsumeAfterSeek defines pulsar consumer SHOULD consume after seek +func (pc *PulsarConsumer) ConsumeAfterSeek() bool { + return true +} + func (pc *PulsarConsumer) Ack(message ConsumerMessage) { pm := message.(*pulsarMessage) pc.c.Ack(pm.msg) diff --git a/internal/util/mqclient/rmq_consumer.go b/internal/util/mqclient/rmq_consumer.go index 9af9a025dc..bb05d643cd 100644 --- a/internal/util/mqclient/rmq_consumer.go +++ b/internal/util/mqclient/rmq_consumer.go @@ -61,6 +61,11 @@ func (rc *RmqConsumer) Seek(id MessageID) error { return rc.c.Seek(msgID) } +// ConsumeAfterSeek defines rmq consumer should NOT consume after seek +func (rc *RmqConsumer) ConsumeAfterSeek() bool { + return false +} + // Ack is used to ask a rocksmq message func (rc *RmqConsumer) Ack(message ConsumerMessage) { }