From bb9ccbb7e271ee0f58e5f7f0a040002b96812ac4 Mon Sep 17 00:00:00 2001
From: jaime
Date: Fri, 29 Apr 2022 17:29:47 +0800
Subject: [PATCH] Use a singleton kafka producer (#16739)

Signed-off-by: yun.zhang
---
 .../msgstream/mqwrapper/kafka/kafka_client.go | 24 ++++++++--
 .../mqwrapper/kafka/kafka_producer.go         | 45 +++++++++++++------
 2 files changed, 52 insertions(+), 17 deletions(-)

diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go
index d468781210..effc116926 100644
--- a/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go
+++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go
@@ -2,6 +2,7 @@ package kafka
 
 import (
 	"strconv"
+	"sync"
 
 	"github.com/confluentinc/confluent-kafka-go/kafka"
 	"github.com/milvus-io/milvus/internal/log"
@@ -9,6 +10,9 @@ import (
 	"go.uber.org/zap"
 )
 
+var Producer *kafka.Producer
+var once sync.Once
+
 type kafkaClient struct {
 	// more configs you can see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
 	basicConfig kafka.ConfigMap
@@ -34,6 +38,21 @@ func cloneKafkaConfig(config kafka.ConfigMap) *kafka.ConfigMap {
 	return &newConfig
 }
 
+func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) {
+	var err error
+	once.Do(func() {
+		config := kc.newProducerConfig()
+		Producer, err = kafka.NewProducer(config)
+	})
+
+	if err != nil {
+		log.Error("create sync kafka producer failed", zap.Error(err))
+		return nil, err
+	}
+
+	return Producer, nil
+}
+
 func (kc *kafkaClient) newProducerConfig() *kafka.ConfigMap {
 	newConf := cloneKafkaConfig(kc.basicConfig)
 	// default max message size 5M
@@ -41,6 +60,7 @@
 	newConf.SetKey("compression.codec", "zstd")
 	newConf.SetKey("go.events.channel.size", 0)
 	newConf.SetKey("go.produce.channel.size", 0)
+	newConf.SetKey("linger.ms", 20)
 	return newConf
 }
 
@@ -68,10 +88,8 @@ func (kc *kafkaClient) newConsumerConfig(group string, offset mqwrapper.Subscrip
 }
 
 func (kc *kafkaClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrapper.Producer, error) {
-	config := kc.newProducerConfig()
-	pp, err := kafka.NewProducer(config)
+	pp, err := kc.getKafkaProducer()
 	if err != nil {
-		log.Error("kafka create sync producer , error", zap.Error(err))
 		return nil, err
 	}
 
diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go
index bad354a6c7..ca39855684 100644
--- a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go
+++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go
@@ -5,6 +5,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/pkg/errors"
+
 	"github.com/milvus-io/milvus/internal/log"
 	"go.uber.org/zap"
 
@@ -24,22 +26,38 @@ func (kp *kafkaProducer) Topic() string {
 }
 
 func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) {
-	err := kp.p.Produce(&kafka.Message{
-		TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
-		Value:          message.Payload,
-	}, kp.deliveryChan)
+	var err error
+	maxAttempt := 3
 
-	if err != nil {
-		return nil, err
+	// To work around https://github.com/confluentinc/confluent-kafka-go/issues/769,
+	// retry the produce when the delivery channel returns a nil event.
+	for i := 0; i < maxAttempt; i++ {
+		err = kp.p.Produce(&kafka.Message{
+			TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
+			Value:          message.Payload,
+		}, kp.deliveryChan)
+
+		if err != nil {
+			break
+		}
+
+		e := <-kp.deliveryChan
+		if e == nil {
+			errMsg := "failed to produce message: delivery chan returned a nil event"
+			err = errors.New(errMsg)
+			log.Warn(errMsg, zap.String("topic", kp.topic), zap.ByteString("msg", message.Payload), zap.Int("retries", i))
+			continue
+		}
+
+		m := e.(*kafka.Message)
+		if m.TopicPartition.Error != nil {
+			return nil, m.TopicPartition.Error
+		}
+
+		return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil
 	}
 
-	e := <-kp.deliveryChan
-	m := e.(*kafka.Message)
-	if m.TopicPartition.Error != nil {
-		return nil, m.TopicPartition.Error
-	}
-
-	return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil
+	return nil, err
 }
 
 func (kp *kafkaProducer) Close() {
@@ -48,7 +66,6 @@
 	start := time.Now()
 
 	//flush in-flight msg within queue.
 	kp.p.Flush(10000)
-	kp.p.Close()
 	close(kp.deliveryChan)
 	cost := time.Since(start).Milliseconds()
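
Note on the pattern: the patch replaces the per-call producer with a process-wide singleton guarded by sync.Once, so every kafkaProducer returned by CreateProducer shares one underlying *kafka.Producer. The following is a minimal standalone sketch of that construction, not part of the patch; the getProducer helper, the broker address, and the topic names are made up for illustration.

package main

import (
	"fmt"
	"sync"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
	producer *kafka.Producer
	once     sync.Once
	initErr  error
)

// getProducer builds the shared producer exactly once; every later call
// returns the same instance (and the same initialization error, if any).
func getProducer() (*kafka.Producer, error) {
	once.Do(func() {
		producer, initErr = kafka.NewProducer(&kafka.ConfigMap{
			"bootstrap.servers": "localhost:9092", // hypothetical broker address
			"linger.ms":         20,
		})
	})
	return producer, initErr
}

func main() {
	p, err := getProducer()
	if err != nil {
		panic(err)
	}

	// Both produces below go through the same underlying producer instance.
	for _, topic := range []string{"topic-a", "topic-b"} { // hypothetical topics
		deliveryChan := make(chan kafka.Event, 1)
		if err := p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte("hello"),
		}, deliveryChan); err != nil {
			panic(err)
		}
		// Wait for the delivery report of this message.
		m := (<-deliveryChan).(*kafka.Message)
		fmt.Println("delivered to", m.TopicPartition)
		close(deliveryChan)
	}

	// Callers only flush the shared producer; closing it is a
	// process-lifetime decision, not something an individual caller does.
	p.Flush(10000)
}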
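
Two design choices in the patch follow from that shared instance. In Send, a nil event read from the delivery channel (the symptom described in confluent-kafka-go issue #769) is treated as retriable: the produce is attempted up to maxAttempt (3) times, and the last error is returned if no attempt yields a delivery report. In Close, the producer is only flushed (kp.p.Flush(10000)) and the per-producer delivery channel is closed; kp.p.Close() is dropped because closing the shared singleton would break every other producer still using it. One general sync.Once caveat worth keeping in mind when reviewing: the Do function is never re-run, so a failed first initialization is not retried on later calls.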