fix: Fix failed to seek to earliest position (#39965)

If it is the earliest message ID, skip the seek to prevent failure.

issue: https://github.com/milvus-io/milvus/issues/39964

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2025-02-19 00:30:51 +08:00 committed by GitHub
parent 52c7d7dd80
commit c917fe4782
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -100,6 +100,11 @@ func (pc *Consumer) Chan() <-chan common.Message {
// Seek seek consume position to the pointed messageID, // Seek seek consume position to the pointed messageID,
// the pointed messageID will be consumed after the seek in pulsar // the pointed messageID will be consumed after the seek in pulsar
func (pc *Consumer) Seek(id common.MessageID, inclusive bool) error { func (pc *Consumer) Seek(id common.MessageID, inclusive bool) error {
// If it is the earliest message ID, skip the seek to prevent failure.
if id.AtEarliestPosition() {
pc.hasSeek = true
return nil
}
messageID := id.(*pulsarID).messageID messageID := id.(*pulsarID).messageID
err := pc.c.Seek(messageID) err := pc.c.Seek(messageID)
if err == nil { if err == nil {