diff --git a/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go index 9d7639c573..dfa611c12b 100644 --- a/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -612,9 +612,16 @@ func (rmq *rocksmq) destroyConsumerGroupInternal(topicName, groupName string) er consumers := vals.([]*Consumer) for index, v := range consumers { if v.GroupName == groupName { + // Fix data race: close the channel before modifying the slice close(v.MsgMutex) - consumers = append(consumers[:index], consumers[index+1:]...) - rmq.consumers.Store(topicName, consumers) + + // Create a new slice to avoid data race + newConsumers := make([]*Consumer, 0, len(consumers)-1) + newConsumers = append(newConsumers, consumers[:index]...) + if index < len(consumers)-1 { + newConsumers = append(newConsumers, consumers[index+1:]...) + } + rmq.consumers.Store(topicName, newConsumers) break } }