From a8722797fed80f27c7b224bfb273c21628890a38 Mon Sep 17 00:00:00 2001 From: yukun Date: Tue, 2 Nov 2021 17:00:31 +0800 Subject: [PATCH] Add topic lock for rocksmq Seek (#11083) Signed-off-by: fishpenguin --- .../rocksmq/server/rocksmq/rocksmq_impl.go | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 4e99be4140..a24944122e 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -636,7 +636,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum } newID := consumerMessage[len(consumerMessage)-1].MsgID - err = rmq.Seek(topicName, groupName, newID) + err = rmq.seek(topicName, groupName, newID) if err != nil { log.Debug("RocksMQ: Seek(" + groupName + "," + topicName + "," + strconv.FormatInt(newID, 10) + ") failed") return nil, err @@ -648,9 +648,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum return consumerMessage, nil } -// Seek updates the current id to the given msgID -func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) error { - /* Step I: Check if key exists */ +func (rmq *rocksmq) seek(topicName string, groupName string, msgID UniqueID) error { rmq.storeMu.Lock() defer rmq.storeMu.Unlock() key := constructCurrentID(topicName, groupName) @@ -684,6 +682,23 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err return nil } +// Seek updates the current id to the given msgID +func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) error { + /* Step I: Check if key exists */ + ll, ok := topicMu.Load(topicName) + if !ok { + return fmt.Errorf("topic name = %s not exist", topicName) + } + lock, ok := ll.(*sync.Mutex) + if !ok { + return fmt.Errorf("get mutex failed, topic name = %s", topicName) + } + lock.Lock() + defer lock.Unlock() + + return rmq.seek(topicName, groupName, msgID) +} + // SeekToLatest updates current id to the msg id of latest message func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error { rmq.storeMu.Lock()