From da9228ab37bd6c62744d75aae5fb690713c51124 Mon Sep 17 00:00:00 2001 From: yukun Date: Tue, 16 Nov 2021 17:07:10 +0800 Subject: [PATCH] Add topic lock for DestroyTopic (#11786) Signed-off-by: fishpenguin --- .../rocksmq/server/rocksmq/rocksmq_impl.go | 42 +++++++------------ .../server/rocksmq/rocksmq_impl_test.go | 1 + 2 files changed, 17 insertions(+), 26 deletions(-) diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 1d55a6a664..a04254a8a9 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -273,48 +273,38 @@ func (rmq *rocksmq) CreateTopic(topicName string) error { // DestroyTopic removes messages for topic in rocksdb func (rmq *rocksmq) DestroyTopic(topicName string) error { start := time.Now() + ll, ok := topicMu.Load(topicName) + if !ok { + return fmt.Errorf("topic name = %s not exist", topicName) + } + lock, ok := ll.(*sync.Mutex) + if !ok { + return fmt.Errorf("get mutex failed, topic name = %s", topicName) + } + lock.Lock() + defer lock.Unlock() beginKey := topicName + "/begin_id" endKey := topicName + "/end_id" - - err := rmq.kv.Remove(beginKey) - if err != nil { - log.Debug("RocksMQ: failed to remove key <" + beginKey + ">.") - return err - } - - err = rmq.kv.Remove(endKey) - if err != nil { - log.Debug("RocksMQ: failed to remove key <" + endKey + ">.") - return err - } + var removedKeys []string rmq.consumers.Delete(topicName) ackedSizeKey := AckedSizeTitle + topicName - err = rmq.kv.Remove(ackedSizeKey) - if err != nil { - return err - } topicBeginIDKey := TopicBeginIDTitle + topicName - err = rmq.kv.Remove(topicBeginIDKey) - if err != nil { - return err - } // just for clean up old topics, for new topics this is not required lastRetTsKey := LastRetTsTitle + topicName - err = rmq.kv.Remove(lastRetTsKey) - if err != nil { - return err - } msgSizeKey := MessageSizeTitle + topicName - err = rmq.kv.Remove(msgSizeKey) + + removedKeys = append(removedKeys, beginKey, endKey, ackedSizeKey, topicBeginIDKey, lastRetTsKey, msgSizeKey) + // Batch remove, atomic operation + err := rmq.kv.MultiRemove(removedKeys) if err != nil { return err } - topicMu.Delete(topicName) // clean up retention info + topicMu.Delete(topicName) rmq.retentionInfo.topics.Delete(topicName) log.Debug("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds())) return nil diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go index afd7196656..e7588a27c5 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go @@ -206,6 +206,7 @@ func TestRocksmq_Dummy(t *testing.T) { assert.NoError(t, err) channelName1 := "channel_dummy" + topicMu.Store(channelName1, new(sync.Mutex)) err = rmq.DestroyTopic(channelName1) assert.NoError(t, err)