From d3d5a41b7636c671fe63990dff4dba47c5a39d43 Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Tue, 15 Mar 2022 15:55:21 +0800 Subject: [PATCH] Fix memory leak casued by unclosed channel in msgstream (#16051) issue: #16045 Signed-off-by: sunby Co-authored-by: sunby --- internal/mq/msgstream/mq_msgstream.go | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/internal/mq/msgstream/mq_msgstream.go b/internal/mq/msgstream/mq_msgstream.go index 63ee1209e3..b3496cba54 100644 --- a/internal/mq/msgstream/mq_msgstream.go +++ b/internal/mq/msgstream/mq_msgstream.go @@ -22,6 +22,7 @@ import ( "fmt" "path/filepath" "sync" + "sync/atomic" "time" "github.com/golang/protobuf/proto" @@ -55,6 +56,7 @@ type mqMsgStream struct { producerLock *sync.Mutex consumerLock *sync.Mutex readerLock *sync.Mutex + closed int32 } // NewMqMsgStream is used to generate a new mqMsgStream object @@ -87,6 +89,7 @@ func NewMqMsgStream(ctx context.Context, consumerLock: &sync.Mutex{}, readerLock: &sync.Mutex{}, wait: &sync.WaitGroup{}, + closed: 0, } return stream, nil @@ -183,6 +186,9 @@ func (ms *mqMsgStream) Start() { } func (ms *mqMsgStream) Close() { + if !atomic.CompareAndSwapInt32(&ms.closed, 0, 1) { + return + } ms.streamCancel() ms.wait.Wait() @@ -198,6 +204,7 @@ func (ms *mqMsgStream) Close() { } ms.client.Close() + close(ms.receiveBuf) } func (ms *mqMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 { @@ -649,21 +656,8 @@ func (ms *MqTtMsgStream) Start() { // Close will stop goroutine and free internal producers and consumers func (ms *MqTtMsgStream) Close() { - ms.streamCancel() close(ms.syncConsumer) - ms.wait.Wait() - - for _, producer := range ms.producers { - if producer != nil { - producer.Close() - } - } - for _, consumer := range ms.consumers { - if consumer != nil { - consumer.Close() - } - } - ms.client.Close() + ms.mqMsgStream.Close() } func (ms *MqTtMsgStream) bufMsgPackToChannel() {