Use sync.Map to reduce lock granularity to make create collection faster (#25485)

Signed-off-by: sunby <bingyi.sun@zilliz.com>
Co-authored-by: sunby <bingyi.sun@zilliz.com>
Authored by Bingyi Sun on 2023-07-13 21:46:31 +08:00, committed by GitHub
parent 587237a3c9
commit c51922eb0b
2 changed files with 30 additions and 24 deletions
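
For readers skimming the diff below: the change swaps a mutex-guarded map of per-pchannel DispatcherManagers for a sync.Map, so concurrent Register/Deregister calls on different pchannels no longer serialize on a single client-wide lock. The following sketch only illustrates that general pattern; registry, manager, and getOrCreate are hypothetical names, and it uses LoadOrStore rather than the Load/Store sequence in this commit.

package main

import (
	"fmt"
	"sync"
)

// manager is a hypothetical stand-in for DispatcherManager.
type manager struct{ pchannel string }

func newManager(pchannel string) *manager { return &manager{pchannel: pchannel} }

// registry keeps one manager per physical channel, with no client-wide mutex.
type registry struct {
	managers sync.Map // pchannel -> *manager
}

// getOrCreate returns the manager for pchannel, creating it on first use.
// Load handles the common hit path cheaply; LoadOrStore makes the
// insert-if-absent atomic, so concurrent callers end up sharing one manager.
func (r *registry) getOrCreate(pchannel string) *manager {
	if v, ok := r.managers.Load(pchannel); ok {
		return v.(*manager)
	}
	v, _ := r.managers.LoadOrStore(pchannel, newManager(pchannel))
	return v.(*manager)
}

func main() {
	r := &registry{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			m := r.getOrCreate(fmt.Sprintf("pchannel-%d", i%2))
			fmt.Println("registered on", m.pchannel)
		}()
	}
	wg.Wait()
}

One trade-off visible in the test change at the bottom of this diff: sync.Map has no length method, so counting entries requires iterating with Range.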

@@ -43,19 +43,17 @@ type Client interface {
 var _ Client = (*client)(nil)
 
 type client struct {
-	role      string
-	nodeID    int64
-	managerMu sync.Mutex
-	managers  map[string]DispatcherManager // pchannel -> DispatcherManager
-	factory   msgstream.Factory
+	role     string
+	nodeID   int64
+	managers sync.Map // pchannel -> DispatcherManager
+	factory  msgstream.Factory
 }
 
 func NewClient(factory msgstream.Factory, role string, nodeID int64) Client {
 	return &client{
-		role:     role,
-		nodeID:   nodeID,
-		managers: make(map[string]DispatcherManager),
-		factory:  factory,
+		role:    role,
+		nodeID:  nodeID,
+		factory: factory,
 	}
 }
@@ -63,19 +61,21 @@ func (c *client) Register(vchannel string, pos *Pos, subPos SubPos) (<-chan *Msg
 	log := log.With(zap.String("role", c.role),
 		zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel))
 	pchannel := funcutil.ToPhysicalChannel(vchannel)
-	c.managerMu.Lock()
-	defer c.managerMu.Unlock()
-	manager, ok := c.managers[pchannel]
+	var manager DispatcherManager
+	res, ok := c.managers.Load(pchannel)
 	if !ok {
 		manager = NewDispatcherManager(pchannel, c.role, c.nodeID, c.factory)
-		c.managers[pchannel] = manager
+		c.managers.Store(pchannel, manager)
 		go manager.Run()
+	} else {
+		manager, _ = res.(DispatcherManager)
 	}
 	ch, err := manager.Add(vchannel, pos, subPos)
 	if err != nil {
 		if manager.Num() == 0 {
 			manager.Close()
-			delete(c.managers, pchannel)
+			c.managers.Delete(pchannel)
 		}
 		log.Error("register failed", zap.Error(err))
 		return nil, err
@@ -86,13 +86,12 @@ func (c *client) Register(vchannel string, pos *Pos, subPos SubPos) (<-chan *Msg
 func (c *client) Deregister(vchannel string) {
 	pchannel := funcutil.ToPhysicalChannel(vchannel)
-	c.managerMu.Lock()
-	defer c.managerMu.Unlock()
-	if manager, ok := c.managers[pchannel]; ok {
+	if res, ok := c.managers.Load(pchannel); ok {
+		manager, _ := res.(DispatcherManager)
 		manager.Remove(vchannel)
 		if manager.Num() == 0 {
 			manager.Close()
-			delete(c.managers, pchannel)
+			c.managers.Delete(pchannel)
 		}
 		log.Info("deregister done", zap.String("role", c.role),
 			zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel))
@@ -102,12 +101,13 @@ func (c *client) Deregister(vchannel string) {
 func (c *client) Close() {
 	log := log.With(zap.String("role", c.role),
 		zap.Int64("nodeID", c.nodeID))
-	c.managerMu.Lock()
-	defer c.managerMu.Unlock()
-	for pchannel, manager := range c.managers {
+	c.managers.Range(func(key, value any) bool {
+		pchannel := key.(string)
+		manager := value.(DispatcherManager)
 		log.Info("close manager", zap.String("channel", pchannel))
-		delete(c.managers, pchannel)
+		c.managers.Delete(pchannel)
 		manager.Close()
-	}
+		return true
+	})
 	log.Info("dispatcher client closed")
 }

@@ -60,5 +60,11 @@ func TestClient_Concurrency(t *testing.T) {
 	}
 	wg.Wait()
 	expected := int(total - deregisterCount.Load())
-	assert.Equal(t, expected, len(client1.(*client).managers))
+	var n int
+	client1.(*client).managers.Range(func(_, _ any) bool {
+		n++
+		return true
+	})
+	assert.Equal(t, expected, n)
 }