From c51922eb0b16e460306ba37d62585995ab65feff Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Thu, 13 Jul 2023 21:46:31 +0800 Subject: [PATCH] Use sync.Map to reduce lock granularity to make create collection faster (#25485) Signed-off-by: sunby Co-authored-by: sunby --- pkg/mq/msgdispatcher/client.go | 46 ++++++++++++++--------------- pkg/mq/msgdispatcher/client_test.go | 8 ++++- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/pkg/mq/msgdispatcher/client.go b/pkg/mq/msgdispatcher/client.go index 7298e642c6..7bea3d6d7f 100644 --- a/pkg/mq/msgdispatcher/client.go +++ b/pkg/mq/msgdispatcher/client.go @@ -43,19 +43,17 @@ type Client interface { var _ Client = (*client)(nil) type client struct { - role string - nodeID int64 - managerMu sync.Mutex - managers map[string]DispatcherManager // pchannel -> DispatcherManager - factory msgstream.Factory + role string + nodeID int64 + managers sync.Map // pchannel -> DispatcherManager + factory msgstream.Factory } func NewClient(factory msgstream.Factory, role string, nodeID int64) Client { return &client{ - role: role, - nodeID: nodeID, - managers: make(map[string]DispatcherManager), - factory: factory, + role: role, + nodeID: nodeID, + factory: factory, } } @@ -63,19 +61,21 @@ func (c *client) Register(vchannel string, pos *Pos, subPos SubPos) (<-chan *Msg log := log.With(zap.String("role", c.role), zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel)) pchannel := funcutil.ToPhysicalChannel(vchannel) - c.managerMu.Lock() - defer c.managerMu.Unlock() - manager, ok := c.managers[pchannel] + var manager DispatcherManager + res, ok := c.managers.Load(pchannel) if !ok { manager = NewDispatcherManager(pchannel, c.role, c.nodeID, c.factory) - c.managers[pchannel] = manager + c.managers.Store(pchannel, manager) go manager.Run() + } else { + manager, _ = res.(DispatcherManager) } + ch, err := manager.Add(vchannel, pos, subPos) if err != nil { if manager.Num() == 0 { manager.Close() - delete(c.managers, pchannel) + c.managers.Delete(pchannel) } log.Error("register failed", zap.Error(err)) return nil, err @@ -86,13 +86,12 @@ func (c *client) Register(vchannel string, pos *Pos, subPos SubPos) (<-chan *Msg func (c *client) Deregister(vchannel string) { pchannel := funcutil.ToPhysicalChannel(vchannel) - c.managerMu.Lock() - defer c.managerMu.Unlock() - if manager, ok := c.managers[pchannel]; ok { + if res, ok := c.managers.Load(pchannel); ok { + manager, _ := res.(DispatcherManager) manager.Remove(vchannel) if manager.Num() == 0 { manager.Close() - delete(c.managers, pchannel) + c.managers.Delete(pchannel) } log.Info("deregister done", zap.String("role", c.role), zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel)) @@ -102,12 +101,13 @@ func (c *client) Deregister(vchannel string) { func (c *client) Close() { log := log.With(zap.String("role", c.role), zap.Int64("nodeID", c.nodeID)) - c.managerMu.Lock() - defer c.managerMu.Unlock() - for pchannel, manager := range c.managers { + c.managers.Range(func(key, value any) bool { + pchannel := key.(string) + manager := value.(DispatcherManager) log.Info("close manager", zap.String("channel", pchannel)) - delete(c.managers, pchannel) + c.managers.Delete(pchannel) manager.Close() - } + return true + }) log.Info("dispatcher client closed") } diff --git a/pkg/mq/msgdispatcher/client_test.go b/pkg/mq/msgdispatcher/client_test.go index ec5a39761a..70a9851342 100644 --- a/pkg/mq/msgdispatcher/client_test.go +++ b/pkg/mq/msgdispatcher/client_test.go @@ -60,5 +60,11 @@ func TestClient_Concurrency(t *testing.T) { } wg.Wait() expected := int(total - deregisterCount.Load()) - assert.Equal(t, expected, len(client1.(*client).managers)) + + var n int + client1.(*client).managers.Range(func(_, _ any) bool { + n++ + return true + }) + assert.Equal(t, expected, n) }