From 4d370ff37c6f2e401682947473e01ecf394d1a25 Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 20 Jun 2023 21:02:42 +0800 Subject: [PATCH] Close kafka internal consumer&producer properly (#24997) Signed-off-by: Congqi Xia --- internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go | 5 +++++ internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go | 2 ++ 2 files changed, 7 insertions(+) diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go index 4184e830bb..f37b07c7fd 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go @@ -22,6 +22,7 @@ type Consumer struct { chanOnce sync.Once closeOnce sync.Once closeCh chan struct{} + wg sync.WaitGroup } const timeout = 3000 @@ -116,7 +117,9 @@ func (kc *Consumer) Chan() <-chan mqwrapper.Message { panic("failed to chan a kafka consumer without assign") } kc.chanOnce.Do(func() { + kc.wg.Add(1) go func() { + defer kc.wg.Done() for { select { case <-kc.closeCh: @@ -222,5 +225,7 @@ func (kc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) { func (kc *Consumer) Close() { kc.closeOnce.Do(func() { close(kc.closeCh) + kc.wg.Wait() // wait work goroutine exit + kc.c.Close() }) } diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go index 724a8d6671..f29fd79992 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go @@ -89,5 +89,7 @@ func (kp *kafkaProducer) Close() { if cost > 500 { log.Info("kafka producer is closed", zap.Any("topic", kp.topic), zap.Int64("time cost(ms)", cost)) } + + kp.p.Close() }) }