From ea6cd7d27ce603e68aa2acff95ee125f42632993 Mon Sep 17 00:00:00 2001 From: yukun Date: Tue, 2 Nov 2021 17:28:31 +0800 Subject: [PATCH] Move topic lock to the front of final delete in retention expired cleanup (#11076) Signed-off-by: fishpenguin --- .../server/rocksmq/rocksmq_retention.go | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go index b161507846..ebc0c85c3c 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go @@ -155,17 +155,6 @@ func (ri *retentionInfo) Stop() { func (ri *retentionInfo) newExpiredCleanUp(topic string) error { log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic) - ll, ok := topicMu.Load(topic) - if !ok { - return fmt.Errorf("topic name = %s not exist", topic) - } - lock, ok := ll.(*sync.Mutex) - if !ok { - return fmt.Errorf("get mutex failed, topic name = %s", topic) - } - lock.Lock() - defer lock.Unlock() - var deletedAckedSize int64 = 0 var startID UniqueID var endID UniqueID @@ -346,7 +335,25 @@ func (ri *retentionInfo) newExpiredCleanUp(topic string) error { writeBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey)) } - newAckedSize := totalAckedSize - deletedAckedSize + ll, ok := topicMu.Load(topic) + if !ok { + return fmt.Errorf("topic name = %s not exist", topic) + } + lock, ok := ll.(*sync.Mutex) + if !ok { + return fmt.Errorf("get mutex failed, topic name = %s", topic) + } + lock.Lock() + defer lock.Unlock() + currentAckedSizeVal, err := ri.kv.Load(ackedSizeKey) + if err != nil { + return err + } + currentAckedSize, err := strconv.ParseInt(currentAckedSizeVal, 10, 64) + if err != nil { + return err + } + newAckedSize := currentAckedSize - deletedAckedSize writeBatch.Put([]byte(ackedSizeKey), []byte(strconv.FormatInt(newAckedSize, 10))) err = DeleteMessages(ri.db, topic, startID, endID)