Handle topic not exist in rqm & mqtt seek (#18314)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2022-07-20 15:42:29 +08:00 committed by GitHub
parent 0ebe407221
commit d90308e9a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 1 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
@ -783,7 +784,7 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err
/* Step I: Check if key exists */
ll, ok := topicMu.Load(topicName)
if !ok {
return fmt.Errorf("topic name = %s not exist", topicName)
return fmt.Errorf("Topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist)
}
lock, ok := ll.(*sync.Mutex)
if !ok {

View File

@ -884,6 +884,10 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
err = consumer.Seek(seekMsgID, true)
if err != nil {
log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err))
// stop retry if consumer topic not exist
if errors.Is(err, mqwrapper.ErrTopicNotExist) {
return retry.Unrecoverable(err)
}
return err
}
log.Info("MsgStream seek finished", zap.String("channel", mp.ChannelName))

View File

@ -0,0 +1,6 @@
package mqwrapper
import "errors"
// ErrTopicNotExist topic not exist error.
var ErrTopicNotExist = errors.New("topic not exist")