diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go index 79c7d74057..48ef5fdfbe 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/milvus-io/milvus/internal/config" + "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/milvus-io/milvus/internal/common" @@ -102,7 +104,7 @@ func Consume2(ctx context.Context, t *testing.T, kc *kafkaClient, topic string, Topic: topic, SubscriptionName: subName, BufSize: 1024, - SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest, + SubscriptionInitialPosition: mqwrapper.SubscriptionPositionUnknown, }) assert.Nil(t, err) assert.NotNil(t, consumer) @@ -221,7 +223,7 @@ func TestKafkaClient_SeekPosition(t *testing.T) { data := []int{1, 2, 3} ids := produceData(ctx, t, producer, data) - consumer := createConsumer(t, kc, topic, subName, mqwrapper.SubscriptionPositionLatest) + consumer := createConsumer(t, kc, topic, subName, mqwrapper.SubscriptionPositionUnknown) defer consumer.Close() err := consumer.Seek(ids[2], true) @@ -297,60 +299,56 @@ func TestKafkaClient_MsgSerializAndDeserialize(t *testing.T) { assert.Nil(t, msgID) } +func createParamItem(v string) paramtable.ParamItem { + item := paramtable.ParamItem{ + Formatter: func(originValue string) string { return v }, + } + item.Init(&config.Manager{}) + return item +} + func TestKafkaClient_NewKafkaClientInstanceWithConfig(t *testing.T) { config1 := ¶mtable.KafkaConfig{ - Address: paramtable.ParamItem{Formatter: func(originValue string) string { return "addr" }}, - SaslPassword: paramtable.ParamItem{Formatter: func(originValue string) string { return "password" }}, + Address: createParamItem("addr"), + SaslPassword: createParamItem("password"), } assert.Panics(t, func() { NewKafkaClientInstanceWithConfig(config1) }) config2 := ¶mtable.KafkaConfig{ - Address: paramtable.ParamItem{Formatter: func(originValue string) string { return "addr" }}, - SaslUsername: paramtable.ParamItem{Formatter: func(originValue string) string { return "username" }}, + Address: createParamItem("addr"), + SaslUsername: createParamItem("username"), } assert.Panics(t, func() { NewKafkaClientInstanceWithConfig(config2) }) - config3 := ¶mtable.KafkaConfig{ - Address: paramtable.ParamItem{Formatter: func(originValue string) string { return "addr" }}, - SaslUsername: paramtable.ParamItem{Formatter: func(originValue string) string { return "username" }}, - SaslPassword: paramtable.ParamItem{Formatter: func(originValue string) string { return "password" }}, + producerConfig := make(map[string]string) + producerConfig["client.id"] = "dc1" + consumerConfig := make(map[string]string) + consumerConfig["client.id"] = "dc" + + config := ¶mtable.KafkaConfig{ + Address: createParamItem("addr"), + SaslUsername: createParamItem("username"), + SaslPassword: createParamItem("password"), + SaslMechanisms: createParamItem("sasl"), + SecurityProtocol: createParamItem("plain"), + ConsumerExtraConfig: paramtable.ParamGroup{GetFunc: func() map[string]string { return consumerConfig }}, + ProducerExtraConfig: paramtable.ParamGroup{GetFunc: func() map[string]string { return producerConfig }}, } - client := NewKafkaClientInstanceWithConfig(config3) + client := NewKafkaClientInstanceWithConfig(config) assert.NotNil(t, client) assert.NotNil(t, client.basicConfig) - consumerConfig := make(map[string]string) - consumerConfig["client.id"] = "dc" - config4 := ¶mtable.KafkaConfig{ - Address: paramtable.ParamItem{Formatter: func(originValue string) string { return "addr" }}, - SaslUsername: paramtable.ParamItem{Formatter: func(originValue string) string { return "username" }}, - SaslPassword: paramtable.ParamItem{Formatter: func(originValue string) string { return "password" }}, - ConsumerExtraConfig: paramtable.ParamGroup{GetFunc: func() map[string]string { return consumerConfig }}, - } - client4 := NewKafkaClientInstanceWithConfig(config4) - assert.Equal(t, "dc", client4.consumerConfig["client.id"]) - - newConsumerConfig := client4.newConsumerConfig("test", 0) + assert.Equal(t, "dc", client.consumerConfig["client.id"]) + newConsumerConfig := client.newConsumerConfig("test", 0) clientID, err := newConsumerConfig.Get("client.id", "") assert.Nil(t, err) assert.Equal(t, "dc", clientID) - producerConfig := make(map[string]string) - producerConfig["client.id"] = "dc1" - config5 := ¶mtable.KafkaConfig{ - Address: paramtable.ParamItem{Formatter: func(originValue string) string { return "addr" }}, - SaslUsername: paramtable.ParamItem{Formatter: func(originValue string) string { return "username" }}, - SaslPassword: paramtable.ParamItem{Formatter: func(originValue string) string { return "password" }}, - ProducerExtraConfig: paramtable.ParamGroup{GetFunc: func() map[string]string { return producerConfig }}, - } - client5 := NewKafkaClientInstanceWithConfig(config5) - assert.Equal(t, "dc1", client5.producerConfig["client.id"]) - - newProducerConfig := client5.newProducerConfig() + assert.Equal(t, "dc1", client.producerConfig["client.id"]) + newProducerConfig := client.newProducerConfig() pClientID, err := newProducerConfig.Get("client.id", "") assert.Nil(t, err) assert.Equal(t, pClientID, "dc1") - } func createKafkaClient(t *testing.T) *kafkaClient { diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_id_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_id_test.go index 02a42ba5be..11cd4980fd 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_id_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_id_test.go @@ -45,7 +45,7 @@ func TestKafkaID_LessOrEqualThan(t *testing.T) { func TestKafkaID_Equal(t *testing.T) { rid1 := &kafkaID{messageID: 0} - rid2 := &kafkaID{messageID: 0} + rid2 := &kafkaID{messageID: 1} { ret, err := rid1.Equal(rid1.Serialize())