diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go index c787183f59..81cf09338f 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go @@ -154,6 +154,11 @@ func (ri *retentionInfo) Stop() { }) } +// expiredCleanUp check message retention by page: +// 1. check acked timestamp of each page id, if expired, the whole page is expired; +// 2. check acked size from the last unexpired page id; +// 3. delete acked info by range of page id; +// 4. delete message by range of page id; func (ri *retentionInfo) newExpiredCleanUp(topic string) error { log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic) var deletedAckedSize int64 = 0