mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: unsafe concurrent consuming api of rocksmq (#39544)
issue: #38966 Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
b3791a6f90
commit
a816a03351
@ -165,7 +165,7 @@ func TestDmlChannels(t *testing.T) {
|
||||
defer paramtable.Get().Reset(Params.CommonCfg.PreCreatedTopicEnabled.Key)
|
||||
defer paramtable.Get().Reset(Params.CommonCfg.TopicNames.Key)
|
||||
|
||||
assert.Panics(t, func() { newDmlChannels(ctx, factory, dmlChanPrefix, totalDmlChannelNum) })
|
||||
newDmlChannels(ctx, factory, dmlChanPrefix, totalDmlChannelNum)
|
||||
}
|
||||
|
||||
func TestDmChannelsFailure(t *testing.T) {
|
||||
|
||||
@ -111,7 +111,9 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
|
||||
GroupName: consumer.consumerName,
|
||||
MsgMutex: consumer.msgMutex,
|
||||
}
|
||||
c.server.RegisterConsumer(cons)
|
||||
if err := c.server.RegisterConsumer(cons); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if options.SubscriptionInitialPosition == common.SubscriptionPositionLatest {
|
||||
err = c.server.SeekToLatest(options.Topic, options.SubscriptionName)
|
||||
|
||||
@ -420,9 +420,7 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, ok := topicMu.Load(topicName); !ok {
|
||||
topicMu.Store(topicName, new(sync.Mutex))
|
||||
}
|
||||
topicMu.LoadOrStore(topicName, new(sync.Mutex))
|
||||
|
||||
// msgSizeKey -> msgSize
|
||||
// topicIDKey -> topic creating time
|
||||
@ -550,6 +548,11 @@ func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) error {
|
||||
if rmq.isClosed() {
|
||||
return errors.New(RmqNotServingErrMsg)
|
||||
}
|
||||
ll, _ := topicMu.LoadOrStore(consumer.Topic, new(sync.Mutex))
|
||||
mu, _ := ll.(*sync.Mutex)
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
start := time.Now()
|
||||
if vals, ok := rmq.consumers.Load(consumer.Topic); ok {
|
||||
for _, v := range vals.([]*Consumer) {
|
||||
|
||||
@ -63,7 +63,7 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf
|
||||
for _, key := range topicKeys {
|
||||
topic := key[len(TopicIDTitle):]
|
||||
ri.topicRetetionTime.Insert(topic, time.Now().Unix())
|
||||
topicMu.Store(topic, new(sync.Mutex))
|
||||
topicMu.LoadOrStore(topic, new(sync.Mutex))
|
||||
}
|
||||
return ri, nil
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user