mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-28 22:45:26 +08:00
Improve rocksmq codecov (#8328)
Signed-off-by: fishpenguin <kun.yu@zilliz.com>
This commit is contained in:
parent
865a430950
commit
9e17fdb598
@ -196,13 +196,11 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
|
||||
|
||||
err := rmq.kv.Save(beginKey, "0")
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: save " + beginKey + " failed.")
|
||||
return err
|
||||
}
|
||||
|
||||
err = rmq.kv.Save(endKey, "0")
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: save " + endKey + " failed.")
|
||||
return err
|
||||
}
|
||||
if _, ok := topicMu.Load(topicName); !ok {
|
||||
@ -323,7 +321,6 @@ func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error {
|
||||
}
|
||||
err := rmq.kv.Save(key, DefaultMessageID)
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: save " + key + " failed.")
|
||||
return err
|
||||
}
|
||||
|
||||
@ -362,7 +359,6 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
|
||||
|
||||
err := rmq.kv.Remove(key)
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: remove " + key + " failed.")
|
||||
return err
|
||||
}
|
||||
if vals, ok := rmq.consumers.Load(topicName); ok {
|
||||
|
||||
@ -119,6 +119,9 @@ func TestRocksmq_RegisterConsumer(t *testing.T) {
|
||||
}
|
||||
rmq.RegisterConsumer(consumer2)
|
||||
|
||||
topicMu.Delete(topicName)
|
||||
topicMu.Store(topicName, topicName)
|
||||
assert.Error(t, rmq.DestroyConsumerGroup(topicName, groupName))
|
||||
err = rmq.DestroyConsumerGroup(topicName, groupName)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
@ -230,6 +233,16 @@ func TestRocksmq_Dummy(t *testing.T) {
|
||||
pMsgs := make([]ProducerMessage, 1)
|
||||
pMsgA := ProducerMessage{Payload: []byte(msgA)}
|
||||
pMsgs[0] = pMsgA
|
||||
|
||||
topicMu.Delete(channelName)
|
||||
_, err = rmq.Consume(channelName, groupName1, 1)
|
||||
assert.Error(t, err)
|
||||
topicMu.Store(channelName, channelName)
|
||||
assert.Error(t, rmq.Produce(channelName, nil))
|
||||
|
||||
_, err = rmq.Consume(channelName, groupName1, 1)
|
||||
assert.Error(t, err)
|
||||
|
||||
}
|
||||
|
||||
func TestRocksmq_Loop(t *testing.T) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user