From d3027c0d28568b8adbef7aa5ee4d7bb716255173 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 2 Sep 2021 14:56:10 +0800 Subject: [PATCH] Fix possible deadlock (#7428) Signed-off-by: Congqi Xia --- internal/msgstream/mq_msgstream.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index d7942ca983..ed1a54af70 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -96,9 +96,9 @@ func (ms *mqMsgStream) AsProducer(channels []string) { } ms.producerLock.Lock() + defer ms.producerLock.Unlock() ms.producers[channel] = pp ms.producerChannels = append(ms.producerChannels, channel) - ms.producerLock.Unlock() return nil } err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200)) @@ -131,9 +131,9 @@ func (ms *mqMsgStream) AsConsumer(channels []string, subName string) { } ms.consumerLock.Lock() + defer ms.consumerLock.Unlock() ms.consumers[channel] = pc ms.consumerChannels = append(ms.consumerChannels, channel) - ms.consumerLock.Unlock() return nil } err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200)) @@ -243,15 +243,18 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error { trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties) + ms.producerLock.Lock() if err := ms.producers[channel].Send( spanCtx, msg, ); err != nil { + ms.producerLock.Unlock() trace.LogError(sp, err) sp.Finish() return err } sp.Finish() + ms.producerLock.Unlock() } } return nil @@ -285,6 +288,7 @@ func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error { spanCtx, msg, ); err != nil { + ms.producerLock.Unlock() trace.LogError(sp, err) sp.Finish() return err @@ -494,8 +498,8 @@ func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string) { } ms.consumerLock.Lock() + defer ms.consumerLock.Unlock() ms.addConsumer(pc, channel) - ms.consumerLock.Unlock() return nil } err := retry.Do(context.TODO(), fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200))