mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
fix: Add Kafka buffer size limit to prevent DataNode OOM (#44106)
issue: https://github.com/milvus-io/milvus/issues/44105 - I have added support to set this property **queued.max.messages.kbytes** in kafka consumers from the user side. - It limits the size (in KB) of the consumer’s local message queue (buffer) where messages are temporarily stored after being fetched from Kafka but before your application actually processes them --------- Signed-off-by: Nischay Yadav <Nischay.Yadav@ibm.com>
This commit is contained in:
parent
e126df2330
commit
1e704ecf9f
@ -259,6 +259,7 @@ pulsar:
|
|||||||
# tlsCaCert: # file or directory path to CA certificate(s) for verifying the broker's key
|
# tlsCaCert: # file or directory path to CA certificate(s) for verifying the broker's key
|
||||||
# tlsKeyPassword: # private key passphrase for use with ssl.key.location and set_ssl_cert(), if any
|
# tlsKeyPassword: # private key passphrase for use with ssl.key.location and set_ssl_cert(), if any
|
||||||
# readTimeout: 10
|
# readTimeout: 10
|
||||||
|
# queuedmaxkbytes: 100000
|
||||||
|
|
||||||
rocksmq:
|
rocksmq:
|
||||||
# Prefix of the key to where Milvus stores data in RocksMQ.
|
# Prefix of the key to where Milvus stores data in RocksMQ.
|
||||||
|
|||||||
@ -86,6 +86,10 @@ func GetBasicConfig(config *paramtable.KafkaConfig) kafka.ConfigMap {
|
|||||||
kafkaConfig.SetKey("security.protocol", config.SecurityProtocol.GetValue())
|
kafkaConfig.SetKey("security.protocol", config.SecurityProtocol.GetValue())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.QueuedMessagesKbytes.GetValue() != "" {
|
||||||
|
kafkaConfig.SetKey("queued.max.messages.kbytes", config.QueuedMessagesKbytes.GetValue())
|
||||||
|
}
|
||||||
|
|
||||||
if config.SaslUsername.GetValue() != "" && config.SaslPassword.GetValue() != "" {
|
if config.SaslUsername.GetValue() != "" && config.SaslPassword.GetValue() != "" {
|
||||||
kafkaConfig.SetKey("sasl.mechanisms", config.SaslMechanisms.GetValue())
|
kafkaConfig.SetKey("sasl.mechanisms", config.SaslMechanisms.GetValue())
|
||||||
kafkaConfig.SetKey("sasl.username", config.SaslUsername.GetValue())
|
kafkaConfig.SetKey("sasl.username", config.SaslUsername.GetValue())
|
||||||
|
|||||||
@ -1080,19 +1080,20 @@ The retention policy of pulsar can set shorter to save the storage space in this
|
|||||||
|
|
||||||
// --- kafka ---
|
// --- kafka ---
|
||||||
type KafkaConfig struct {
|
type KafkaConfig struct {
|
||||||
Address ParamItem `refreshable:"false"`
|
Address ParamItem `refreshable:"false"`
|
||||||
SaslUsername ParamItem `refreshable:"false"`
|
SaslUsername ParamItem `refreshable:"false"`
|
||||||
SaslPassword ParamItem `refreshable:"false"`
|
SaslPassword ParamItem `refreshable:"false"`
|
||||||
SaslMechanisms ParamItem `refreshable:"false"`
|
SaslMechanisms ParamItem `refreshable:"false"`
|
||||||
SecurityProtocol ParamItem `refreshable:"false"`
|
SecurityProtocol ParamItem `refreshable:"false"`
|
||||||
KafkaUseSSL ParamItem `refreshable:"false"`
|
KafkaUseSSL ParamItem `refreshable:"false"`
|
||||||
KafkaTLSCert ParamItem `refreshable:"false"`
|
KafkaTLSCert ParamItem `refreshable:"false"`
|
||||||
KafkaTLSKey ParamItem `refreshable:"false"`
|
KafkaTLSKey ParamItem `refreshable:"false"`
|
||||||
KafkaTLSCACert ParamItem `refreshable:"false"`
|
KafkaTLSCACert ParamItem `refreshable:"false"`
|
||||||
KafkaTLSKeyPassword ParamItem `refreshable:"false"`
|
KafkaTLSKeyPassword ParamItem `refreshable:"false"`
|
||||||
ConsumerExtraConfig ParamGroup `refreshable:"false"`
|
ConsumerExtraConfig ParamGroup `refreshable:"false"`
|
||||||
ProducerExtraConfig ParamGroup `refreshable:"false"`
|
ProducerExtraConfig ParamGroup `refreshable:"false"`
|
||||||
ReadTimeout ParamItem `refreshable:"true"`
|
ReadTimeout ParamItem `refreshable:"true"`
|
||||||
|
QueuedMessagesKbytes ParamItem `refreshable:"false"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *KafkaConfig) Init(base *BaseTable) {
|
func (k *KafkaConfig) Init(base *BaseTable) {
|
||||||
@ -1197,6 +1198,14 @@ func (k *KafkaConfig) Init(base *BaseTable) {
|
|||||||
Export: true,
|
Export: true,
|
||||||
}
|
}
|
||||||
k.ReadTimeout.Init(base.mgr)
|
k.ReadTimeout.Init(base.mgr)
|
||||||
|
|
||||||
|
k.QueuedMessagesKbytes = ParamItem{
|
||||||
|
Key: "kafka.queuedmaxkbytes",
|
||||||
|
DefaultValue: "100000", // 100MB in kilo bytes
|
||||||
|
Version: "2.1.0",
|
||||||
|
Export: true,
|
||||||
|
}
|
||||||
|
k.QueuedMessagesKbytes.Init(base.mgr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// /////////////////////////////////////////////////////////////////////////////
|
// /////////////////////////////////////////////////////////////////////////////
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user