diff --git a/internal/mq/msgstream/mq_msgstream.go b/internal/mq/msgstream/mq_msgstream.go index b3496cba54..0959d82514 100644 --- a/internal/mq/msgstream/mq_msgstream.go +++ b/internal/mq/msgstream/mq_msgstream.go @@ -186,9 +186,6 @@ func (ms *mqMsgStream) Start() { } func (ms *mqMsgStream) Close() { - if !atomic.CompareAndSwapInt32(&ms.closed, 0, 1) { - return - } ms.streamCancel() ms.wait.Wait() @@ -204,6 +201,10 @@ func (ms *mqMsgStream) Close() { } ms.client.Close() + + if !atomic.CompareAndSwapInt32(&ms.closed, 0, 1) { + return + } close(ms.receiveBuf) }