diff --git a/internal/mq/msgstream/mqwrapper/rmq/rmq_client_test.go b/internal/mq/msgstream/mqwrapper/rmq/rmq_client_test.go index 414a4a977b..2020cab06a 100644 --- a/internal/mq/msgstream/mqwrapper/rmq/rmq_client_test.go +++ b/internal/mq/msgstream/mqwrapper/rmq/rmq_client_test.go @@ -124,9 +124,9 @@ func TestRmqClient_GetLatestMsg(t *testing.T) { for { select { case <-ctx.Done(): - ret, err := actualLastMsg.ID().LessOrEqualThan(expectLastMsg.Serialize()) + ret, err := expectLastMsg.LessOrEqualThan(actualLastMsg.ID().Serialize()) assert.Nil(t, err) - assert.False(t, ret) + assert.True(t, ret) return case msg := <-consumer.Chan(): consumer.Ack(msg) diff --git a/internal/mq/msgstream/mqwrapper/rmq/rmq_id.go b/internal/mq/msgstream/mqwrapper/rmq/rmq_id.go index 4cac4e6a0c..79130aa1d3 100644 --- a/internal/mq/msgstream/mqwrapper/rmq/rmq_id.go +++ b/internal/mq/msgstream/mqwrapper/rmq/rmq_id.go @@ -41,7 +41,7 @@ func (rid *rmqID) AtEarliestPosition() bool { func (rid *rmqID) LessOrEqualThan(msgID []byte) (bool, error) { rMsgID := DeserializeRmqID(msgID) - return rid.messageID < rMsgID, nil + return rid.messageID <= rMsgID, nil } func (rid *rmqID) Equal(msgID []byte) (bool, error) { diff --git a/internal/mq/msgstream/mqwrapper/rmq/rmq_id_test.go b/internal/mq/msgstream/mqwrapper/rmq/rmq_id_test.go index 9fb67ff4fd..a8628cef91 100644 --- a/internal/mq/msgstream/mqwrapper/rmq/rmq_id_test.go +++ b/internal/mq/msgstream/mqwrapper/rmq/rmq_id_test.go @@ -60,6 +60,10 @@ func TestLessOrEqualThan(t *testing.T) { ret, err = rid2.LessOrEqualThan(rid1.Serialize()) assert.Nil(t, err) assert.False(t, ret) + + ret, err = rid1.LessOrEqualThan(rid1.Serialize()) + assert.Nil(t, err) + assert.True(t, ret) } func Test_Equal(t *testing.T) {