From 6f3a460b4e517935c9a75ede41b5a01a57fdce3d Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Mon, 10 May 2021 10:32:10 +0800 Subject: [PATCH] Add pulsar seek example in pulsar_client_test.go (#5154) Signed-off-by: yudong.cai --- internal/util/mqclient/pulsar_client_test.go | 154 ++++++++++++++++--- 1 file changed, 130 insertions(+), 24 deletions(-) diff --git a/internal/util/mqclient/pulsar_client_test.go b/internal/util/mqclient/pulsar_client_test.go index d26bc71204..f8d7cdbea9 100644 --- a/internal/util/mqclient/pulsar_client_test.go +++ b/internal/util/mqclient/pulsar_client_test.go @@ -12,42 +12,54 @@ package mqclient import ( + "bytes" + "context" + "encoding/binary" + "math/rand" "testing" + "time" "github.com/apache/pulsar-client-go/pulsar" + "github.com/prometheus/common/log" "github.com/stretchr/testify/assert" ) -func TestNewPulsarClient(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") - pc, err := NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) - defer pc.Close() - assert.NoError(t, err) - assert.NotNil(t, pc) +func IntToBytes(n int) []byte { + tmp := int32(n) + bytesBuffer := bytes.NewBuffer([]byte{}) + binary.Write(bytesBuffer, binary.BigEndian, tmp) + return bytesBuffer.Bytes() } -func TestPulsarCreateProducer(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") - pc, err := NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) - defer pc.Close() - assert.NoError(t, err) - assert.NotNil(t, pc) +func BytesToInt(b []byte) int { + bytesBuffer := bytes.NewBuffer(b) + var tmp int32 + binary.Read(bytesBuffer, binary.BigEndian, &tmp) + return int(tmp) +} - topic := "test_CreateProducer" +func Produce(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, arr []int) { producer, err := pc.CreateProducer(ProducerOptions{Topic: topic}) - assert.NoError(t, err) + assert.Nil(t, err) assert.NotNil(t, producer) + + log.Infof("Produce start") + + for _, v := range arr { + msg := &ProducerMessage{ + Payload: IntToBytes(v), + Properties: map[string]string{}, + } + err = producer.Send(ctx, msg) + assert.Nil(t, err) + log.Infof("SND %d", v) + } + + log.Infof("Produce done") } -func TestPulsarSubscribe(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") - pc, err := NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) - defer pc.Close() - assert.NoError(t, err) - assert.NotNil(t, pc) - - topic := "test_Subscribe" - subName := "subName" +// Consume1 will consume random messages and record the last MessageID it received +func Consume1(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, subName string, c chan MessageID) { consumer, err := pc.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: subName, @@ -55,6 +67,100 @@ func TestPulsarSubscribe(t *testing.T) { Type: KeyShared, SubscriptionInitialPosition: SubscriptionPositionEarliest, }) - assert.NoError(t, err) + assert.Nil(t, err) assert.NotNil(t, consumer) + defer consumer.Close() + + log.Infof("Consume1 start") + + // get random number between 1 ~ 5 + rand.Seed(time.Now().UnixNano()) + cnt := 1 + rand.Int()%5 + + var msg ConsumerMessage + for i := 0; i < cnt; i++ { + select { + case <-ctx.Done(): + log.Infof("Consume1 channel closed") + return + case msg = <-consumer.Chan(): + //consumer.Ack(msg) + v := BytesToInt(msg.Payload()) + log.Infof("RECV v = %d", v) + } + } + c <- msg.ID() + + log.Infof("Consume1 randomly RECV %d messages", cnt) + log.Infof("Consume1 done") +} + +// Consume2 will consume messages from specified MessageID +func Consume2(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, subName string, msgID MessageID) { + consumer, err := pc.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + BufSize: 1024, + Type: KeyShared, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + assert.NotNil(t, consumer) + defer consumer.Close() + + err = consumer.Seek(msgID) + assert.Nil(t, err) + + // skip the last received message + <-consumer.Chan() + + log.Infof("Consume2 start") + + for { + select { + case <-ctx.Done(): + log.Infof("Consume2 channel closed") + return + case msg := <-consumer.Chan(): + //consumer.Ack(msg) + v := BytesToInt(msg.Payload()) + log.Infof("RECV v = %d", v) + } + } +} + +func TestPulsarClient(t *testing.T) { + pulsarAddress, _ := Params.Load("_PulsarAddress") + pc, err := NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) + defer pc.Close() + assert.NoError(t, err) + assert.NotNil(t, pc) + + topic := "test" + subName := "subName" + arr := []int{111, 222, 333, 444, 555, 666, 777} + c := make(chan MessageID) + + ctx, cancel := context.WithCancel(context.Background()) + + // launch consume1 + go Consume1(ctx, t, pc, topic, subName, c) + time.Sleep(1 * time.Second) + + // launch produce + go Produce(ctx, t, pc, topic, arr) + time.Sleep(1 * time.Second) + + // record the last received message id + lastMsgID := <-c + log.Info(lastMsgID) + + // launch consume2 + go Consume2(ctx, t, pc, topic, subName, lastMsgID) + time.Sleep(1 * time.Second) + + // stop Consume2 + cancel() + + log.Infof("main done") }