Fix pulsar error check (#18207)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
This commit is contained in:
Xiaofan 2022-07-11 10:56:25 +08:00 committed by GitHub
parent e61f61a9bd
commit 653dce4948
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 99 additions and 61 deletions

View File

@ -18,6 +18,7 @@ package pulsar
import (
"errors"
"strings"
"sync"
"github.com/apache/pulsar-client-go/pulsar"
@ -34,24 +35,6 @@ type pulsarClient struct {
var sc *pulsarClient
var once sync.Once
func isPulsarError(err error, result ...pulsar.Result) bool {
if len(result) == 0 {
return false
}
perr, ok := err.(*pulsar.Error)
if !ok {
return false
}
for _, r := range result {
if perr.Result() == r {
return true
}
}
return false
}
// NewClient creates a pulsarClient object
// according to the parameter opts of type pulsar.ClientOptions
func NewClient(opts pulsar.ClientOptions) (*pulsarClient, error) {
@ -97,8 +80,7 @@ func (pc *pulsarClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.
MessageChannel: receiveChannel,
})
if err != nil {
// exclusive consumer already exist
if isPulsarError(err, pulsar.ConsumerBusy) {
if strings.Contains(err.Error(), "ConsumerBusy") {
return nil, retry.Unrecoverable(err)
}
return nil, err

View File

@ -20,7 +20,6 @@ import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"net/url"
@ -585,44 +584,18 @@ func hackPulsarError(result pulsar.Result) *pulsar.Error {
// use unsafe to generate test case
/* #nosec G103 */
mpe := (*mPulsarError)(unsafe.Pointer(pe))
// this what we tested
if result == pulsar.ConsumerBusy {
mpe.msg = "server error: ConsumerBusy: Exclusive consumer is already connected"
}
if result == pulsar.ConsumerNotFound {
mpe.msg = "server error: MetadataError: Consumer not found"
}
mpe.result = result
return pe
}
func TestIsPulsarError(t *testing.T) {
type testCase struct {
err error
results []pulsar.Result
expected bool
}
cases := []testCase{
{
err: errors.New(""),
results: []pulsar.Result{},
expected: false,
},
{
err: errors.New(""),
results: []pulsar.Result{pulsar.ConnectError},
expected: false,
},
{
err: hackPulsarError(pulsar.ConsumerBusy),
results: []pulsar.Result{pulsar.ConnectError},
expected: false,
},
{
err: hackPulsarError(pulsar.ConsumerBusy),
results: []pulsar.Result{pulsar.ConnectError, pulsar.ConsumerBusy},
expected: true,
},
}
for _, tc := range cases {
assert.Equal(t, tc.expected, isPulsarError(tc.err, tc.results...))
}
}
type mockPulsarClient struct{}
// CreateProducer Creates the producer instance

View File

@ -18,6 +18,7 @@ package pulsar
import (
"context"
"strings"
"sync"
"time"
"unsafe"
@ -115,13 +116,14 @@ func (pc *Consumer) Close() {
// Unsubscribe for the consumer
fn := func() error {
err := pc.c.Unsubscribe()
if isPulsarError(err, pulsar.SubscriptionNotFound) || isPulsarError(err, pulsar.ConsumerNotFound) {
log.Warn("failed to find consumer, skip unsubscribe",
zap.String("subscription", pc.Subscription()),
zap.Error(err))
return nil
}
if err != nil {
// this is the hack due to pulsar didn't handle error as expected
if strings.Contains(err.Error(), "Consumer not found") {
log.Warn("failed to find consumer, skip unsubscribe",
zap.String("subscription", pc.Subscription()),
zap.Error(err))
return nil
}
return err
}
// only close if unsubscribe successfully

View File

@ -19,9 +19,13 @@ package pulsar
import (
"context"
"fmt"
"net/url"
"strings"
"testing"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/stretchr/testify/assert"
@ -131,3 +135,80 @@ func TestPulsarConsumer_Close(t *testing.T) {
// test double close
pulsarConsumer.Close()
}
func TestPulsarClientCloseUnsubscribeError(t *testing.T) {
topic := "TestPulsarClientCloseUnsubscribeError"
subName := "test"
pulsarAddress, _ := Params.Load("_PulsarAddress")
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
defer client.Close()
assert.NoError(t, err)
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
Type: pulsar.Exclusive,
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
})
defer consumer.Close()
assert.NoError(t, err)
// subscribe agiain
_, err = client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
Type: pulsar.Exclusive,
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
})
defer consumer.Close()
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "ConsumerBusy"))
topicName, err := utils.GetTopicName(topic)
assert.NoError(t, err)
pulsarURL, err := url.ParseRequestURI(pulsarAddress)
if err != nil {
panic(err)
}
webport := Params.LoadWithDefault("pulsar.webport", "80")
cmdutils.PulsarCtlConfig.WebServiceURL = "http://" + pulsarURL.Hostname() + ":" + webport
admin := cmdutils.NewPulsarClient()
err = admin.Subscriptions().Delete(*topicName, subName, true)
if err != nil {
cmdutils.PulsarCtlConfig.WebServiceURL = "http://" + pulsarURL.Hostname() + ":" + "8080"
admin := cmdutils.NewPulsarClient()
err = admin.Subscriptions().Delete(*topicName, subName, true)
assert.NoError(t, err)
}
err = consumer.Unsubscribe()
assert.True(t, strings.Contains(err.Error(), "Consumer not found"))
fmt.Println(err)
}
func TestPulsarClientUnsubscribeTwice(t *testing.T) {
topic := "TestPulsarClientUnsubscribeTwice"
subName := "test"
pulsarAddress, _ := Params.Load("_PulsarAddress")
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarAddress})
defer client.Close()
assert.NoError(t, err)
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
Type: pulsar.Exclusive,
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
})
defer consumer.Close()
assert.NoError(t, err)
err = consumer.Unsubscribe()
assert.NoError(t, err)
err = consumer.Unsubscribe()
assert.True(t, strings.Contains(err.Error(), "Consumer not found"))
fmt.Println(err)
}