Fix the kafka panic when sending the message to a closed channel (#25116) (#25117)

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2023-06-27 10:42:44 +08:00 committed by GitHub
parent 17796743dd
commit a1e6428bc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 3 deletions

View File

@ -7,13 +7,12 @@ import (
"time" "time"
"github.com/confluentinc/confluent-kafka-go/kafka" "github.com/confluentinc/confluent-kafka-go/kafka"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/timerecord"
"go.uber.org/zap"
) )
type kafkaProducer struct { type kafkaProducer struct {
@ -21,6 +20,7 @@ type kafkaProducer struct {
topic string topic string
deliveryChan chan kafka.Event deliveryChan chan kafka.Event
closeOnce sync.Once closeOnce sync.Once
isClosed bool
} }
func (kp *kafkaProducer) Topic() string { func (kp *kafkaProducer) Topic() string {
@ -31,6 +31,12 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMe
start := timerecord.NewTimeRecorder("send msg to stream") start := timerecord.NewTimeRecorder("send msg to stream")
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.TotalLabel).Inc() metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.TotalLabel).Inc()
if kp.isClosed {
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc()
log.Error("kafka produce message fail because the producer has been closed", zap.String("topic", kp.topic))
return nil, common.NewIgnorableError(fmt.Errorf("kafka producer is closed"))
}
headers := make([]kafka.Header, 0, len(message.Properties)) headers := make([]kafka.Header, 0, len(message.Properties))
for key, value := range message.Properties { for key, value := range message.Properties {
header := kafka.Header{Key: key, Value: []byte(value)} header := kafka.Header{Key: key, Value: []byte(value)}
@ -69,9 +75,14 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMe
func (kp *kafkaProducer) Close() { func (kp *kafkaProducer) Close() {
kp.closeOnce.Do(func() { kp.closeOnce.Do(func() {
kp.isClosed = true
start := time.Now() start := time.Now()
//flush in-flight msg within queue. //flush in-flight msg within queue.
kp.p.Flush(10000) i := kp.p.Flush(10000)
if i > 0 {
log.Warn("There are still un-flushed outstanding events", zap.Int("event_num", i), zap.Any("topic", kp.topic))
}
close(kp.deliveryChan) close(kp.deliveryChan)

View File

@ -67,3 +67,30 @@ func TestKafkaProducer_SendFail(t *testing.T) {
producer.Close() producer.Close()
} }
} }
func TestKafkaProducer_SendFailAfterClose(t *testing.T) {
kafkaAddress := getKafkaBrokerList()
kc := NewKafkaClientInstance(kafkaAddress)
defer kc.Close()
assert.NotNil(t, kc)
rand.Seed(time.Now().UnixNano())
topic := fmt.Sprintf("test-topic-%d", rand.Int())
producer, err := kc.CreateProducer(mqwrapper.ProducerOptions{Topic: topic})
assert.Nil(t, err)
assert.NotNil(t, producer)
producer.Close()
kafkaProd := producer.(*kafkaProducer)
assert.Equal(t, kafkaProd.Topic(), topic)
msg2 := &mqwrapper.ProducerMessage{
Payload: []byte{},
Properties: map[string]string{},
}
_, err = producer.Send(context.TODO(), msg2)
time.Sleep(10 * time.Second)
assert.NotNil(t, err)
}