From 618fef0a4383678c2cb52d8eb151b291bca34e18 Mon Sep 17 00:00:00 2001 From: groot Date: Wed, 8 Sep 2021 16:52:35 +0800 Subject: [PATCH] Add unittest for mqclient (#7578) Signed-off-by: yhmo --- internal/util/mqclient/pulsar_client_test.go | 69 ++++++++-- .../util/mqclient/pulsar_consumer_test.go | 23 +++- internal/util/mqclient/pulsar_id_test.go | 64 ++++++++++ .../util/mqclient/pulsar_producer_test.go | 45 +++++++ internal/util/mqclient/rmq_client_test.go | 120 ++++++++++++++++-- internal/util/mqclient/rmq_id_test.go | 41 ++++++ 6 files changed, 345 insertions(+), 17 deletions(-) create mode 100644 internal/util/mqclient/pulsar_id_test.go create mode 100644 internal/util/mqclient/pulsar_producer_test.go create mode 100644 internal/util/mqclient/rmq_id_test.go diff --git a/internal/util/mqclient/pulsar_client_test.go b/internal/util/mqclient/pulsar_client_test.go index 01286f3019..c940de1c95 100644 --- a/internal/util/mqclient/pulsar_client_test.go +++ b/internal/util/mqclient/pulsar_client_test.go @@ -61,6 +61,17 @@ func Produce(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, log.Info("Produce done") } +func VerifyMessage(t *testing.T, msg ConsumerMessage) { + pload := BytesToInt(msg.Payload()) + log.Info("RECV", zap.Any("v", pload)) + pm := msg.(*pulsarMessage) + topic := pm.Topic() + assert.NotEmpty(t, topic) + log.Info("RECV", zap.Any("t", topic)) + prop := pm.Properties() + log.Info("RECV", zap.Any("p", len(prop))) +} + // Consume1 will consume random messages and record the last MessageID it received func Consume1(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, subName string, c chan MessageID, total *int) { consumer, err := pc.Subscribe(ConsumerOptions{ @@ -88,8 +99,7 @@ func Consume1(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, return case msg = <-consumer.Chan(): consumer.Ack(msg) - v := BytesToInt(msg.Payload()) - log.Info("RECV", zap.Any("v", v)) + VerifyMessage(t, msg) (*total)++ //log.Debug("total", zap.Int("val", *total)) } @@ -129,8 +139,7 @@ func Consume2(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, return case msg := <-consumer.Chan(): consumer.Ack(msg) - v := BytesToInt(msg.Payload()) - log.Info("RECV", zap.Any("v", v)) + VerifyMessage(t, msg) (*total)++ //log.Debug("total", zap.Int("val", *total)) } @@ -158,15 +167,14 @@ func Consume3(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, return case msg := <-consumer.Chan(): consumer.Ack(msg) - v := BytesToInt(msg.Payload()) - log.Info("RECV", zap.Any("v", v)) + VerifyMessage(t, msg) (*total)++ //log.Debug("total", zap.Int("val", *total)) } } } -func TestPulsarClient(t *testing.T) { +func TestPulsarClient_Consume1(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") pc, err := GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress}) defer pc.Close() @@ -317,7 +325,7 @@ func Consume23(ctx context.Context, t *testing.T, pc *pulsarClient, topic string } } -func TestPulsarClient2(t *testing.T) { +func TestPulsarClient_Consume2(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") pc, err := GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress}) defer pc.Close() @@ -366,3 +374,48 @@ func TestPulsarClient2(t *testing.T) { log.Info("main done") } + +func TestPulsarClient_EarliestMessageID(t *testing.T) { + pulsarAddress, _ := Params.Load("_PulsarAddress") + client, _ := GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress}) + defer client.Close() + + mid := client.EarliestMessageID() + assert.NotNil(t, mid) +} + +func TestPulsarClient_StringToMsgID(t *testing.T) { + pulsarAddress, _ := Params.Load("_PulsarAddress") + client, _ := GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress}) + defer client.Close() + + mid := client.EarliestMessageID() + str := PulsarMsgIDToString(mid) + + res, err := client.StringToMsgID(str) + assert.Nil(t, err) + assert.NotNil(t, res) + + str = "X" + res, err = client.StringToMsgID(str) + assert.Nil(t, res) + assert.NotNil(t, err) +} + +func TestPulsarClient_BytesToMsgID(t *testing.T) { + pulsarAddress, _ := Params.Load("_PulsarAddress") + client, _ := GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress}) + defer client.Close() + + mid := pulsar.EarliestMessageID() + binary := SerializePulsarMsgID(mid) + + res, err := client.BytesToMsgID(binary) + assert.Nil(t, err) + assert.NotNil(t, res) + + invalidBin := []byte{0} + res, err = client.BytesToMsgID(invalidBin) + assert.Nil(t, res) + assert.NotNil(t, err) +} diff --git a/internal/util/mqclient/pulsar_consumer_test.go b/internal/util/mqclient/pulsar_consumer_test.go index 8a9bb54cf1..2ec383503f 100644 --- a/internal/util/mqclient/pulsar_consumer_test.go +++ b/internal/util/mqclient/pulsar_consumer_test.go @@ -19,7 +19,28 @@ import ( "github.com/stretchr/testify/assert" ) -func TestPatchEarliestMessageID(t *testing.T) { +func TestPulsarConsumer_Subscription(t *testing.T) { + pulsarAddress, _ := Params.Load("_PulsarAddress") + pc, err := GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress}) + assert.Nil(t, err) + defer pc.Close() + + receiveChannel := make(chan pulsar.ConsumerMessage, 100) + consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{ + Topic: "Topic", + SubscriptionName: "SubName", + Type: pulsar.SubscriptionType(Exclusive), + SubscriptionInitialPosition: pulsar.SubscriptionInitialPosition(SubscriptionPositionEarliest), + MessageChannel: receiveChannel, + }) + assert.Nil(t, err) + assert.NotNil(t, consumer) + + str := consumer.Subscription() + assert.NotNil(t, str) +} + +func Test_PatchEarliestMessageID(t *testing.T) { mid := pulsar.EarliestMessageID() // String() -> ledgerID:entryID:partitionIdx diff --git a/internal/util/mqclient/pulsar_id_test.go b/internal/util/mqclient/pulsar_id_test.go new file mode 100644 index 0000000000..527071b94f --- /dev/null +++ b/internal/util/mqclient/pulsar_id_test.go @@ -0,0 +1,64 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package mqclient + +import ( + "testing" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/stretchr/testify/assert" +) + +func TestPulsarID_Serialize(t *testing.T) { + mid := pulsar.EarliestMessageID() + pid := &pulsarID{ + messageID: mid, + } + + binary := pid.Serialize() + assert.NotNil(t, binary) + assert.NotZero(t, len(binary)) +} + +func Test_SerializePulsarMsgID(t *testing.T) { + mid := pulsar.EarliestMessageID() + + binary := SerializePulsarMsgID(mid) + assert.NotNil(t, binary) + assert.NotZero(t, len(binary)) +} + +func Test_DeserializePulsarMsgID(t *testing.T) { + mid := pulsar.EarliestMessageID() + + binary := SerializePulsarMsgID(mid) + res, err := DeserializePulsarMsgID(binary) + assert.Nil(t, err) + assert.NotNil(t, res) +} + +func Test_PulsarMsgIDToString(t *testing.T) { + mid := pulsar.EarliestMessageID() + + str := PulsarMsgIDToString(mid) + assert.NotNil(t, str) + assert.NotZero(t, len(str)) +} + +func Test_StringToPulsarMsgID(t *testing.T) { + mid := pulsar.EarliestMessageID() + + str := PulsarMsgIDToString(mid) + res, err := StringToPulsarMsgID(str) + assert.Nil(t, err) + assert.NotNil(t, res) +} diff --git a/internal/util/mqclient/pulsar_producer_test.go b/internal/util/mqclient/pulsar_producer_test.go new file mode 100644 index 0000000000..40a66bd6b3 --- /dev/null +++ b/internal/util/mqclient/pulsar_producer_test.go @@ -0,0 +1,45 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package mqclient + +import ( + "context" + "testing" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/stretchr/testify/assert" +) + +func TestPulsarProducer(t *testing.T) { + pulsarAddress, _ := Params.Load("_PulsarAddress") + pc, err := GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress}) + defer pc.Close() + assert.NoError(t, err) + assert.NotNil(t, pc) + + topic := "TEST" + producer, err := pc.CreateProducer(ProducerOptions{Topic: topic}) + assert.Nil(t, err) + assert.NotNil(t, producer) + + pulsarProd := producer.(*pulsarProducer) + assert.Equal(t, pulsarProd.Topic(), topic) + + msg := &ProducerMessage{ + Payload: []byte{}, + Properties: map[string]string{}, + } + err = producer.Send(context.TODO(), msg) + assert.Nil(t, err) + + pulsarProd.Close() +} diff --git a/internal/util/mqclient/rmq_client_test.go b/internal/util/mqclient/rmq_client_test.go index a8c357012a..cefd4d5099 100644 --- a/internal/util/mqclient/rmq_client_test.go +++ b/internal/util/mqclient/rmq_client_test.go @@ -12,9 +12,12 @@ package mqclient import ( + "context" "os" "testing" + "time" + "github.com/apache/pulsar-client-go/pulsar" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/stretchr/testify/assert" @@ -27,14 +30,16 @@ var Params paramtable.BaseTable func TestMain(m *testing.M) { Params.Init() - os.Setenv("ROCKSMQ_PATH", "/tmp/milvus/rdb_data") + path := "/tmp/milvus/rdb_data" + os.Setenv("ROCKSMQ_PATH", path) + defer os.RemoveAll(path) _ = rocksmq1.InitRocksMQ() exitCode := m.Run() defer rocksmq1.CloseRocksMQ() os.Exit(exitCode) } -func TestNewRmqClient(t *testing.T) { +func Test_NewRmqClient(t *testing.T) { opts := rocksmq.ClientOptions{} client, err := NewRmqClient(opts) defer client.Close() @@ -42,35 +47,134 @@ func TestNewRmqClient(t *testing.T) { assert.NotNil(t, client) } -func TestRmqCreateProducer(t *testing.T) { +func TestRmqClient_CreateProducer(t *testing.T) { opts := rocksmq.ClientOptions{} client, err := NewRmqClient(opts) defer client.Close() assert.Nil(t, err) assert.NotNil(t, client) - topic := "test_CreateProducer" + topic := "TestRmqClient_CreateProducer" proOpts := ProducerOptions{Topic: topic} producer, err := client.CreateProducer(proOpts) + defer producer.Close() assert.Nil(t, err) assert.NotNil(t, producer) + + rmqProducer := producer.(*rmqProducer) + defer rmqProducer.Close() + assert.Equal(t, rmqProducer.Topic(), topic) + + msg := &ProducerMessage{ + Payload: []byte{}, + Properties: nil, + } + err = rmqProducer.Send(context.TODO(), msg) + assert.Nil(t, err) + + invalidOpts := ProducerOptions{Topic: ""} + producer, e := client.CreateProducer(invalidOpts) + assert.Nil(t, producer) + assert.Error(t, e) } -func TestRmqSubscribe(t *testing.T) { +func TestRmqClient_Subscribe(t *testing.T) { opts := rocksmq.ClientOptions{} client, err := NewRmqClient(opts) defer client.Close() assert.Nil(t, err) assert.NotNil(t, client) - topic := "test_Subscribe" - subName := "subName_1" + topic := "TestRmqClient_Subscribe" + proOpts := ProducerOptions{Topic: topic} + producer, err := client.CreateProducer(proOpts) + defer producer.Close() + assert.Nil(t, err) + assert.NotNil(t, producer) + + subName := "subName" consumerOpts := ConsumerOptions{ - Topic: topic, + Topic: "", SubscriptionName: subName, BufSize: 1024, } + consumer, err := client.Subscribe(consumerOpts) + assert.NotNil(t, err) + assert.Nil(t, consumer) + + consumerOpts.Topic = topic + consumer, err = client.Subscribe(consumerOpts) + defer consumer.Close() assert.Nil(t, err) assert.NotNil(t, consumer) + assert.Equal(t, consumer.Subscription(), subName) + + msg := &ProducerMessage{ + Payload: []byte{1}, + Properties: nil, + } + err = producer.Send(context.TODO(), msg) + assert.Nil(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond) + defer cancel() + for { + select { + case <-ctx.Done(): + return + case msg := <-consumer.Chan(): + consumer.Ack(msg) + rmqmsg := msg.(*rmqMessage) + msgPayload := rmqmsg.Payload() + assert.NotEmpty(t, msgPayload) + msgTopic := rmqmsg.Topic() + assert.Equal(t, msgTopic, topic) + msgProp := rmqmsg.Properties() + assert.Empty(t, msgProp) + msgID := rmqmsg.ID() + rID := msgID.(*rmqID) + assert.NotZero(t, rID) + err = consumer.Seek(msgID) + assert.Nil(t, err) + } + } +} + +func TestRmqClient_EarliestMessageID(t *testing.T) { + opts := rocksmq.ClientOptions{} + client, _ := NewRmqClient(opts) + defer client.Close() + + mid := client.EarliestMessageID() + assert.NotNil(t, mid) +} + +func TestRmqClient_StringToMsgID(t *testing.T) { + opts := rocksmq.ClientOptions{} + client, _ := NewRmqClient(opts) + defer client.Close() + + str := "5" + res, err := client.StringToMsgID(str) + assert.Nil(t, err) + assert.NotNil(t, res) + + str = "X" + res, err = client.StringToMsgID(str) + assert.Nil(t, res) + assert.NotNil(t, err) +} + +func TestRmqClient_BytesToMsgID(t *testing.T) { + opts := rocksmq.ClientOptions{} + client, _ := NewRmqClient(opts) + defer client.Close() + + mid := pulsar.EarliestMessageID() + binary := SerializePulsarMsgID(mid) + + res, err := client.BytesToMsgID(binary) + assert.Nil(t, err) + assert.NotNil(t, res) } diff --git a/internal/util/mqclient/rmq_id_test.go b/internal/util/mqclient/rmq_id_test.go new file mode 100644 index 0000000000..da2db44728 --- /dev/null +++ b/internal/util/mqclient/rmq_id_test.go @@ -0,0 +1,41 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package mqclient + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRmqID_Serialize(t *testing.T) { + rid := &rmqID{ + messageID: 8, + } + + bin := rid.Serialize() + assert.NotNil(t, bin) + assert.NotZero(t, len(bin)) +} + +func Test_SerializeRmqID(t *testing.T) { + bin := SerializeRmqID(10) + assert.NotNil(t, bin) + assert.NotZero(t, len(bin)) +} + +func Test_DeserializeRmqID(t *testing.T) { + bin := SerializeRmqID(5) + id, err := DeserializeRmqID(bin) + assert.Nil(t, err) + assert.Equal(t, id, int64(5)) +}