From 78e8e4aa22b605164c31364e0219cc62416d6c25 Mon Sep 17 00:00:00 2001 From: yukun Date: Thu, 23 Sep 2021 16:59:54 +0800 Subject: [PATCH] Add rocksmq comments (#8362) Signed-off-by: fishpenguin --- internal/util/rocksmq/server/rocksmq/rocksmq.go | 5 +++++ internal/util/rocksmq/server/rocksmq/rocksmq_retention.go | 7 +++++++ 2 files changed, 12 insertions(+) diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq.go b/internal/util/rocksmq/server/rocksmq/rocksmq.go index 6831a69cd5..a65d8a4c2a 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq.go @@ -11,21 +11,26 @@ package rocksmq +// ProducerMessage that will be write to rocksdb type ProducerMessage struct { Payload []byte } +// Rocksmq consumer type Consumer struct { Topic string GroupName string MsgMutex chan struct{} } +// ConsumerMessage that consumed from rocksdb type ConsumerMessage struct { MsgID UniqueID Payload []byte } +// Rocksmq is an interface thatmay be implemented by the application +// to do message queue operations based ion rocksdb type RocksMQ interface { CreateTopic(topicName string) error DestroyTopic(topicName string) error diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go index ab45517a6b..ba1381b037 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go @@ -27,10 +27,16 @@ import ( "go.uber.org/zap" ) +// RocksmqRetentionTimeInMinutes is the time of retention var RocksmqRetentionTimeInMinutes int64 + +// RocksmqRetentionSizeInMB is the size of retention var RocksmqRetentionSizeInMB int64 + +// TickerTimeInMinutes is the time of expired check var TickerTimeInMinutes int64 = 1 +// Const value that used to convert unit const ( MB = 2 << 20 MINUTE = 60 @@ -477,6 +483,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { return DeleteMessages(ri.db, topic, startID, endID) } +// Delte messages in rocksdb by range of [startID, endID) func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error { // Delete msg by range of startID and endID startKey, err := combKey(topic, startID)