From d443b5420a6a551e529c32e08c563780a52aa8fc Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Thu, 25 Aug 2022 11:02:53 +0800 Subject: [PATCH] add user special kafka config (#18742) Signed-off-by: wgcn <1026688210@qq.com> Signed-off-by: wgcn <1026688210@qq.com> Co-authored-by: wanggang11335 --- configs/milvus.yaml | 6 ++- internal/config/manager.go | 15 ++++++- .../mq/msgstream/mq_kafka_msgstream_test.go | 2 +- .../msgstream/mqwrapper/kafka/kafka_client.go | 44 ++++++++++++++++--- .../mqwrapper/kafka/kafka_client_test.go | 23 ++++++++++ internal/util/paramtable/base_table.go | 6 ++- internal/util/paramtable/base_table_test.go | 20 +++++++++ .../util/paramtable/component_param_test.go | 10 +++++ internal/util/paramtable/service_param.go | 22 +++++++--- 9 files changed, 132 insertions(+), 16 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index c7c67d111d..2e21c8ab15 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -96,7 +96,11 @@ pulsar: maxMessageSize: 5242880 # 5 * 1024 * 1024 Bytes, Maximum size of each message in pulsar. 
# If you want to enable kafka, needs to comment the pulsar configs -#kafka: +kafka: + producer: + client.id: dc + consumer: + client.id: dc1 # brokerList: localhost1:9092,localhost2:9092,localhost3:9092 # saslUsername: username # saslPassword: password diff --git a/internal/config/manager.go b/internal/config/manager.go index e69de2dcc5..f2f75d4bf7 100644 --- a/internal/config/manager.go +++ b/internal/config/manager.go @@ -64,7 +64,9 @@ func (m *Manager) GetConfig(key string) (string, error) { } //GetConfigsByPattern returns key values that matched pattern -func (m *Manager) GetConfigsByPattern(pattern string) map[string]string { +// withPrefix : whether key include the prefix of pattern +func (m *Manager) GetConfigsByPattern(pattern string, withPrefix bool) map[string]string { + m.RLock() defer m.RUnlock() matchedConfig := make(map[string]string) @@ -77,7 +79,16 @@ func (m *Manager) GetConfigsByPattern(pattern string) map[string]string { if err != nil { continue } - matchedConfig[key] = sValue + + checkAndCutOffKey := func() string { + if withPrefix { + return key + } + return strings.Replace(key, pattern, "", 1) + } + + finalKey := checkAndCutOffKey() + matchedConfig[finalKey] = sValue } } return matchedConfig diff --git a/internal/mq/msgstream/mq_kafka_msgstream_test.go b/internal/mq/msgstream/mq_kafka_msgstream_test.go index c59112a04e..51849ed7f5 100644 --- a/internal/mq/msgstream/mq_kafka_msgstream_test.go +++ b/internal/mq/msgstream/mq_kafka_msgstream_test.go @@ -445,7 +445,7 @@ func getKafkaInputStream(ctx context.Context, kafkaAddress string, producerChann "api.version.request": true, "linger.ms": 10, } - kafkaClient := kafkawrapper.NewKafkaClientInstanceWithConfigMap(config) + kafkaClient := kafkawrapper.NewKafkaClientInstanceWithConfigMap(config, nil, nil) inputStream, _ := NewMqMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) for _, opt := range opts { diff --git 
a/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go index b69ce344b3..038d87ee56 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go @@ -1,6 +1,7 @@ package kafka import ( + "fmt" "strconv" "sync" @@ -17,7 +18,9 @@ var once sync.Once type kafkaClient struct { // more configs you can see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md - basicConfig kafka.ConfigMap + basicConfig kafka.ConfigMap + consumerConfig kafka.ConfigMap + producerConfig kafka.ConfigMap } func getBasicConfig(address string) kafka.ConfigMap { @@ -31,11 +34,16 @@ func getBasicConfig(address string) kafka.ConfigMap { func NewKafkaClientInstance(address string) *kafkaClient { config := getBasicConfig(address) - return &kafkaClient{basicConfig: config} + return NewKafkaClientInstanceWithConfigMap(config, kafka.ConfigMap{}, kafka.ConfigMap{}) + } -func NewKafkaClientInstanceWithConfigMap(config kafka.ConfigMap) *kafkaClient { - return &kafkaClient{basicConfig: config} +func NewKafkaClientInstanceWithConfigMap(config kafka.ConfigMap, extraConsumerConfig kafka.ConfigMap, extraProducerConfig kafka.ConfigMap) *kafkaClient { + log.Info("init kafka Config ", zap.String("commonConfig", fmt.Sprintf("+%v", config)), + zap.String("extraConsumerConfig", fmt.Sprintf("+%v", extraConsumerConfig)), + zap.String("extraProducerConfig", fmt.Sprintf("+%v", extraProducerConfig)), + ) + return &kafkaClient{basicConfig: config, consumerConfig: extraConsumerConfig, producerConfig: extraProducerConfig} } func NewKafkaClientInstanceWithConfig(config *paramtable.KafkaConfig) *kafkaClient { @@ -53,7 +61,16 @@ func NewKafkaClientInstanceWithConfig(config *paramtable.KafkaConfig) *kafkaClie kafkaConfig.SetKey("sasl.password", config.SaslPassword) } - return &kafkaClient{basicConfig: kafkaConfig} + specExtraConfig := func(config map[string]string) kafka.ConfigMap { + 
kafkaConfigMap := make(kafka.ConfigMap, len(config)) + for k, v := range config { + kafkaConfigMap.SetKey(k, v) + } + return kafkaConfigMap + } + + return NewKafkaClientInstanceWithConfigMap(kafkaConfig, specExtraConfig(config.ConsumerExtraConfig), specExtraConfig(config.ProducerExtraConfig)) + } func cloneKafkaConfig(config kafka.ConfigMap) *kafka.ConfigMap { @@ -103,6 +120,10 @@ func (kc *kafkaClient) newProducerConfig() *kafka.ConfigMap { newConf.SetKey("message.max.bytes", 10485760) newConf.SetKey("compression.codec", "zstd") newConf.SetKey("linger.ms", 20) + + //special producer config + kc.specialExtraConfig(newConf, kc.producerConfig) + return newConf } @@ -126,6 +147,9 @@ func (kc *kafkaClient) newConsumerConfig(group string, offset mqwrapper.Subscrip //newConf.SetKey("enable.partition.eof", true) newConf.SetKey("go.events.channel.enable", true) + + kc.specialExtraConfig(newConf, kc.consumerConfig) + return newConf } @@ -159,6 +183,16 @@ func (kc *kafkaClient) StringToMsgID(id string) (mqwrapper.MessageID, error) { return &kafkaID{messageID: offset}, nil } +func (kc *kafkaClient) specialExtraConfig(current *kafka.ConfigMap, special kafka.ConfigMap) { + for k, v := range special { + if existingConf, _ := current.Get(k, nil); existingConf != nil { + log.Warn(fmt.Sprintf("The existing config : %v=%v will be covered by the speciled kafka config : %v.", k, v, existingConf)) + } + + current.SetKey(k, v) + } +} + func (kc *kafkaClient) BytesToMsgID(id []byte) (mqwrapper.MessageID, error) { offset := DeserializeKafkaID(id) return &kafkaID{messageID: offset}, nil diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go index e4f7ff0be6..89b4909c8c 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go @@ -308,6 +308,29 @@ func TestKafkaClient_NewKafkaClientInstanceWithConfig(t *testing.T) { client := 
NewKafkaClientInstanceWithConfig(config3) assert.NotNil(t, client) assert.NotNil(t, client.basicConfig) + + consumerConfig := make(map[string]string) + consumerConfig["client.id"] = "dc" + config4 := &paramtable.KafkaConfig{Address: "addr", SaslUsername: "username", SaslPassword: "password", ConsumerExtraConfig: consumerConfig} + client4 := NewKafkaClientInstanceWithConfig(config4) + assert.Equal(t, "dc", client4.consumerConfig["client.id"]) + + newConsumerConfig := client4.newConsumerConfig("test", 0) + clientID, err := newConsumerConfig.Get("client.id", "") + assert.Nil(t, err) + assert.Equal(t, "dc", clientID) + + producerConfig := make(map[string]string) + producerConfig["client.id"] = "dc1" + config5 := &paramtable.KafkaConfig{Address: "addr", SaslUsername: "username", SaslPassword: "password", ProducerExtraConfig: producerConfig} + client5 := NewKafkaClientInstanceWithConfig(config5) + assert.Equal(t, "dc1", client5.producerConfig["client.id"]) + + newProducerConfig := client5.newProducerConfig() + pClientID, err := newProducerConfig.Get("client.id", "") + assert.Nil(t, err) + assert.Equal(t, pClientID, "dc1") + } func createKafkaClient(t *testing.T) *kafkaClient { diff --git a/internal/util/paramtable/base_table.go b/internal/util/paramtable/base_table.go index 661face1b2..5d3205ee29 100644 --- a/internal/util/paramtable/base_table.go +++ b/internal/util/paramtable/base_table.go @@ -212,7 +212,11 @@ func (gp *BaseTable) Get(key string) string { } func (gp *BaseTable) GetByPattern(pattern string) map[string]string { - return gp.mgr.GetConfigsByPattern(pattern) + return gp.mgr.GetConfigsByPattern(pattern, true) +} + +func (gp *BaseTable) GetConfigSubSet(pattern string) map[string]string { + return gp.mgr.GetConfigsByPattern(pattern, false) } // For compatible reason, only visiable for Test diff --git a/internal/util/paramtable/base_table_test.go b/internal/util/paramtable/base_table_test.go index 0852579405..5ac39792dc 100644 ---
a/internal/util/paramtable/base_table_test.go +++ b/internal/util/paramtable/base_table_test.go @@ -13,6 +13,7 @@ package paramtable import ( "os" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -27,6 +28,25 @@ func TestMain(m *testing.M) { os.Exit(code) } +func TestBaseTable_GetConfigSubSet(t *testing.T) { + prefix := "rootcoord." + configs := baseParams.mgr.Configs() + + configsWithPrefix := make(map[string]string) + for k, v := range configs { + if strings.HasPrefix(k, prefix) { + configsWithPrefix[k] = v + } + } + + subSet := baseParams.GetConfigSubSet(prefix) + + for k := range configs { + assert.Equal(t, subSet[k], configs[prefix+k]) + } + assert.Equal(t, len(subSet), len(configsWithPrefix)) +} + func TestBaseTable_SaveAndLoad(t *testing.T) { err1 := baseParams.Save("int", "10") assert.Nil(t, err1) diff --git a/internal/util/paramtable/component_param_test.go b/internal/util/paramtable/component_param_test.go index 8edb316d3a..62fe10dec8 100644 --- a/internal/util/paramtable/component_param_test.go +++ b/internal/util/paramtable/component_param_test.go @@ -30,6 +30,16 @@ func TestComponentParam(t *testing.T) { var CParams ComponentParam CParams.Init() + t.Run("test kafkaConfig", func(t *testing.T) { + + params := CParams.ServiceParam.KafkaCfg + producerConfig := params.ProducerExtraConfig + assert.Equal(t, "dc", producerConfig["client.id"]) + + consumerConfig := params.ConsumerExtraConfig + assert.Equal(t, "dc1", consumerConfig["client.id"]) + }) + t.Run("test commonConfig", func(t *testing.T) { Params := CParams.CommonCfg diff --git a/internal/util/paramtable/service_param.go b/internal/util/paramtable/service_param.go index 0c02f36812..9034034460 100644 --- a/internal/util/paramtable/service_param.go +++ b/internal/util/paramtable/service_param.go @@ -38,6 +38,8 @@ const ( SuggestPulsarMaxMessageSize = 5 * 1024 * 1024 defaultEtcdLogLevel = "info" defaultEtcdLogPath = "stdout" + KafkaProducerConfigPrefix = "kafka.producer." 
+ KafkaConsumerConfigPrefix = "kafka.consumer." ) // ServiceParam is used to quickly and easily access all basic service configurations. @@ -365,12 +367,14 @@ func (p *PulsarConfig) initMaxMessageSize() { // --- kafka --- type KafkaConfig struct { - Base *BaseTable - Address string - SaslUsername string - SaslPassword string - SaslMechanisms string - SecurityProtocol string + Base *BaseTable + Address string + SaslUsername string + SaslPassword string + SaslMechanisms string + SecurityProtocol string + ConsumerExtraConfig map[string]string + ProducerExtraConfig map[string]string } func (k *KafkaConfig) init(base *BaseTable) { @@ -380,6 +384,7 @@ func (k *KafkaConfig) init(base *BaseTable) { k.initSaslPassword() k.initSaslMechanisms() k.initSecurityProtocol() + k.initExtraKafkaConfig() } func (k *KafkaConfig) initAddress() { @@ -402,6 +407,11 @@ func (k *KafkaConfig) initSecurityProtocol() { k.SecurityProtocol = k.Base.LoadWithDefault("kafka.securityProtocol", "SASL_SSL") } +func (k *KafkaConfig) initExtraKafkaConfig() { + k.ConsumerExtraConfig = k.Base.GetConfigSubSet(KafkaConsumerConfigPrefix) + k.ProducerExtraConfig = k.Base.GetConfigSubSet(KafkaProducerConfigPrefix) +} + /////////////////////////////////////////////////////////////////////////////// // --- rocksmq --- type RocksmqConfig struct {