Add topic lock for rocksmq Seek (#11083)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
This commit is contained in:
yukun 2021-11-02 17:00:31 +08:00 committed by GitHub
parent ca56290e85
commit a8722797fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -636,7 +636,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
}
newID := consumerMessage[len(consumerMessage)-1].MsgID
err = rmq.Seek(topicName, groupName, newID)
err = rmq.seek(topicName, groupName, newID)
if err != nil {
log.Debug("RocksMQ: Seek(" + groupName + "," + topicName + "," + strconv.FormatInt(newID, 10) + ") failed")
return nil, err
@ -648,9 +648,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
return consumerMessage, nil
}
// Seek updates the current id to the given msgID
func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) error {
/* Step I: Check if key exists */
func (rmq *rocksmq) seek(topicName string, groupName string, msgID UniqueID) error {
rmq.storeMu.Lock()
defer rmq.storeMu.Unlock()
key := constructCurrentID(topicName, groupName)
@ -684,6 +682,23 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err
return nil
}
// Seek updates the current id to the given msgID
func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) error {
/* Step I: Check if key exists */
ll, ok := topicMu.Load(topicName)
if !ok {
return fmt.Errorf("topic name = %s not exist", topicName)
}
lock, ok := ll.(*sync.Mutex)
if !ok {
return fmt.Errorf("get mutex failed, topic name = %s", topicName)
}
lock.Lock()
defer lock.Unlock()
return rmq.seek(topicName, groupName, msgID)
}
// SeekToLatest updates current id to the msg id of latest message
func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
rmq.storeMu.Lock()