From 921501dfac4636ef41d2c80bb3d6cde2b311affe Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 5 Jan 2022 16:19:20 +0800 Subject: [PATCH] Fail fast when pulsar subscribe exlusive timeout (#14844) Signed-off-by: Congqi Xia --- internal/util/mqclient/pulsar_client.go | 23 +++++ internal/util/mqclient/pulsar_client_test.go | 101 +++++++++++++++++++ 2 files changed, 124 insertions(+) diff --git a/internal/util/mqclient/pulsar_client.go b/internal/util/mqclient/pulsar_client.go index 9d2b4810ae..48920d0779 100644 --- a/internal/util/mqclient/pulsar_client.go +++ b/internal/util/mqclient/pulsar_client.go @@ -22,6 +22,7 @@ import ( "github.com/apache/pulsar-client-go/pulsar" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/retry" "go.uber.org/zap" ) @@ -32,6 +33,24 @@ type pulsarClient struct { var sc *pulsarClient var once sync.Once +func isPulsarError(err error, result ...pulsar.Result) bool { + if len(result) == 0 { + return false + } + + perr, ok := err.(*pulsar.Error) + if !ok { + return false + } + for _, r := range result { + if perr.Result() == r { + return true + } + } + + return false +} + // GetPulsarClientInstance creates a pulsarClient object // according to the parameter opts of type pulsar.ClientOptions func GetPulsarClientInstance(opts pulsar.ClientOptions) (*pulsarClient, error) { @@ -91,6 +110,10 @@ func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) { MessageChannel: receiveChannel, }) if err != nil { + // exclusive consumer already exist + if isPulsarError(err, pulsar.ConsumerBusy) { + return nil, retry.Unrecoverable(err) + } return nil, err } diff --git a/internal/util/mqclient/pulsar_client_test.go b/internal/util/mqclient/pulsar_client_test.go index bcffd0a350..3b9db2e82e 100644 --- a/internal/util/mqclient/pulsar_client_test.go +++ b/internal/util/mqclient/pulsar_client_test.go @@ -20,16 +20,19 @@ import ( "bytes" "context" "encoding/binary" + "errors" "fmt" "math/rand" "testing" "time" + "unsafe" "go.uber.org/zap" "github.com/apache/pulsar-client-go/pulsar" "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/retry" "github.com/stretchr/testify/assert" ) @@ -561,3 +564,101 @@ func TestPulsarClient_BytesToMsgID(t *testing.T) { assert.Nil(t, res) assert.NotNil(t, err) } + +type mPulsarError struct { + msg string + result pulsar.Result +} + +func hackPulsarError(result pulsar.Result) *pulsar.Error { + pe := &pulsar.Error{} + // use unsafe to generate test case + /* #nosec G103 */ + mpe := (*mPulsarError)(unsafe.Pointer(pe)) + mpe.result = result + return pe +} + +func TestIsPulsarError(t *testing.T) { + type testCase struct { + err error + results []pulsar.Result + expected bool + } + cases := []testCase{ + { + err: errors.New(""), + results: []pulsar.Result{}, + expected: false, + }, + { + err: errors.New(""), + results: []pulsar.Result{pulsar.ConnectError}, + expected: false, + }, + { + err: hackPulsarError(pulsar.ConsumerBusy), + results: []pulsar.Result{pulsar.ConnectError}, + expected: false, + }, + { + err: hackPulsarError(pulsar.ConsumerBusy), + results: []pulsar.Result{pulsar.ConnectError, pulsar.ConsumerBusy}, + expected: true, + }, + } + + for _, tc := range cases { + assert.Equal(t, tc.expected, isPulsarError(tc.err, tc.results...)) + } +} + +type mockPulsarClient struct{} + +// CreateProducer Creates the producer instance +// This method will block until the producer is created successfully +func (c *mockPulsarClient) CreateProducer(_ pulsar.ProducerOptions) (pulsar.Producer, error) { + return nil, hackPulsarError(pulsar.ConnectError) +} + +// Subscribe Creates a `Consumer` by subscribing to a topic. +// +// If the subscription does not exist, a new subscription will be created and all messages published after the +// creation will be retained until acknowledged, even if the consumer is not connected +func (c *mockPulsarClient) Subscribe(_ pulsar.ConsumerOptions) (pulsar.Consumer, error) { + return nil, hackPulsarError(pulsar.ConsumerBusy) +} + +// CreateReader Creates a Reader instance. +// This method will block until the reader is created successfully. +func (c *mockPulsarClient) CreateReader(_ pulsar.ReaderOptions) (pulsar.Reader, error) { + return nil, hackPulsarError(pulsar.ConnectError) +} + +// TopicPartitions Fetches the list of partitions for a given topic +// +// If the topic is partitioned, this will return a list of partition names. +// If the topic is not partitioned, the returned list will contain the topic +// name itself. +// +// This can be used to discover the partitions and create {@link Reader}, +// {@link Consumer} or {@link Producer} instances directly on a particular partition. +func (c *mockPulsarClient) TopicPartitions(topic string) ([]string, error) { + return nil, hackPulsarError(pulsar.ConnectError) +} + +// Close Closes the Client and free associated resources +func (c *mockPulsarClient) Close() { +} + +func TestPulsarClient_SubscribeExclusiveFail(t *testing.T) { + t.Run("exclusive pulsar consumer failure", func(t *testing.T) { + pc := &pulsarClient{ + client: &mockPulsarClient{}, + } + + _, err := pc.Subscribe(ConsumerOptions{}) + assert.Error(t, err) + assert.True(t, retry.IsUncoverable(err)) + }) +}