diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index fe2dc6ba1a..4358031c64 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -56,7 +56,7 @@ type mqMsgStream struct { closeRWMutex *sync.RWMutex streamCancel func() bufSize int64 - producerLock *sync.Mutex + producerLock *sync.RWMutex consumerLock *sync.Mutex closed int32 onceChan sync.Once @@ -88,7 +88,7 @@ func NewMqMsgStream(ctx context.Context, bufSize: bufSize, receiveBuf: receiveBuf, streamCancel: streamCancel, - producerLock: &sync.Mutex{}, + producerLock: &sync.RWMutex{}, consumerLock: &sync.Mutex{}, closeRWMutex: &sync.RWMutex{}, closed: 0, @@ -288,13 +288,13 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error { msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}} InjectCtx(spanCtx, msg.Properties) - ms.producerLock.Lock() + ms.producerLock.RLock() if _, err := ms.producers[channel].Send(spanCtx, msg); err != nil { - ms.producerLock.Unlock() + ms.producerLock.RUnlock() sp.RecordError(err) return err } - ms.producerLock.Unlock() + ms.producerLock.RUnlock() } } return nil