From 5698bf42360b83d68cdd71b16234295f29dc77a6 Mon Sep 17 00:00:00 2001 From: jaime Date: Thu, 2 Jun 2022 12:12:03 +0800 Subject: [PATCH] Add sasl configurations for kafka (#17323) Signed-off-by: yun.zhang --- configs/milvus.yaml | 2 ++ internal/mq/msgstream/mq_factory.go | 8 ++--- .../msgstream/mqwrapper/kafka/kafka_client.go | 34 +++++++++++++++---- .../mqwrapper/kafka/kafka_client_test.go | 13 +++++++ internal/util/paramtable/service_param.go | 16 +++++++-- 5 files changed, 61 insertions(+), 12 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 7d42e84964..ccc93df580 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -75,6 +75,8 @@ pulsar: # If you want to enable kafka, needs to comment the pulsar configs #kafka: # brokerList: localhost1:9092,localhost2:9092,localhost3:9092 +# saslUsername: username +# saslPassword: password rocksmq: # please adjust in embedded Milvus: /tmp/milvus/rdb_data diff --git a/internal/mq/msgstream/mq_factory.go b/internal/mq/msgstream/mq_factory.go index cda2101360..63c63a1bd3 100644 --- a/internal/mq/msgstream/mq_factory.go +++ b/internal/mq/msgstream/mq_factory.go @@ -124,17 +124,17 @@ func NewRmsFactory(path string) *RmsFactory { type KmsFactory struct { dispatcherFactory ProtoUDFactory - KafkaAddress string + config *paramtable.KafkaConfig ReceiveBufSize int64 } func (f *KmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) { - kafkaClient := kafkawrapper.NewKafkaClientInstance(f.KafkaAddress) + kafkaClient := kafkawrapper.NewKafkaClientInstanceWithConfig(f.config) return NewMqMsgStream(ctx, f.ReceiveBufSize, -1, kafkaClient, f.dispatcherFactory.NewUnmarshalDispatcher()) } func (f *KmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) { - kafkaClient := kafkawrapper.NewKafkaClientInstance(f.KafkaAddress) + kafkaClient := kafkawrapper.NewKafkaClientInstanceWithConfig(f.config) return NewMqTtMsgStream(ctx, f.ReceiveBufSize, -1, kafkaClient, f.dispatcherFactory.NewUnmarshalDispatcher()) } @@ -146,7 +146,7 @@ func NewKmsFactory(config *paramtable.KafkaConfig) Factory { f := &KmsFactory{ dispatcherFactory: ProtoUDFactory{}, ReceiveBufSize: 1024, - KafkaAddress: config.Address, + config: config, } return f } diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go index caf0a92867..941aeece98 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go @@ -4,6 +4,8 @@ import ( "strconv" "sync" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" @@ -18,18 +20,38 @@ type kafkaClient struct { basicConfig kafka.ConfigMap } -func NewKafkaClientInstance(address string) *kafkaClient { - config := kafka.ConfigMap{ - "bootstrap.servers": address, - "socket.timeout.ms": 300000, - "socket.max.fails": 3, - //"receive.message.max.bytes": 10485760, +func getBasicConfig(address string) kafka.ConfigMap { + return kafka.ConfigMap{ + "bootstrap.servers": address, + "socket.timeout.ms": 300000, + "socket.max.fails": 3, "api.version.request": true, } +} +func NewKafkaClientInstance(address string) *kafkaClient { + config := getBasicConfig(address) return &kafkaClient{basicConfig: config} } +func NewKafkaClientInstanceWithConfig(config *paramtable.KafkaConfig) *kafkaClient { + kafkaConfig := getBasicConfig(config.Address) + + if (config.SaslUsername == "" && config.SaslPassword != "") || + (config.SaslUsername != "" && config.SaslPassword == "") { + panic("enable security mode need config username and password at the same time!") + } + + if config.SaslUsername != "" && config.SaslPassword != "" { + kafkaConfig.SetKey("sasl.mechanisms", "PLAIN") + kafkaConfig.SetKey("security.protocol", "SASL_SSL") + kafkaConfig.SetKey("sasl.username", config.SaslUsername) + kafkaConfig.SetKey("sasl.password", config.SaslPassword) + } + + return &kafkaClient{basicConfig: kafkaConfig} +} + func cloneKafkaConfig(config kafka.ConfigMap) *kafka.ConfigMap { newConfig := make(kafka.ConfigMap) for k, v := range config { diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go index 6a130c6b73..a5ef4fb9d0 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go @@ -278,6 +278,19 @@ func TestKafkaClient_MsgSerializAndDeserialize(t *testing.T) { assert.Nil(t, msgID) } +func TestKafkaClient_NewKafkaClientInstanceWithConfig(t *testing.T) { + config1 := ¶mtable.KafkaConfig{Address: "addr", SaslPassword: "password"} + assert.Panics(t, func() { NewKafkaClientInstanceWithConfig(config1) }) + + config2 := ¶mtable.KafkaConfig{Address: "addr", SaslUsername: "username"} + assert.Panics(t, func() { NewKafkaClientInstanceWithConfig(config2) }) + + config3 := ¶mtable.KafkaConfig{Address: "addr", SaslUsername: "username", SaslPassword: "password"} + client := NewKafkaClientInstanceWithConfig(config3) + assert.NotNil(t, client) + assert.NotNil(t, client.basicConfig) +} + func createKafkaClient(t *testing.T) *kafkaClient { kafkaAddress, _ := Params.Load("_KafkaBrokerList") kc := NewKafkaClientInstance(kafkaAddress) diff --git a/internal/util/paramtable/service_param.go b/internal/util/paramtable/service_param.go index 804c798361..0cb99bea33 100644 --- a/internal/util/paramtable/service_param.go +++ b/internal/util/paramtable/service_param.go @@ -234,13 +234,17 @@ func (p *PulsarConfig) initMaxMessageSize() { // --- kafka --- type KafkaConfig struct { - Base *BaseTable - Address string + Base *BaseTable + Address string + SaslUsername string + SaslPassword string } func (k *KafkaConfig) init(base *BaseTable) { k.Base = base k.initAddress() + k.initSaslUsername() + k.initSaslPassword() } func (k *KafkaConfig) initAddress() { @@ -251,6 +255,14 @@ func (k *KafkaConfig) initAddress() { k.Address = addr } +func (k *KafkaConfig) initSaslUsername() { + k.SaslUsername = k.Base.LoadWithDefault("kafka.saslUsername", "") +} + +func (k *KafkaConfig) initSaslPassword() { + k.SaslPassword = k.Base.LoadWithDefault("kafka.saslPassword", "") +} + /////////////////////////////////////////////////////////////////////////////// // --- rocksmq --- type RocksmqConfig struct {