Add ConsumeAfterSeek interface (#11818)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2021-11-15 18:17:10 +08:00 committed by GitHub
parent ecae18ad40
commit 304bbd3e71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 16 additions and 3 deletions

View File

@ -529,7 +529,7 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
return err return err
} }
log.Debug("MsgStream seek finished", zap.Any("MessageID", messageID)) log.Debug("MsgStream seek finished", zap.Any("MessageID", messageID))
if _, ok := consumer.(*mqclient.PulsarConsumer); ok { if consumer.ConsumeAfterSeek() {
log.Debug("MsgStream start to pop one message after seek") log.Debug("MsgStream start to pop one message after seek")
msg, ok := <-consumer.Chan() msg, ok := <-consumer.Chan()
if !ok { if !ok {
@ -885,9 +885,9 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
// rmq seek behavior (position, ...) // rmq seek behavior (position, ...)
// pulsar seek behavior [position, ...) // pulsar seek behavior [position, ...)
// skip one tt for pulsar // skip one tt for pulsar
_, ok := consumer.(*mqclient.RmqConsumer)
runLoop := false runLoop := false
if !ok { if consumer.ConsumeAfterSeek() {
runLoop = true runLoop = true
} }
for runLoop { for runLoop {

View File

@ -85,6 +85,9 @@ type Consumer interface {
// Make sure that msg is received. Only used in pulsar // Make sure that msg is received. Only used in pulsar
Ack(ConsumerMessage) Ack(ConsumerMessage)
// ConsumeAfterSeek defines the behavior whether to consume after seeking is done
ConsumeAfterSeek() bool
// Close consumer // Close consumer
Close() Close()
} }

View File

@ -80,6 +80,11 @@ func (pc *PulsarConsumer) Seek(id MessageID) error {
return err return err
} }
// ConsumeAfterSeek defines pulsar consumer SHOULD consume after seek
func (pc *PulsarConsumer) ConsumeAfterSeek() bool {
return true
}
func (pc *PulsarConsumer) Ack(message ConsumerMessage) { func (pc *PulsarConsumer) Ack(message ConsumerMessage) {
pm := message.(*pulsarMessage) pm := message.(*pulsarMessage)
pc.c.Ack(pm.msg) pc.c.Ack(pm.msg)

View File

@ -61,6 +61,11 @@ func (rc *RmqConsumer) Seek(id MessageID) error {
return rc.c.Seek(msgID) return rc.c.Seek(msgID)
} }
// ConsumeAfterSeek defines rmq consumer should NOT consume after seek
func (rc *RmqConsumer) ConsumeAfterSeek() bool {
return false
}
// Ack is used to ask a rocksmq message // Ack is used to ask a rocksmq message
func (rc *RmqConsumer) Ack(message ConsumerMessage) { func (rc *RmqConsumer) Ack(message ConsumerMessage) {
} }