diff --git a/internal/mq/msgstream/mq_msgstream.go b/internal/mq/msgstream/mq_msgstream.go index 63ee1209e3..b3496cba54 100644 --- a/internal/mq/msgstream/mq_msgstream.go +++ b/internal/mq/msgstream/mq_msgstream.go @@ -22,6 +22,7 @@ import ( "fmt" "path/filepath" "sync" + "sync/atomic" "time" "github.com/golang/protobuf/proto" @@ -55,6 +56,7 @@ type mqMsgStream struct { producerLock *sync.Mutex consumerLock *sync.Mutex readerLock *sync.Mutex + closed int32 } // NewMqMsgStream is used to generate a new mqMsgStream object @@ -87,6 +89,7 @@ func NewMqMsgStream(ctx context.Context, consumerLock: &sync.Mutex{}, readerLock: &sync.Mutex{}, wait: &sync.WaitGroup{}, + closed: 0, } return stream, nil @@ -183,6 +186,9 @@ func (ms *mqMsgStream) Start() { } func (ms *mqMsgStream) Close() { + if !atomic.CompareAndSwapInt32(&ms.closed, 0, 1) { + return + } ms.streamCancel() ms.wait.Wait() @@ -198,6 +204,7 @@ func (ms *mqMsgStream) Close() { } ms.client.Close() + close(ms.receiveBuf) } func (ms *mqMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 { @@ -649,21 +656,8 @@ func (ms *MqTtMsgStream) Start() { // Close will stop goroutine and free internal producers and consumers func (ms *MqTtMsgStream) Close() { - ms.streamCancel() close(ms.syncConsumer) - ms.wait.Wait() - - for _, producer := range ms.producers { - if producer != nil { - producer.Close() - } - } - for _, consumer := range ms.consumers { - if consumer != nil { - consumer.Close() - } - } - ms.client.Close() + ms.mqMsgStream.Close() } func (ms *MqTtMsgStream) bufMsgPackToChannel() {