diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 3c355185a5..49fb425f5f 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -572,6 +572,10 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes return nil } +// Consume steps: +// 1. Consume n messages from rocksdb +// 2. Update current_id to the last consumed message +// 3. Update ack informations in rocksdb func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error) { ll, ok := topicMu.Load(topicName) if !ok {