Fix the kafka panic when sending the message to a closed channel (#25116)

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2023-06-28 18:36:26 +08:00 committed by GitHub
parent 0327fc3fa1
commit bd17544fa5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 52 additions and 1 deletions

View File

@ -37,6 +37,7 @@ type kafkaProducer struct {
topic string
deliveryChan chan kafka.Event
closeOnce sync.Once
isClosed bool
}
func (kp *kafkaProducer) Topic() string {
@ -47,6 +48,12 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMe
start := timerecord.NewTimeRecorder("send msg to stream")
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.TotalLabel).Inc()
if kp.isClosed {
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc()
log.Error("kafka produce message fail because the producer has been closed", zap.String("topic", kp.topic))
return nil, common.NewIgnorableError(fmt.Errorf("kafka producer is closed"))
}
err := kp.p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
Value: message.Payload,
@ -79,9 +86,14 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMe
func (kp *kafkaProducer) Close() {
kp.closeOnce.Do(func() {
kp.isClosed = true
start := time.Now()
//flush in-flight msg within queue.
kp.p.Flush(10000)
i := kp.p.Flush(10000)
if i > 0 {
log.Warn("There are still un-flushed outstanding events", zap.Int("event_num", i), zap.Any("topic", kp.topic))
}
close(kp.deliveryChan)

View File

@ -27,6 +27,8 @@ func TestKafkaProducer_SendSuccess(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, producer)
producer.Close()
kafkaProd := producer.(*kafkaProducer)
assert.Equal(t, kafkaProd.Topic(), topic)
@ -35,6 +37,7 @@ func TestKafkaProducer_SendSuccess(t *testing.T) {
Properties: map[string]string{},
}
msgID, err := producer.Send(context.TODO(), msg2)
time.Sleep(30 * time.Second)
assert.Nil(t, err)
assert.NotNil(t, msgID)
@ -67,3 +70,30 @@ func TestKafkaProducer_SendFail(t *testing.T) {
producer.Close()
}
}
func TestKafkaProducer_SendFailAfterClose(t *testing.T) {
kafkaAddress := getKafkaBrokerList()
kc := NewKafkaClientInstance(kafkaAddress)
defer kc.Close()
assert.NotNil(t, kc)
rand.Seed(time.Now().UnixNano())
topic := fmt.Sprintf("test-topic-%d", rand.Int())
producer, err := kc.CreateProducer(mqwrapper.ProducerOptions{Topic: topic})
assert.Nil(t, err)
assert.NotNil(t, producer)
producer.Close()
kafkaProd := producer.(*kafkaProducer)
assert.Equal(t, kafkaProd.Topic(), topic)
msg2 := &mqwrapper.ProducerMessage{
Payload: []byte{},
Properties: map[string]string{},
}
_, err = producer.Send(context.TODO(), msg2)
time.Sleep(10 * time.Second)
assert.NotNil(t, err)
}

View File

@ -30,6 +30,7 @@ func TestQueryShardHistorical_validateSegmentIDs(t *testing.T) {
t.Run("test normal validate", func(t *testing.T) {
his, err := genSimpleReplicaWithSealSegment(ctx)
assert.NoError(t, err)
defer his.freeAll()
_, _, err = validateOnHistoricalReplica(context.TODO(), his, defaultCollectionID, []UniqueID{defaultPartitionID}, []UniqueID{defaultSegmentID})
assert.NoError(t, err)
})
@ -37,6 +38,7 @@ func TestQueryShardHistorical_validateSegmentIDs(t *testing.T) {
t.Run("test normal validate2", func(t *testing.T) {
his, err := genSimpleReplicaWithSealSegment(ctx)
assert.NoError(t, err)
defer his.freeAll()
_, _, err = validateOnHistoricalReplica(context.TODO(), his, defaultCollectionID, []UniqueID{}, []UniqueID{defaultSegmentID})
assert.NoError(t, err)
})
@ -44,6 +46,7 @@ func TestQueryShardHistorical_validateSegmentIDs(t *testing.T) {
t.Run("test validate non-existent collection", func(t *testing.T) {
his, err := genSimpleReplicaWithSealSegment(ctx)
assert.NoError(t, err)
defer his.freeAll()
_, _, err = validateOnHistoricalReplica(context.TODO(), his, defaultCollectionID+1, []UniqueID{defaultPartitionID}, []UniqueID{defaultSegmentID})
assert.Error(t, err)
})
@ -51,6 +54,7 @@ func TestQueryShardHistorical_validateSegmentIDs(t *testing.T) {
t.Run("test validate non-existent partition", func(t *testing.T) {
his, err := genSimpleReplicaWithSealSegment(ctx)
assert.NoError(t, err)
defer his.freeAll()
_, _, err = validateOnHistoricalReplica(context.TODO(), his, defaultCollectionID, []UniqueID{defaultPartitionID + 1}, []UniqueID{defaultSegmentID})
assert.Error(t, err)
})
@ -58,6 +62,7 @@ func TestQueryShardHistorical_validateSegmentIDs(t *testing.T) {
t.Run("test validate non-existent segment", func(t *testing.T) {
his, err := genSimpleReplicaWithSealSegment(ctx)
assert.NoError(t, err)
defer his.freeAll()
_, _, err = validateOnHistoricalReplica(context.TODO(), his, defaultCollectionID, []UniqueID{defaultPartitionID}, []UniqueID{defaultSegmentID + 1})
assert.NoError(t, err)
})
@ -65,6 +70,7 @@ func TestQueryShardHistorical_validateSegmentIDs(t *testing.T) {
t.Run("test validate segment not in given partition", func(t *testing.T) {
his, err := genSimpleReplicaWithSealSegment(ctx)
assert.NoError(t, err)
defer his.freeAll()
err = his.addPartition(defaultCollectionID, defaultPartitionID+1)
assert.NoError(t, err)
schema := genTestCollectionSchema()
@ -86,6 +92,7 @@ func TestQueryShardHistorical_validateSegmentIDs(t *testing.T) {
t.Run("test validate after partition release", func(t *testing.T) {
his, err := genSimpleReplicaWithSealSegment(ctx)
assert.NoError(t, err)
defer his.freeAll()
err = his.removePartition(defaultPartitionID)
assert.NoError(t, err)
_, _, err = validateOnHistoricalReplica(context.TODO(), his, defaultCollectionID, []UniqueID{}, []UniqueID{defaultSegmentID})
@ -95,6 +102,7 @@ func TestQueryShardHistorical_validateSegmentIDs(t *testing.T) {
t.Run("test validate after partition release2", func(t *testing.T) {
his, err := genSimpleReplicaWithSealSegment(ctx)
assert.NoError(t, err)
defer his.freeAll()
col, err := his.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
col.setLoadType(loadTypePartition)
@ -107,6 +115,7 @@ func TestQueryShardHistorical_validateSegmentIDs(t *testing.T) {
t.Run("test validate after partition release3", func(t *testing.T) {
his, err := genSimpleReplicaWithSealSegment(ctx)
assert.NoError(t, err)
defer his.freeAll()
col, err := his.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
col.setLoadType(loadTypeCollection)