From a1e6428bc78348162fc51dac5435ef389d248466 Mon Sep 17 00:00:00 2001
From: SimFG
Date: Tue, 27 Jun 2023 10:42:44 +0800
Subject: [PATCH] Fix the kafka panic when sending the message to a closed channel (#25116) (#25117)

Signed-off-by: SimFG
---
Review notes (below the "---" separator, so not part of the commit message):

* The line breaks of this patch had been collapsed; they are restored here so
  that every diff line is on its own line again. No added, removed or context
  content was changed. Indentation (tabs, gofmt column alignment) is
  reconstructed -- TODO confirm against the original commit.
* The restored lines were cross-checked against the hunk headers and the
  diffstat below (14 insertions / 3 deletions in kafka_producer.go,
  27 insertions in kafka_producer_test.go).
* NOTE(review): isClosed is a plain bool that Close() writes and Send() reads;
  no lock or atomic is visible in these hunks. If Send and Close can run on
  different goroutines this looks like a data race, and a Send that passes the
  check just before Close runs could presumably still reach the closed
  deliveryChan -- confirm against the full file.
* NOTE(review): TestKafkaProducer_SendFailAfterClose sleeps 10s after Send has
  already returned. Presumably this is meant to surface a delayed panic from a
  background goroutine -- confirm; otherwise it only slows the test down.
* NOTE(review): the new log.Warn in Close() uses zap.Any for the topic while
  the new log.Error in Send() uses zap.String for the same field.

 .../mqwrapper/kafka/kafka_producer.go         | 17 +++++++++---
 .../mqwrapper/kafka/kafka_producer_test.go    | 27 +++++++++++++++++++
 2 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go
index 0e6c818d88..e9e1a2dd01 100644
--- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go
+++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go
@@ -7,13 +7,12 @@ import (
 	"time"
 
 	"github.com/confluentinc/confluent-kafka-go/kafka"
-	"go.uber.org/zap"
-
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
 	"github.com/milvus-io/milvus/pkg/util/timerecord"
+	"go.uber.org/zap"
 )
 
 type kafkaProducer struct {
@@ -21,6 +20,7 @@ type kafkaProducer struct {
 	topic        string
 	deliveryChan chan kafka.Event
 	closeOnce    sync.Once
+	isClosed     bool
 }
 
 func (kp *kafkaProducer) Topic() string {
@@ -31,6 +31,12 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMe
 	start := timerecord.NewTimeRecorder("send msg to stream")
 	metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.TotalLabel).Inc()
 
+	if kp.isClosed {
+		metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc()
+		log.Error("kafka produce message fail because the producer has been closed", zap.String("topic", kp.topic))
+		return nil, common.NewIgnorableError(fmt.Errorf("kafka producer is closed"))
+	}
+
 	headers := make([]kafka.Header, 0, len(message.Properties))
 	for key, value := range message.Properties {
 		header := kafka.Header{Key: key, Value: []byte(value)}
@@ -69,9 +75,14 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMe
 
 func (kp *kafkaProducer) Close() {
 	kp.closeOnce.Do(func() {
+		kp.isClosed = true
+
 		start := time.Now()
 		//flush in-flight msg within queue.
-		kp.p.Flush(10000)
+		i := kp.p.Flush(10000)
+		if i > 0 {
+			log.Warn("There are still un-flushed outstanding events", zap.Int("event_num", i), zap.Any("topic", kp.topic))
+		}
 
 		close(kp.deliveryChan)
 
diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go
index 37cac7bf02..9c80a19d1f 100644
--- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go
+++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go
@@ -67,3 +67,30 @@ func TestKafkaProducer_SendFail(t *testing.T) {
 		producer.Close()
 	}
 }
+
+func TestKafkaProducer_SendFailAfterClose(t *testing.T) {
+	kafkaAddress := getKafkaBrokerList()
+	kc := NewKafkaClientInstance(kafkaAddress)
+	defer kc.Close()
+	assert.NotNil(t, kc)
+
+	rand.Seed(time.Now().UnixNano())
+	topic := fmt.Sprintf("test-topic-%d", rand.Int())
+
+	producer, err := kc.CreateProducer(mqwrapper.ProducerOptions{Topic: topic})
+	assert.Nil(t, err)
+	assert.NotNil(t, producer)
+
+	producer.Close()
+
+	kafkaProd := producer.(*kafkaProducer)
+	assert.Equal(t, kafkaProd.Topic(), topic)
+
+	msg2 := &mqwrapper.ProducerMessage{
+		Payload:    []byte{},
+		Properties: map[string]string{},
+	}
+	_, err = producer.Send(context.TODO(), msg2)
+	time.Sleep(10 * time.Second)
+	assert.NotNil(t, err)
+}