diff --git a/internal/msgstream/rmqms/rmq_msgstream.go b/internal/msgstream/rmqms/rmq_msgstream.go index cb8977c200..8107dcec47 100644 --- a/internal/msgstream/rmqms/rmq_msgstream.go +++ b/internal/msgstream/rmqms/rmq_msgstream.go @@ -103,6 +103,9 @@ func (ms *RmqMsgStream) AsProducer(channels []string) { err := rocksmq.Rmq.CreateChannel(channel) if err == nil { ms.producers = append(ms.producers, channel) + } else { + errMsg := "Failed to create producer " + channel + ", error = " + err.Error() + panic(errMsg) } } } diff --git a/internal/util/rocksmq/rocksmq.go b/internal/util/rocksmq/rocksmq.go index 16d3e8493f..e3eb1525f0 100644 --- a/internal/util/rocksmq/rocksmq.go +++ b/internal/util/rocksmq/rocksmq.go @@ -128,7 +128,7 @@ func (rmq *RocksMQ) CreateChannel(channelName string) error { // Check if channel exist if rmq.checkKeyExist(beginKey) || rmq.checkKeyExist(endKey) { - return errors.New("Channel " + channelName + " already exists.") + return nil } err := rmq.kv.Save(beginKey, "0")