From 653dce4948d8f9765271b0cfa4d8f4b74d0eb436 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Mon, 11 Jul 2022 10:56:25 +0800 Subject: [PATCH] Fix pulsar error check (#18207) Signed-off-by: xiaofan-luan --- .../mqwrapper/pulsar/pulsar_client.go | 22 +---- .../mqwrapper/pulsar/pulsar_client_test.go | 43 ++-------- .../mqwrapper/pulsar/pulsar_consumer.go | 14 ++-- .../mqwrapper/pulsar/pulsar_consumer_test.go | 81 +++++++++++++++++++ 4 files changed, 99 insertions(+), 61 deletions(-) diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go index 90fb1fd96c..39a6186812 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go @@ -18,6 +18,7 @@ package pulsar import ( "errors" + "strings" "sync" "github.com/apache/pulsar-client-go/pulsar" @@ -34,24 +35,6 @@ type pulsarClient struct { var sc *pulsarClient var once sync.Once -func isPulsarError(err error, result ...pulsar.Result) bool { - if len(result) == 0 { - return false - } - - perr, ok := err.(*pulsar.Error) - if !ok { - return false - } - for _, r := range result { - if perr.Result() == r { - return true - } - } - - return false -} - // NewClient creates a pulsarClient object // according to the parameter opts of type pulsar.ClientOptions func NewClient(opts pulsar.ClientOptions) (*pulsarClient, error) { @@ -97,8 +80,7 @@ func (pc *pulsarClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper. MessageChannel: receiveChannel, }) if err != nil { - // exclusive consumer already exist - if isPulsarError(err, pulsar.ConsumerBusy) { + if strings.Contains(err.Error(), "ConsumerBusy") { return nil, retry.Unrecoverable(err) } return nil, err diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go index 7b00e512bd..d21c3ccd80 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go @@ -20,7 +20,6 @@ import ( "bytes" "context" "encoding/binary" - "errors" "fmt" "math/rand" "net/url" @@ -585,44 +584,18 @@ func hackPulsarError(result pulsar.Result) *pulsar.Error { // use unsafe to generate test case /* #nosec G103 */ mpe := (*mPulsarError)(unsafe.Pointer(pe)) + // this what we tested + if result == pulsar.ConsumerBusy { + mpe.msg = "server error: ConsumerBusy: Exclusive consumer is already connected" + } + + if result == pulsar.ConsumerNotFound { + mpe.msg = "server error: MetadataError: Consumer not found" + } mpe.result = result return pe } -func TestIsPulsarError(t *testing.T) { - type testCase struct { - err error - results []pulsar.Result - expected bool - } - cases := []testCase{ - { - err: errors.New(""), - results: []pulsar.Result{}, - expected: false, - }, - { - err: errors.New(""), - results: []pulsar.Result{pulsar.ConnectError}, - expected: false, - }, - { - err: hackPulsarError(pulsar.ConsumerBusy), - results: []pulsar.Result{pulsar.ConnectError}, - expected: false, - }, - { - err: hackPulsarError(pulsar.ConsumerBusy), - results: []pulsar.Result{pulsar.ConnectError, pulsar.ConsumerBusy}, - expected: true, - }, - } - - for _, tc := range cases { - assert.Equal(t, tc.expected, isPulsarError(tc.err, tc.results...)) - } -} - type mockPulsarClient struct{} // CreateProducer Creates the producer instance diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go index bd2687ee76..20642b2a9a 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer.go @@ -18,6 +18,7 @@ package pulsar import ( "context" + "strings" "sync" "time" "unsafe" @@ -115,13 +116,14 @@ func (pc *Consumer) Close() { // Unsubscribe for the consumer fn := func() error { err := pc.c.Unsubscribe() - if isPulsarError(err, pulsar.SubscriptionNotFound) || isPulsarError(err, pulsar.ConsumerNotFound) { - log.Warn("failed to find consumer, skip unsubscribe", - zap.String("subscription", pc.Subscription()), - zap.Error(err)) - return nil - } if err != nil { + // this is the hack due to pulsar didn't handle error as expected + if strings.Contains(err.Error(), "Consumer not found") { + log.Warn("failed to find consumer, skip unsubscribe", + zap.String("subscription", pc.Subscription()), + zap.Error(err)) + return nil + } return err } // only close if unsubscribe successfully diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go index 0ec8e6de82..61ac7b0efd 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go @@ -19,9 +19,13 @@ package pulsar import ( "context" "fmt" + "net/url" + "strings" "testing" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar/utils" "github.com/apache/pulsar-client-go/pulsar" "github.com/stretchr/testify/assert" @@ -131,3 +135,80 @@ func TestPulsarConsumer_Close(t *testing.T) { // test double close pulsarConsumer.Close() } + +func TestPulsarClientCloseUnsubscribeError(t *testing.T) { + topic := "TestPulsarClientCloseUnsubscribeError" + subName := "test" + pulsarAddress, _ := Params.Load("_PulsarAddress") + + client, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + defer client.Close() + assert.NoError(t, err) + + consumer, err := client.Subscribe(pulsar.ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + Type: pulsar.Exclusive, + SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, + }) + defer consumer.Close() + assert.NoError(t, err) + + // subscribe agiain + _, err = client.Subscribe(pulsar.ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + Type: pulsar.Exclusive, + SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, + }) + defer consumer.Close() + assert.Error(t, err) + assert.True(t, strings.Contains(err.Error(), "ConsumerBusy")) + + topicName, err := utils.GetTopicName(topic) + assert.NoError(t, err) + + pulsarURL, err := url.ParseRequestURI(pulsarAddress) + if err != nil { + panic(err) + } + webport := Params.LoadWithDefault("pulsar.webport", "80") + cmdutils.PulsarCtlConfig.WebServiceURL = "http://" + pulsarURL.Hostname() + ":" + webport + admin := cmdutils.NewPulsarClient() + err = admin.Subscriptions().Delete(*topicName, subName, true) + if err != nil { + cmdutils.PulsarCtlConfig.WebServiceURL = "http://" + pulsarURL.Hostname() + ":" + "8080" + admin := cmdutils.NewPulsarClient() + err = admin.Subscriptions().Delete(*topicName, subName, true) + assert.NoError(t, err) + } + + err = consumer.Unsubscribe() + assert.True(t, strings.Contains(err.Error(), "Consumer not found")) + fmt.Println(err) +} + +func TestPulsarClientUnsubscribeTwice(t *testing.T) { + topic := "TestPulsarClientUnsubscribeTwice" + subName := "test" + pulsarAddress, _ := Params.Load("_PulsarAddress") + + client, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + defer client.Close() + assert.NoError(t, err) + + consumer, err := client.Subscribe(pulsar.ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + Type: pulsar.Exclusive, + SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, + }) + defer consumer.Close() + assert.NoError(t, err) + + err = consumer.Unsubscribe() + assert.NoError(t, err) + err = consumer.Unsubscribe() + assert.True(t, strings.Contains(err.Error(), "Consumer not found")) + fmt.Println(err) +}