From af3736aff6ff9171fdb46084c581f90c0647a77a Mon Sep 17 00:00:00 2001 From: groot Date: Thu, 18 Feb 2021 14:13:55 +0800 Subject: [PATCH] Fix rocksmq bug Signed-off-by: groot --- internal/msgstream/rmqms/rmq_msgstream.go | 3 +++ internal/util/rocksmq/rocksmq.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/msgstream/rmqms/rmq_msgstream.go b/internal/msgstream/rmqms/rmq_msgstream.go index cb8977c200..8107dcec47 100644 --- a/internal/msgstream/rmqms/rmq_msgstream.go +++ b/internal/msgstream/rmqms/rmq_msgstream.go @@ -103,6 +103,9 @@ func (ms *RmqMsgStream) AsProducer(channels []string) { err := rocksmq.Rmq.CreateChannel(channel) if err == nil { ms.producers = append(ms.producers, channel) + } else { + errMsg := "Failed to create producer " + channel + ", error = " + err.Error() + panic(errMsg) } } } diff --git a/internal/util/rocksmq/rocksmq.go b/internal/util/rocksmq/rocksmq.go index 16d3e8493f..e3eb1525f0 100644 --- a/internal/util/rocksmq/rocksmq.go +++ b/internal/util/rocksmq/rocksmq.go @@ -128,7 +128,7 @@ func (rmq *RocksMQ) CreateChannel(channelName string) error { // Check if channel exist if rmq.checkKeyExist(beginKey) || rmq.checkKeyExist(endKey) { - return errors.New("Channel " + channelName + " already exists.") + return nil } err := rmq.kv.Save(beginKey, "0")