From d90308e9a01ac1af5ecffa17dabdeba43cb70f09 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 20 Jul 2022 15:42:29 +0800 Subject: [PATCH] Handle topic not exist in rqm & mqtt seek (#18314) Signed-off-by: Congqi Xia --- internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go | 3 ++- internal/mq/msgstream/mq_msgstream.go | 4 ++++ internal/mq/msgstream/mqwrapper/errors.go | 6 ++++++ 3 files changed, 12 insertions(+), 1 deletion(-) create mode 100644 internal/mq/msgstream/mqwrapper/errors.go diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go index 3994ba479a..366bbd3e2a 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/kv" rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/retry" @@ -783,7 +784,7 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err /* Step I: Check if key exists */ ll, ok := topicMu.Load(topicName) if !ok { - return fmt.Errorf("topic name = %s not exist", topicName) + return fmt.Errorf("Topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist) } lock, ok := ll.(*sync.Mutex) if !ok { diff --git a/internal/mq/msgstream/mq_msgstream.go b/internal/mq/msgstream/mq_msgstream.go index 9a1242e5c3..845eae58d9 100644 --- a/internal/mq/msgstream/mq_msgstream.go +++ b/internal/mq/msgstream/mq_msgstream.go @@ -884,6 +884,10 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { err = consumer.Seek(seekMsgID, true) if err != nil { log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err)) + // stop retry if consumer topic not exist + if errors.Is(err, mqwrapper.ErrTopicNotExist) { + return retry.Unrecoverable(err) + } return err } log.Info("MsgStream seek finished", zap.String("channel", mp.ChannelName)) diff --git a/internal/mq/msgstream/mqwrapper/errors.go b/internal/mq/msgstream/mqwrapper/errors.go new file mode 100644 index 0000000000..04810bf517 --- /dev/null +++ b/internal/mq/msgstream/mqwrapper/errors.go @@ -0,0 +1,6 @@ +package mqwrapper + +import "errors" + +// ErrTopicNotExist topic not exist error. +var ErrTopicNotExist = errors.New("topic not exist")