diff --git a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go index 5350c84d16..7d4dac2ebe 100644 --- a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go +++ b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go @@ -100,6 +100,11 @@ func (pc *Consumer) Chan() <-chan common.Message { // Seek seek consume position to the pointed messageID, // the pointed messageID will be consumed after the seek in pulsar func (pc *Consumer) Seek(id common.MessageID, inclusive bool) error { + // If it is the earliest message ID, skip the seek to prevent failure. + if id.AtEarliestPosition() { + pc.hasSeek = true + return nil + } messageID := id.(*pulsarID).messageID err := pc.c.Seek(messageID) if err == nil {