Close kafka internal consumer&producer properly (#24997)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2023-06-20 21:02:42 +08:00 committed by GitHub
parent 614045e2ff
commit 4d370ff37c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 7 additions and 0 deletions

View File

@ -22,6 +22,7 @@ type Consumer struct {
chanOnce sync.Once
closeOnce sync.Once
closeCh chan struct{}
wg sync.WaitGroup
}
const timeout = 3000
@ -116,7 +117,9 @@ func (kc *Consumer) Chan() <-chan mqwrapper.Message {
panic("failed to chan a kafka consumer without assign")
}
kc.chanOnce.Do(func() {
kc.wg.Add(1)
go func() {
defer kc.wg.Done()
for {
select {
case <-kc.closeCh:
@ -222,5 +225,7 @@ func (kc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) {
func (kc *Consumer) Close() {
kc.closeOnce.Do(func() {
close(kc.closeCh)
kc.wg.Wait() // wait work goroutine exit
kc.c.Close()
})
}

View File

@ -89,5 +89,7 @@ func (kp *kafkaProducer) Close() {
if cost > 500 {
log.Info("kafka producer is closed", zap.Any("topic", kp.topic), zap.Int64("time cost(ms)", cost))
}
kp.p.Close()
})
}