diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 461f80d4de..8c56bdf69b 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -259,6 +259,7 @@ pulsar: # tlsCaCert: # file or directory path to CA certificate(s) for verifying the broker's key # tlsKeyPassword: # private key passphrase for use with ssl.key.location and set_ssl_cert(), if any # readTimeout: 10 +# queuedmaxkbytes: 100000 rocksmq: # Prefix of the key to where Milvus stores data in RocksMQ. diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go index 6e6909a288..20e5958f4c 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go @@ -86,6 +86,10 @@ func GetBasicConfig(config *paramtable.KafkaConfig) kafka.ConfigMap { kafkaConfig.SetKey("security.protocol", config.SecurityProtocol.GetValue()) } + if config.QueuedMessagesKbytes.GetValue() != "" { + kafkaConfig.SetKey("queued.max.messages.kbytes", config.QueuedMessagesKbytes.GetValue()) + } + if config.SaslUsername.GetValue() != "" && config.SaslPassword.GetValue() != "" { kafkaConfig.SetKey("sasl.mechanisms", config.SaslMechanisms.GetValue()) kafkaConfig.SetKey("sasl.username", config.SaslUsername.GetValue()) diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index 66e78838ec..076fa41915 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -1080,19 +1080,20 @@ The retention policy of pulsar can set shorter to save the storage space in this // --- kafka --- type KafkaConfig struct { - Address ParamItem `refreshable:"false"` - SaslUsername ParamItem `refreshable:"false"` - SaslPassword ParamItem `refreshable:"false"` - SaslMechanisms ParamItem `refreshable:"false"` - SecurityProtocol ParamItem `refreshable:"false"` - KafkaUseSSL ParamItem `refreshable:"false"` - KafkaTLSCert ParamItem `refreshable:"false"` - KafkaTLSKey ParamItem `refreshable:"false"` - KafkaTLSCACert ParamItem `refreshable:"false"` - KafkaTLSKeyPassword ParamItem `refreshable:"false"` - ConsumerExtraConfig ParamGroup `refreshable:"false"` - ProducerExtraConfig ParamGroup `refreshable:"false"` - ReadTimeout ParamItem `refreshable:"true"` + Address ParamItem `refreshable:"false"` + SaslUsername ParamItem `refreshable:"false"` + SaslPassword ParamItem `refreshable:"false"` + SaslMechanisms ParamItem `refreshable:"false"` + SecurityProtocol ParamItem `refreshable:"false"` + KafkaUseSSL ParamItem `refreshable:"false"` + KafkaTLSCert ParamItem `refreshable:"false"` + KafkaTLSKey ParamItem `refreshable:"false"` + KafkaTLSCACert ParamItem `refreshable:"false"` + KafkaTLSKeyPassword ParamItem `refreshable:"false"` + ConsumerExtraConfig ParamGroup `refreshable:"false"` + ProducerExtraConfig ParamGroup `refreshable:"false"` + ReadTimeout ParamItem `refreshable:"true"` + QueuedMessagesKbytes ParamItem `refreshable:"false"` } func (k *KafkaConfig) Init(base *BaseTable) { @@ -1197,6 +1198,14 @@ func (k *KafkaConfig) Init(base *BaseTable) { Export: true, } k.ReadTimeout.Init(base.mgr) + + k.QueuedMessagesKbytes = ParamItem{ + Key: "kafka.queuedmaxkbytes", + DefaultValue: "100000", // 100MB in kilo bytes + Version: "2.1.0", + Export: true, + } + k.QueuedMessagesKbytes.Init(base.mgr) } // /////////////////////////////////////////////////////////////////////////////