From a1e14d62c70a8b9c7fad845fe97b8e5788774556 Mon Sep 17 00:00:00 2001 From: jaime Date: Fri, 6 Dec 2024 17:14:40 +0800 Subject: [PATCH] enhance: refine lock granularity for produers in msgstream (#38262) issue: #38261 Signed-off-by: jaime --- pkg/mq/msgstream/mq_msgstream.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index 5127a841c9..808b4da318 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -29,6 +29,7 @@ import ( "github.com/samber/lo" uatomic "go.uber.org/atomic" "go.uber.org/zap" + "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" @@ -310,7 +311,14 @@ func (ms *mqMsgStream) Produce(ctx context.Context, msgPack *MsgPack) error { k := k v := v eg.Go(func() error { + ms.producerLock.RLock() channel := ms.producerChannels[k] + producer, ok := ms.producers[channel] + ms.producerLock.RUnlock() + if !ok { + return errors.New("producer not found for channel: " + channel) + } + for i := 0; i < len(v.Msgs); i++ { spanCtx, sp := MsgSpanFromCtx(v.Msgs[i].TraceCtx(), v.Msgs[i]) defer sp.End() @@ -330,13 +338,10 @@ func (ms *mqMsgStream) Produce(ctx context.Context, msgPack *MsgPack) error { }} InjectCtx(spanCtx, msg.Properties) - ms.producerLock.RLock() - if _, err := ms.producers[channel].Send(spanCtx, msg); err != nil { - ms.producerLock.RUnlock() + if _, err := producer.Send(spanCtx, msg); err != nil { sp.RecordError(err) return err } - ms.producerLock.RUnlock() } return nil }) @@ -375,18 +380,20 @@ func (ms *mqMsgStream) Broadcast(ctx context.Context, msgPack *MsgPack) (map[str msg := &common.ProducerMessage{Payload: m, Properties: map[string]string{}} InjectCtx(spanCtx, msg.Properties) - ms.producerLock.Lock() - for channel, producer := range ms.producers { + ms.producerLock.RLock() + // since the element never be removed in ms.producers, so it's safe to clone and iterate producers + producers := maps.Clone(ms.producers) + ms.producerLock.RUnlock() + + for channel, producer := range producers { id, err := producer.Send(spanCtx, msg) if err != nil { - ms.producerLock.Unlock() sp.RecordError(err) sp.End() return ids, err } ids[channel] = append(ids[channel], id) } - ms.producerLock.Unlock() sp.End() } return ids, nil