mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-04 11:18:44 +08:00
enhance: refine lock granularity for produers in msgstream (#38262)
issue: #38261 Signed-off-by: jaime <yun.zhang@zilliz.com>
This commit is contained in:
parent
aa4eb2f6df
commit
a1e14d62c7
@ -29,6 +29,7 @@ import (
|
||||
"github.com/samber/lo"
|
||||
uatomic "go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/maps"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
@ -310,7 +311,14 @@ func (ms *mqMsgStream) Produce(ctx context.Context, msgPack *MsgPack) error {
|
||||
k := k
|
||||
v := v
|
||||
eg.Go(func() error {
|
||||
ms.producerLock.RLock()
|
||||
channel := ms.producerChannels[k]
|
||||
producer, ok := ms.producers[channel]
|
||||
ms.producerLock.RUnlock()
|
||||
if !ok {
|
||||
return errors.New("producer not found for channel: " + channel)
|
||||
}
|
||||
|
||||
for i := 0; i < len(v.Msgs); i++ {
|
||||
spanCtx, sp := MsgSpanFromCtx(v.Msgs[i].TraceCtx(), v.Msgs[i])
|
||||
defer sp.End()
|
||||
@ -330,13 +338,10 @@ func (ms *mqMsgStream) Produce(ctx context.Context, msgPack *MsgPack) error {
|
||||
}}
|
||||
InjectCtx(spanCtx, msg.Properties)
|
||||
|
||||
ms.producerLock.RLock()
|
||||
if _, err := ms.producers[channel].Send(spanCtx, msg); err != nil {
|
||||
ms.producerLock.RUnlock()
|
||||
if _, err := producer.Send(spanCtx, msg); err != nil {
|
||||
sp.RecordError(err)
|
||||
return err
|
||||
}
|
||||
ms.producerLock.RUnlock()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
@ -375,18 +380,20 @@ func (ms *mqMsgStream) Broadcast(ctx context.Context, msgPack *MsgPack) (map[str
|
||||
msg := &common.ProducerMessage{Payload: m, Properties: map[string]string{}}
|
||||
InjectCtx(spanCtx, msg.Properties)
|
||||
|
||||
ms.producerLock.Lock()
|
||||
for channel, producer := range ms.producers {
|
||||
ms.producerLock.RLock()
|
||||
// since the element never be removed in ms.producers, so it's safe to clone and iterate producers
|
||||
producers := maps.Clone(ms.producers)
|
||||
ms.producerLock.RUnlock()
|
||||
|
||||
for channel, producer := range producers {
|
||||
id, err := producer.Send(spanCtx, msg)
|
||||
if err != nil {
|
||||
ms.producerLock.Unlock()
|
||||
sp.RecordError(err)
|
||||
sp.End()
|
||||
return ids, err
|
||||
}
|
||||
ids[channel] = append(ids[channel], id)
|
||||
}
|
||||
ms.producerLock.Unlock()
|
||||
sp.End()
|
||||
}
|
||||
return ids, nil
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user