From fbc352263c795fd9925ea1db4a4d9a6a340ef641 Mon Sep 17 00:00:00 2001 From: yukun Date: Wed, 8 Sep 2021 11:03:59 +0800 Subject: [PATCH] Improve rocksmq client code coverage (#7540) Signed-off-by: fishpenguin --- .../util/rocksmq/client/rocksmq/client.go | 4 +- .../client/rocksmq/client_impl_test.go | 94 ++++++++++++++++++- .../client/rocksmq/consumer_impl_test.go | 87 ++++++++++++++++- .../rocksmq/client/rocksmq/test_helper.go | 33 +++++++ 4 files changed, 212 insertions(+), 6 deletions(-) diff --git a/internal/util/rocksmq/client/rocksmq/client.go b/internal/util/rocksmq/client/rocksmq/client.go index a1e48411eb..d1b1651481 100644 --- a/internal/util/rocksmq/client/rocksmq/client.go +++ b/internal/util/rocksmq/client/rocksmq/client.go @@ -18,7 +18,9 @@ import ( type RocksMQ = server.RocksMQ func NewClient(options ClientOptions) (Client, error) { - options.Server = server.Rmq + if options.Server == nil { + options.Server = server.Rmq + } return newClient(options) } diff --git a/internal/util/rocksmq/client/rocksmq/client_impl_test.go b/internal/util/rocksmq/client/rocksmq/client_impl_test.go index 074d00ef89..edf67e57fd 100644 --- a/internal/util/rocksmq/client/rocksmq/client_impl_test.go +++ b/internal/util/rocksmq/client/rocksmq/client_impl_test.go @@ -23,11 +23,18 @@ func TestClient(t *testing.T) { assert.Nil(t, err) } -func TestCreateProducer(t *testing.T) { +func TestClient_CreateProducer(t *testing.T) { + var client0 client + producer0, err := client0.CreateProducer(ProducerOptions{}) + assert.Nil(t, producer0) + assert.Error(t, err) + + ///////////////////////////////////////////////// client, err := NewClient(ClientOptions{ Server: newMockRocksMQ(), }) assert.NoError(t, err) + defer client.Close() producer, err := client.CreateProducer(ProducerOptions{ Topic: newTopicName(), @@ -35,10 +42,32 @@ func TestCreateProducer(t *testing.T) { assert.Error(t, err) assert.Nil(t, producer) - client.Close() + ///////////////////////////////////////////////// + rmqPath := "/tmp/milvus/test_client1" + rmq := newRocksMQ(rmqPath) + defer removePath(rmqPath) + client1, err := NewClient(ClientOptions{ + Server: rmq, + }) + assert.NoError(t, err) + defer client1.Close() + producer1, err := client1.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + }) + assert.NotNil(t, producer1) + assert.NoError(t, err) + defer producer1.Close() + + // ///////////////////////////////////////////////// + // dummyTopic := strings.Repeat(newTopicName(), 100) + // producer2, err := client1.CreateProducer(ProducerOptions{ + // Topic: dummyTopic, + // }) + // assert.Nil(t, producer2) + // assert.Error(t, err) } -func TestSubscribe(t *testing.T) { +func TestClient_Subscribe(t *testing.T) { client, err := NewClient(ClientOptions{ Server: newMockRocksMQ(), }) @@ -52,4 +81,63 @@ func TestSubscribe(t *testing.T) { assert.Nil(t, consumer) client.Close() + + ///////////////////////////////////////////////// + rmqPath := "/tmp/milvus/test_client2" + rmq := newRocksMQ(rmqPath) + defer removePath(rmqPath) + client1, err := NewClient(ClientOptions{ + Server: rmq, + }) + assert.NoError(t, err) + defer client1.Close() + opt := ConsumerOptions{ + Topic: newTopicName(), + SubscriptionName: newConsumerName(), + } + consumer1, err := client1.Subscribe(opt) + assert.NoError(t, err) + assert.NotNil(t, consumer1) + consumer2, err := client1.Subscribe(opt) + assert.NoError(t, err) + assert.NotNil(t, consumer2) + + producer1, err := client1.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + }) + assert.NotNil(t, producer1) + assert.NoError(t, err) +} + +func TestClient_consume(t *testing.T) { + rmqPath := "/tmp/milvus/test_client3" + rmq := newRocksMQ(rmqPath) + defer removePath(rmqPath) + client, err := NewClient(ClientOptions{ + Server: rmq, + }) + assert.NoError(t, err) + defer client.Close() + topicName := newTopicName() + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + }) + assert.NotNil(t, producer) + assert.NoError(t, err) + + opt := ConsumerOptions{ + Topic: topicName, + SubscriptionName: newConsumerName(), + } + consumer, err := client.Subscribe(opt) + assert.NoError(t, err) + assert.NotNil(t, consumer) + + msg := &ProducerMessage{ + Payload: make([]byte, 10), + } + producer.Send(msg) + + <-consumer.Chan() + } diff --git a/internal/util/rocksmq/client/rocksmq/consumer_impl_test.go b/internal/util/rocksmq/client/rocksmq/consumer_impl_test.go index 94d4741a5f..e275891936 100644 --- a/internal/util/rocksmq/client/rocksmq/consumer_impl_test.go +++ b/internal/util/rocksmq/client/rocksmq/consumer_impl_test.go @@ -17,7 +17,9 @@ import ( "github.com/stretchr/testify/assert" ) -func TestConsumer(t *testing.T) { +func TestConsumer_newConsumer(t *testing.T) { + assert.Equal(t, EarliestMessageID(), int64(-1)) + consumer, err := newConsumer(nil, ConsumerOptions{ Topic: newTopicName(), SubscriptionName: newConsumerName(), @@ -32,15 +34,73 @@ func TestConsumer(t *testing.T) { assert.NotNil(t, err) assert.Equal(t, InvalidConfiguration, err.(*Error).Result()) + consumer, err = newConsumer1(newMockClient(), ConsumerOptions{}, nil) + assert.Nil(t, consumer) + assert.NotNil(t, err) + assert.Equal(t, InvalidConfiguration, err.(*Error).Result()) + consumer, err = newConsumer(newMockClient(), ConsumerOptions{ Topic: newTopicName(), }) assert.Nil(t, consumer) assert.NotNil(t, err) assert.Equal(t, InvalidConfiguration, err.(*Error).Result()) + + ///////////////////////////////////////////////// + rmqPath := "/tmp/milvus/test_consumer1" + rmq := newRocksMQ(rmqPath) + defer removePath(rmqPath) + client, err := newClient(ClientOptions{ + Server: rmq, + }) + assert.NoError(t, err) + assert.NotNil(t, client) + defer client.Close() + consumerName := newConsumerName() + consumer1, err := newConsumer(client, ConsumerOptions{ + Topic: newTopicName(), + SubscriptionName: consumerName, + }) + assert.NoError(t, err) + assert.NotNil(t, consumer1) + defer consumer1.Close() + assert.Equal(t, consumerName, consumer1.Subscription()) + + consumer2, err := newConsumer(client, ConsumerOptions{ + Topic: "", + }) + assert.Error(t, err) + assert.Nil(t, consumer2) + + consumer3, err := newConsumer(client, ConsumerOptions{ + Topic: newTopicName(), + SubscriptionName: "", + }) + assert.Error(t, err) + assert.Nil(t, consumer3) + + consumer4, err := newConsumer1(client, ConsumerOptions{ + Topic: newTopicName(), + SubscriptionName: newConsumerName(), + }, nil) + assert.NoError(t, err) + assert.NotNil(t, consumer4) + + consumer5, err := newConsumer1(client, ConsumerOptions{ + Topic: "", + }, nil) + assert.Error(t, err) + assert.Nil(t, consumer5) + + consumer6, err := newConsumer1(client, ConsumerOptions{ + Topic: newTopicName(), + SubscriptionName: "", + }, nil) + assert.Error(t, err) + assert.Nil(t, consumer6) } -func TestSubscription(t *testing.T) { +func TestConsumer_Subscription(t *testing.T) { topicName := newTopicName() consumerName := newConsumerName() consumer, err := newConsumer(newMockClient(), ConsumerOptions{ @@ -51,3 +111,26 @@ func TestSubscription(t *testing.T) { assert.NotNil(t, err) //assert.Equal(t, consumerName, consumer.Subscription()) } + +func TestConsumer_Seek(t *testing.T) { + rmqPath := "/tmp/milvus/test_consumer2" + rmq := newRocksMQ(rmqPath) + defer removePath(rmqPath) + client, err := newClient(ClientOptions{ + Server: rmq, + }) + assert.NoError(t, err) + assert.NotNil(t, client) + defer client.Close() + + topicName := newTopicName() + consumerName := newConsumerName() + consumer, err := newConsumer(client, ConsumerOptions{ + Topic: topicName, + SubscriptionName: consumerName, + }) + assert.NoError(t, err) + assert.NotNil(t, consumer) + + consumer.Seek(0) +} diff --git a/internal/util/rocksmq/client/rocksmq/test_helper.go b/internal/util/rocksmq/client/rocksmq/test_helper.go index c2314d10d5..316d39d9f4 100644 --- a/internal/util/rocksmq/client/rocksmq/test_helper.go +++ b/internal/util/rocksmq/client/rocksmq/test_helper.go @@ -13,8 +13,12 @@ package rocksmq import ( "fmt" + "os" "time" + "github.com/milvus-io/milvus/internal/allocator" + rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" + rocksmq "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq" server "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq" ) @@ -37,3 +41,32 @@ func newMockClient() *client { }) return client } + +func initIDAllocator(kvPath string) *allocator.GlobalIDAllocator { + rocksdbKV, err := rocksdbkv.NewRocksdbKV(kvPath) + if err != nil { + panic(err) + } + idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV) + _ = idAllocator.Initialize() + return idAllocator +} + +func newRocksMQ(rmqPath string) server.RocksMQ { + kvPath := rmqPath + "_kv" + idAllocator := initIDAllocator(kvPath) + + rocksdbPath := rmqPath + "_db" + + rmq, _ := rocksmq.NewRocksMQ(rocksdbPath, idAllocator) + return rmq +} + +func removePath(rmqPath string) { + kvPath := rmqPath + "_kv" + os.RemoveAll(kvPath) + rocksdbPath := rmqPath + "_db" + os.RemoveAll(rocksdbPath) + metaPath := rmqPath + "_meta_kv" + os.RemoveAll(metaPath) +}