From 1c9d43ee9f4bd88d3f5768c58ea33c40fa455d53 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Mon, 10 Mar 2025 10:54:04 +0800 Subject: [PATCH] fix: rockmq race condition (#40482) fix #40481 Signed-off-by: xiaofanluan --- pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go index 9d7639c573..dfa611c12b 100644 --- a/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -612,9 +612,16 @@ func (rmq *rocksmq) destroyConsumerGroupInternal(topicName, groupName string) er consumers := vals.([]*Consumer) for index, v := range consumers { if v.GroupName == groupName { + // Fix data race: close the channel before modifying the slice close(v.MsgMutex) - consumers = append(consumers[:index], consumers[index+1:]...) - rmq.consumers.Store(topicName, consumers) + + // Create a new slice to avoid data race + newConsumers := make([]*Consumer, 0, len(consumers)-1) + newConsumers = append(newConsumers, consumers[:index]...) + if index < len(consumers)-1 { + newConsumers = append(newConsumers, consumers[index+1:]...) + } + rmq.consumers.Store(topicName, newConsumers) break } }