From 226bcc16eba740422355e38e2c03b46ae72fd192 Mon Sep 17 00:00:00 2001 From: SimFG Date: Wed, 24 Aug 2022 10:02:52 +0800 Subject: [PATCH] Support concurrent situations for the metastore (#18792) Signed-off-by: SimFG Signed-off-by: SimFG --- internal/rootcoord/meta_table.go | 73 ++++++++++++++++++++++++++- internal/rootcoord/meta_table_test.go | 30 +++++++++++ internal/rootcoord/root_coord.go | 29 ++--------- internal/rootcoord/root_coord_test.go | 5 +- 4 files changed, 108 insertions(+), 29 deletions(-) diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 0b8da050a5..746bbf32f7 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -18,6 +18,7 @@ package rootcoord import ( "context" + "errors" "fmt" "sync" @@ -77,7 +78,8 @@ type MetaTable struct { segID2IndexID map[typeutil.UniqueID]typeutil.UniqueID // segment_id -> index_id indexID2Meta map[typeutil.UniqueID]*model.Index // collection id/index_id -> meta - ddLock sync.RWMutex + ddLock sync.RWMutex + permissionLock sync.RWMutex } // NewMetaTable creates meta table for rootcoord, which stores all in-memory information @@ -1291,6 +1293,32 @@ func (mt *MetaTable) AddCredential(credInfo *internalpb.CredentialInfo) error { if credInfo.Username == "" { return fmt.Errorf("username is empty") } + mt.permissionLock.Lock() + defer mt.permissionLock.Unlock() + + usernames, err := mt.catalog.ListCredentials(mt.ctx) + if err != nil { + return err + } + if len(usernames) >= Params.ProxyCfg.MaxUserNum { + return errors.New("unable to add user because the number of users has reached the limit") + } + + if origin, _ := mt.catalog.GetCredential(mt.ctx, credInfo.Username); origin != nil { + return fmt.Errorf("user already exists: %s", credInfo.Username) + } + + credential := &model.Credential{ + Username: credInfo.Username, + EncryptedPassword: credInfo.EncryptedPassword, + } + return mt.catalog.CreateCredential(mt.ctx, credential) +} + +// UpdateCredential update credential +func (mt *MetaTable) UpdateCredential(credInfo *internalpb.CredentialInfo) error { + mt.permissionLock.Lock() + defer mt.permissionLock.Unlock() credential := &model.Credential{ Username: credInfo.Username, @@ -1307,11 +1335,17 @@ func (mt *MetaTable) getCredential(username string) (*internalpb.CredentialInfo, // DeleteCredential delete credential func (mt *MetaTable) DeleteCredential(username string) error { + mt.permissionLock.Lock() + defer mt.permissionLock.Unlock() + return mt.catalog.DropCredential(mt.ctx, username) } // ListCredentialUsernames list credential usernames func (mt *MetaTable) ListCredentialUsernames() (*milvuspb.ListCredUsersResponse, error) { + mt.permissionLock.RLock() + defer mt.permissionLock.RUnlock() + usernames, err := mt.catalog.ListCredentials(mt.ctx) if err != nil { return nil, fmt.Errorf("list credential usernames err:%w", err) @@ -1353,11 +1387,26 @@ func (mt *MetaTable) CreateRole(tenant string, entity *milvuspb.RoleEntity) erro if funcutil.IsEmptyString(entity.Name) { return fmt.Errorf("the role name in the role info is empty") } + mt.permissionLock.Lock() + defer mt.permissionLock.Unlock() + + results, err := mt.catalog.ListRole(mt.ctx, tenant, nil, false) + if err != nil { + logger.Error("fail to list roles", zap.Error(err)) + return err + } + if len(results) >= Params.ProxyCfg.MaxRoleNum { + return errors.New("unable to add role because the number of roles has reached the limit") + } + return mt.catalog.CreateRole(mt.ctx, tenant, entity) } // DropRole drop role info func (mt *MetaTable) DropRole(tenant string, roleName string) error { + mt.permissionLock.Lock() + defer mt.permissionLock.Unlock() + return mt.catalog.DropRole(mt.ctx, tenant, roleName) } @@ -1370,6 +1419,9 @@ func (mt *MetaTable) OperateUserRole(tenant string, userEntity *milvuspb.UserEnt return fmt.Errorf("role name in the role entity is empty") } + mt.permissionLock.Lock() + defer mt.permissionLock.Unlock() + return mt.catalog.AlterUserRole(mt.ctx, tenant, userEntity, roleEntity, operateType) } @@ -1377,6 +1429,9 @@ func (mt *MetaTable) OperateUserRole(tenant string, userEntity *milvuspb.UserEnt // Enter the role condition by the entity param. And this param is nil, which means selecting all roles. // Get all users that are added to the role by setting the includeUserInfo param to true. func (mt *MetaTable) SelectRole(tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error) { + mt.permissionLock.RLock() + defer mt.permissionLock.RUnlock() + return mt.catalog.ListRole(mt.ctx, tenant, entity, includeUserInfo) } @@ -1384,6 +1439,9 @@ func (mt *MetaTable) SelectRole(tenant string, entity *milvuspb.RoleEntity, incl // Enter the user condition by the entity param. And this param is nil, which means selecting all users. // Get all roles that are added the user to by setting the includeRoleInfo param to true. func (mt *MetaTable) SelectUser(tenant string, entity *milvuspb.UserEntity, includeRoleInfo bool) ([]*milvuspb.UserResult, error) { + mt.permissionLock.RLock() + defer mt.permissionLock.RUnlock() + return mt.catalog.ListUser(mt.ctx, tenant, entity, includeRoleInfo) } @@ -1411,6 +1469,9 @@ func (mt *MetaTable) OperatePrivilege(tenant string, entity *milvuspb.GrantEntit return fmt.Errorf("the operate type in the grant entity is invalid") } + mt.permissionLock.Lock() + defer mt.permissionLock.Unlock() + return mt.catalog.AlterGrant(mt.ctx, tenant, entity, operateType) } @@ -1422,13 +1483,23 @@ func (mt *MetaTable) SelectGrant(tenant string, entity *milvuspb.GrantEntity) ([ if entity.Role == nil || funcutil.IsEmptyString(entity.Role.Name) { return entities, fmt.Errorf("the role entity in the grant entity is invalid") } + + mt.permissionLock.RLock() + defer mt.permissionLock.RUnlock() + return mt.catalog.ListGrant(mt.ctx, tenant, entity) } func (mt *MetaTable) ListPolicy(tenant string) ([]string, error) { + mt.permissionLock.RLock() + defer mt.permissionLock.RUnlock() + return mt.catalog.ListPolicy(mt.ctx, tenant) } func (mt *MetaTable) ListUserRole(tenant string) ([]string, error) { + mt.permissionLock.RLock() + defer mt.permissionLock.RUnlock() + return mt.catalog.ListUserRole(mt.ctx, tenant) } diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index df3fc966ee..004ccbbfdc 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -1057,6 +1057,12 @@ func TestMetaTable(t *testing.T) { wg.Add(1) t.Run("add credential failed", func(t *testing.T) { defer wg.Done() + mockTxnKV.loadWithPrefix = func(key string) ([]string, []string, error) { + return []string{}, []string{}, nil + } + mockTxnKV.load = func(key string) (string, error) { + return "", errors.New("test error") + } mockTxnKV.save = func(key, value string) error { return fmt.Errorf("save error") } @@ -1090,6 +1096,9 @@ func TestRbacCreateRole(t *testing.T) { mockTxnKV.load = func(key string) (string, error) { return "", common.NewKeyNotExistError(key) } + mockTxnKV.loadWithPrefix = func(key string) ([]string, []string, error) { + return []string{}, []string{}, nil + } err = mt.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: "role1"}) assert.Nil(t, err) @@ -1105,11 +1114,32 @@ func TestRbacCreateRole(t *testing.T) { err = mt.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: "role1"}) assert.Equal(t, true, common.IsIgnorableError(err)) + mockTxnKV.load = func(key string) (string, error) { + return "", common.NewKeyNotExistError(key) + } mockTxnKV.save = func(key, value string) error { return fmt.Errorf("save error") } err = mt.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: "role2"}) assert.NotNil(t, err) + + mockTxnKV.save = func(key, value string) error { + return nil + } + mockTxnKV.loadWithPrefix = func(key string) ([]string, []string, error) { + return []string{}, []string{}, fmt.Errorf("loadWithPrefix error") + } + err = mt.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: "role2"}) + assert.NotNil(t, err) + + Params.ProxyCfg.MaxRoleNum = 2 + mockTxnKV.loadWithPrefix = func(key string) ([]string, []string, error) { + return []string{key + "/a", key + "/b"}, []string{}, nil + } + err = mt.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: "role2"}) + assert.NotNil(t, err) + Params.ProxyCfg.MaxRoleNum = 10 + } func TestRbacDropRole(t *testing.T) { diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 216c364fca..094c67de50 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -2816,21 +2816,8 @@ func (c *Core) CreateCredential(ctx context.Context, credInfo *internalpb.Creden log.Debug("CreateCredential", zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username)) - usersInfo, err := c.MetaTable.ListCredentialUsernames() - if err != nil { - log.Error("CreateCredential get credential username list failed", zap.String("role", typeutil.RootCoordRole), - zap.String("username", credInfo.Username), zap.Error(err)) - return failStatus(commonpb.ErrorCode_CreateCredentialFailure, "CreateCredential failed, fail to get credential username list to check the user number, error: "+err.Error()), nil - } - if len(usersInfo.Usernames) >= Params.ProxyCfg.MaxUserNum { - return failStatus(commonpb.ErrorCode_CreateCredentialFailure, "unable to add user because the number of users has reached the limit"), nil - } - - if cred, _ := c.MetaTable.getCredential(credInfo.Username); cred != nil { - return failStatus(commonpb.ErrorCode_CreateCredentialFailure, "user already exists:"+credInfo.Username), nil - } // insert to db - err = c.MetaTable.AddCredential(credInfo) + err := c.MetaTable.AddCredential(credInfo) if err != nil { log.Error("CreateCredential save credential failed", zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username), zap.Error(err)) @@ -2890,7 +2877,7 @@ func (c *Core) UpdateCredential(ctx context.Context, credInfo *internalpb.Creden log.Debug("UpdateCredential", zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username)) // update data on storage - err := c.MetaTable.AddCredential(credInfo) + err := c.MetaTable.UpdateCredential(credInfo) if err != nil { log.Error("UpdateCredential save credential failed", zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username), zap.Error(err)) @@ -2985,17 +2972,7 @@ func (c *Core) CreateRole(ctx context.Context, in *milvuspb.CreateRoleRequest) ( } entity := in.Entity - results, err := c.MetaTable.SelectRole(util.DefaultTenant, nil, false) - if err != nil { - logger.Error("fail to select roles", zap.Error(err)) - return failStatus(commonpb.ErrorCode_CreateRoleFailure, "fail to select roles to check the role number, error: "+err.Error()), err - } - if len(results) >= Params.ProxyCfg.MaxRoleNum { - errMsg := "unable to add role because the number of roles has reached the limit" - return failStatus(commonpb.ErrorCode_CreateRoleFailure, errMsg), errors.New(errMsg) - } - - err = c.MetaTable.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: entity.Name}) + err := c.MetaTable.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: entity.Name}) if err != nil { logger.Error("fail to create role", zap.String("role_name", entity.Name), zap.Error(err)) return failStatus(commonpb.ErrorCode_CreateRoleFailure, "CreateCollection role failed: "+err.Error()), err diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 4e6d7dcb37..a907f79d7f 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -794,8 +794,9 @@ func TestRootCoordInitData(t *testing.T) { save: func(key, value string) error { return fmt.Errorf("save error") }, - remove: func(key string) error { return txnKV.Remove(key) }, - load: func(key string) (string, error) { return txnKV.Load(key) }, + remove: func(key string) error { return txnKV.Remove(key) }, + load: func(key string) (string, error) { return txnKV.Load(key) }, + loadWithPrefix: func(key string) ([]string, []string, error) { return txnKV.LoadWithPrefix(key) }, } //mt.txn = mockTxnKV mt.catalog = &rootcoord.Catalog{Txn: mockTxnKV, Snapshot: snapshotKV}