diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index bc78e9526e..34688ff545 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -381,7 +381,7 @@ func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error { start := time.Now() key := constructCurrentID(topicName, groupName) if rmq.checkKeyExist(key) { - log.Debug("RocksMQ: " + key + " existed.") + log.Debug("RMQ CreateConsumerGroup key already exists", zap.String("key", key)) return nil } err := rmq.kv.Save(key, DefaultMessageID) diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go index 74c373f9c4..45ff34a1c2 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go @@ -179,6 +179,9 @@ func TestRocksmq(t *testing.T) { _ = rmq.DestroyConsumerGroup(channelName, groupName) err = rmq.CreateConsumerGroup(channelName, groupName) assert.Nil(t, err) + // double create consumer group + err = rmq.CreateConsumerGroup(channelName, groupName) + assert.Nil(t, err) cMsgs, err := rmq.Consume(channelName, groupName, 1) assert.Nil(t, err) assert.Equal(t, len(cMsgs), 1)