mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Support concurrent situations for the metastore (#18792)
Signed-off-by: SimFG <bang.fu@zilliz.com> Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
parent
9338ad45c5
commit
226bcc16eb
@ -18,6 +18,7 @@ package rootcoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
@ -77,7 +78,8 @@ type MetaTable struct {
|
||||
segID2IndexID map[typeutil.UniqueID]typeutil.UniqueID // segment_id -> index_id
|
||||
indexID2Meta map[typeutil.UniqueID]*model.Index // collection id/index_id -> meta
|
||||
|
||||
ddLock sync.RWMutex
|
||||
ddLock sync.RWMutex
|
||||
permissionLock sync.RWMutex
|
||||
}
|
||||
|
||||
// NewMetaTable creates meta table for rootcoord, which stores all in-memory information
|
||||
@ -1291,6 +1293,32 @@ func (mt *MetaTable) AddCredential(credInfo *internalpb.CredentialInfo) error {
|
||||
if credInfo.Username == "" {
|
||||
return fmt.Errorf("username is empty")
|
||||
}
|
||||
mt.permissionLock.Lock()
|
||||
defer mt.permissionLock.Unlock()
|
||||
|
||||
usernames, err := mt.catalog.ListCredentials(mt.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(usernames) >= Params.ProxyCfg.MaxUserNum {
|
||||
return errors.New("unable to add user because the number of users has reached the limit")
|
||||
}
|
||||
|
||||
if origin, _ := mt.catalog.GetCredential(mt.ctx, credInfo.Username); origin != nil {
|
||||
return fmt.Errorf("user already exists: %s", credInfo.Username)
|
||||
}
|
||||
|
||||
credential := &model.Credential{
|
||||
Username: credInfo.Username,
|
||||
EncryptedPassword: credInfo.EncryptedPassword,
|
||||
}
|
||||
return mt.catalog.CreateCredential(mt.ctx, credential)
|
||||
}
|
||||
|
||||
// UpdateCredential update credential
|
||||
func (mt *MetaTable) UpdateCredential(credInfo *internalpb.CredentialInfo) error {
|
||||
mt.permissionLock.Lock()
|
||||
defer mt.permissionLock.Unlock()
|
||||
|
||||
credential := &model.Credential{
|
||||
Username: credInfo.Username,
|
||||
@ -1307,11 +1335,17 @@ func (mt *MetaTable) getCredential(username string) (*internalpb.CredentialInfo,
|
||||
|
||||
// DeleteCredential delete credential
|
||||
func (mt *MetaTable) DeleteCredential(username string) error {
|
||||
mt.permissionLock.Lock()
|
||||
defer mt.permissionLock.Unlock()
|
||||
|
||||
return mt.catalog.DropCredential(mt.ctx, username)
|
||||
}
|
||||
|
||||
// ListCredentialUsernames list credential usernames
|
||||
func (mt *MetaTable) ListCredentialUsernames() (*milvuspb.ListCredUsersResponse, error) {
|
||||
mt.permissionLock.RLock()
|
||||
defer mt.permissionLock.RUnlock()
|
||||
|
||||
usernames, err := mt.catalog.ListCredentials(mt.ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list credential usernames err:%w", err)
|
||||
@ -1353,11 +1387,26 @@ func (mt *MetaTable) CreateRole(tenant string, entity *milvuspb.RoleEntity) erro
|
||||
if funcutil.IsEmptyString(entity.Name) {
|
||||
return fmt.Errorf("the role name in the role info is empty")
|
||||
}
|
||||
mt.permissionLock.Lock()
|
||||
defer mt.permissionLock.Unlock()
|
||||
|
||||
results, err := mt.catalog.ListRole(mt.ctx, tenant, nil, false)
|
||||
if err != nil {
|
||||
logger.Error("fail to list roles", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
if len(results) >= Params.ProxyCfg.MaxRoleNum {
|
||||
return errors.New("unable to add role because the number of roles has reached the limit")
|
||||
}
|
||||
|
||||
return mt.catalog.CreateRole(mt.ctx, tenant, entity)
|
||||
}
|
||||
|
||||
// DropRole drop role info
|
||||
func (mt *MetaTable) DropRole(tenant string, roleName string) error {
|
||||
mt.permissionLock.Lock()
|
||||
defer mt.permissionLock.Unlock()
|
||||
|
||||
return mt.catalog.DropRole(mt.ctx, tenant, roleName)
|
||||
}
|
||||
|
||||
@ -1370,6 +1419,9 @@ func (mt *MetaTable) OperateUserRole(tenant string, userEntity *milvuspb.UserEnt
|
||||
return fmt.Errorf("role name in the role entity is empty")
|
||||
}
|
||||
|
||||
mt.permissionLock.Lock()
|
||||
defer mt.permissionLock.Unlock()
|
||||
|
||||
return mt.catalog.AlterUserRole(mt.ctx, tenant, userEntity, roleEntity, operateType)
|
||||
}
|
||||
|
||||
@ -1377,6 +1429,9 @@ func (mt *MetaTable) OperateUserRole(tenant string, userEntity *milvuspb.UserEnt
|
||||
// Enter the role condition by the entity param. And this param is nil, which means selecting all roles.
|
||||
// Get all users that are added to the role by setting the includeUserInfo param to true.
|
||||
func (mt *MetaTable) SelectRole(tenant string, entity *milvuspb.RoleEntity, includeUserInfo bool) ([]*milvuspb.RoleResult, error) {
|
||||
mt.permissionLock.RLock()
|
||||
defer mt.permissionLock.RUnlock()
|
||||
|
||||
return mt.catalog.ListRole(mt.ctx, tenant, entity, includeUserInfo)
|
||||
}
|
||||
|
||||
@ -1384,6 +1439,9 @@ func (mt *MetaTable) SelectRole(tenant string, entity *milvuspb.RoleEntity, incl
|
||||
// Enter the user condition by the entity param. And this param is nil, which means selecting all users.
|
||||
// Get all roles that are added the user to by setting the includeRoleInfo param to true.
|
||||
func (mt *MetaTable) SelectUser(tenant string, entity *milvuspb.UserEntity, includeRoleInfo bool) ([]*milvuspb.UserResult, error) {
|
||||
mt.permissionLock.RLock()
|
||||
defer mt.permissionLock.RUnlock()
|
||||
|
||||
return mt.catalog.ListUser(mt.ctx, tenant, entity, includeRoleInfo)
|
||||
}
|
||||
|
||||
@ -1411,6 +1469,9 @@ func (mt *MetaTable) OperatePrivilege(tenant string, entity *milvuspb.GrantEntit
|
||||
return fmt.Errorf("the operate type in the grant entity is invalid")
|
||||
}
|
||||
|
||||
mt.permissionLock.Lock()
|
||||
defer mt.permissionLock.Unlock()
|
||||
|
||||
return mt.catalog.AlterGrant(mt.ctx, tenant, entity, operateType)
|
||||
}
|
||||
|
||||
@ -1422,13 +1483,23 @@ func (mt *MetaTable) SelectGrant(tenant string, entity *milvuspb.GrantEntity) ([
|
||||
if entity.Role == nil || funcutil.IsEmptyString(entity.Role.Name) {
|
||||
return entities, fmt.Errorf("the role entity in the grant entity is invalid")
|
||||
}
|
||||
|
||||
mt.permissionLock.RLock()
|
||||
defer mt.permissionLock.RUnlock()
|
||||
|
||||
return mt.catalog.ListGrant(mt.ctx, tenant, entity)
|
||||
}
|
||||
|
||||
func (mt *MetaTable) ListPolicy(tenant string) ([]string, error) {
|
||||
mt.permissionLock.RLock()
|
||||
defer mt.permissionLock.RUnlock()
|
||||
|
||||
return mt.catalog.ListPolicy(mt.ctx, tenant)
|
||||
}
|
||||
|
||||
func (mt *MetaTable) ListUserRole(tenant string) ([]string, error) {
|
||||
mt.permissionLock.RLock()
|
||||
defer mt.permissionLock.RUnlock()
|
||||
|
||||
return mt.catalog.ListUserRole(mt.ctx, tenant)
|
||||
}
|
||||
|
||||
@ -1057,6 +1057,12 @@ func TestMetaTable(t *testing.T) {
|
||||
wg.Add(1)
|
||||
t.Run("add credential failed", func(t *testing.T) {
|
||||
defer wg.Done()
|
||||
mockTxnKV.loadWithPrefix = func(key string) ([]string, []string, error) {
|
||||
return []string{}, []string{}, nil
|
||||
}
|
||||
mockTxnKV.load = func(key string) (string, error) {
|
||||
return "", errors.New("test error")
|
||||
}
|
||||
mockTxnKV.save = func(key, value string) error {
|
||||
return fmt.Errorf("save error")
|
||||
}
|
||||
@ -1090,6 +1096,9 @@ func TestRbacCreateRole(t *testing.T) {
|
||||
mockTxnKV.load = func(key string) (string, error) {
|
||||
return "", common.NewKeyNotExistError(key)
|
||||
}
|
||||
mockTxnKV.loadWithPrefix = func(key string) ([]string, []string, error) {
|
||||
return []string{}, []string{}, nil
|
||||
}
|
||||
err = mt.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: "role1"})
|
||||
assert.Nil(t, err)
|
||||
|
||||
@ -1105,11 +1114,32 @@ func TestRbacCreateRole(t *testing.T) {
|
||||
err = mt.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: "role1"})
|
||||
assert.Equal(t, true, common.IsIgnorableError(err))
|
||||
|
||||
mockTxnKV.load = func(key string) (string, error) {
|
||||
return "", common.NewKeyNotExistError(key)
|
||||
}
|
||||
mockTxnKV.save = func(key, value string) error {
|
||||
return fmt.Errorf("save error")
|
||||
}
|
||||
err = mt.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: "role2"})
|
||||
assert.NotNil(t, err)
|
||||
|
||||
mockTxnKV.save = func(key, value string) error {
|
||||
return nil
|
||||
}
|
||||
mockTxnKV.loadWithPrefix = func(key string) ([]string, []string, error) {
|
||||
return []string{}, []string{}, fmt.Errorf("loadWithPrefix error")
|
||||
}
|
||||
err = mt.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: "role2"})
|
||||
assert.NotNil(t, err)
|
||||
|
||||
Params.ProxyCfg.MaxRoleNum = 2
|
||||
mockTxnKV.loadWithPrefix = func(key string) ([]string, []string, error) {
|
||||
return []string{key + "/a", key + "/b"}, []string{}, nil
|
||||
}
|
||||
err = mt.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: "role2"})
|
||||
assert.NotNil(t, err)
|
||||
Params.ProxyCfg.MaxRoleNum = 10
|
||||
|
||||
}
|
||||
|
||||
func TestRbacDropRole(t *testing.T) {
|
||||
|
||||
@ -2816,21 +2816,8 @@ func (c *Core) CreateCredential(ctx context.Context, credInfo *internalpb.Creden
|
||||
log.Debug("CreateCredential", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("username", credInfo.Username))
|
||||
|
||||
usersInfo, err := c.MetaTable.ListCredentialUsernames()
|
||||
if err != nil {
|
||||
log.Error("CreateCredential get credential username list failed", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("username", credInfo.Username), zap.Error(err))
|
||||
return failStatus(commonpb.ErrorCode_CreateCredentialFailure, "CreateCredential failed, fail to get credential username list to check the user number, error: "+err.Error()), nil
|
||||
}
|
||||
if len(usersInfo.Usernames) >= Params.ProxyCfg.MaxUserNum {
|
||||
return failStatus(commonpb.ErrorCode_CreateCredentialFailure, "unable to add user because the number of users has reached the limit"), nil
|
||||
}
|
||||
|
||||
if cred, _ := c.MetaTable.getCredential(credInfo.Username); cred != nil {
|
||||
return failStatus(commonpb.ErrorCode_CreateCredentialFailure, "user already exists:"+credInfo.Username), nil
|
||||
}
|
||||
// insert to db
|
||||
err = c.MetaTable.AddCredential(credInfo)
|
||||
err := c.MetaTable.AddCredential(credInfo)
|
||||
if err != nil {
|
||||
log.Error("CreateCredential save credential failed", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("username", credInfo.Username), zap.Error(err))
|
||||
@ -2890,7 +2877,7 @@ func (c *Core) UpdateCredential(ctx context.Context, credInfo *internalpb.Creden
|
||||
log.Debug("UpdateCredential", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("username", credInfo.Username))
|
||||
// update data on storage
|
||||
err := c.MetaTable.AddCredential(credInfo)
|
||||
err := c.MetaTable.UpdateCredential(credInfo)
|
||||
if err != nil {
|
||||
log.Error("UpdateCredential save credential failed", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("username", credInfo.Username), zap.Error(err))
|
||||
@ -2985,17 +2972,7 @@ func (c *Core) CreateRole(ctx context.Context, in *milvuspb.CreateRoleRequest) (
|
||||
}
|
||||
entity := in.Entity
|
||||
|
||||
results, err := c.MetaTable.SelectRole(util.DefaultTenant, nil, false)
|
||||
if err != nil {
|
||||
logger.Error("fail to select roles", zap.Error(err))
|
||||
return failStatus(commonpb.ErrorCode_CreateRoleFailure, "fail to select roles to check the role number, error: "+err.Error()), err
|
||||
}
|
||||
if len(results) >= Params.ProxyCfg.MaxRoleNum {
|
||||
errMsg := "unable to add role because the number of roles has reached the limit"
|
||||
return failStatus(commonpb.ErrorCode_CreateRoleFailure, errMsg), errors.New(errMsg)
|
||||
}
|
||||
|
||||
err = c.MetaTable.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: entity.Name})
|
||||
err := c.MetaTable.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: entity.Name})
|
||||
if err != nil {
|
||||
logger.Error("fail to create role", zap.String("role_name", entity.Name), zap.Error(err))
|
||||
return failStatus(commonpb.ErrorCode_CreateRoleFailure, "CreateCollection role failed: "+err.Error()), err
|
||||
|
||||
@ -794,8 +794,9 @@ func TestRootCoordInitData(t *testing.T) {
|
||||
save: func(key, value string) error {
|
||||
return fmt.Errorf("save error")
|
||||
},
|
||||
remove: func(key string) error { return txnKV.Remove(key) },
|
||||
load: func(key string) (string, error) { return txnKV.Load(key) },
|
||||
remove: func(key string) error { return txnKV.Remove(key) },
|
||||
load: func(key string) (string, error) { return txnKV.Load(key) },
|
||||
loadWithPrefix: func(key string) ([]string, []string, error) { return txnKV.LoadWithPrefix(key) },
|
||||
}
|
||||
//mt.txn = mockTxnKV
|
||||
mt.catalog = &rootcoord.Catalog{Txn: mockTxnKV, Snapshot: snapshotKV}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user