test: updatec rba test cases (#44863)

Signed-off-by: nico <cheng.yuan@zilliz.com>
This commit is contained in:
nico 2025-10-17 15:14:02 +08:00 committed by GitHub
parent 4bd30a74ca
commit 9f2937fd0f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 751 additions and 4 deletions

View File

@ -26,10 +26,7 @@ default_search_params = ct.default_search_params
default_primary_key_field_name = "id"
default_vector_field_name = "vector"
default_float_field_name = ct.default_float_field_name
default_bool_field_name = ct.default_bool_field_name
default_string_field_name = ct.default_string_field_name
default_int32_array_field_name = ct.default_int32_array_field_name
default_string_array_field_name = ct.default_string_array_field_name
@pytest.mark.tags(CaseLabel.RBAC)
@ -573,6 +570,98 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
ct.err_msg: "not found the role, maybe the role "
"isn't existed or internal system error"})
@pytest.mark.parametrize("name", [1, 1.0])
def test_milvus_client_create_privilege_group_with_privilege_group_name_invalid_type(self, name, host, port):
"""
target: test milvus client api create privilege group with invalid name
method: create privilege group with invalid name
expected: raise exception
"""
client = self._client()
role_name = cf.gen_unique_str(role_pre)
self.create_role(client, role_name=role_name)
error_msg = f"`privilege_group` value {name} is illegal"
self.create_privilege_group(client, privilege_group=name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: error_msg})
# cleanup
self.drop_role(client, role_name=role_name)
@pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"])
def test_milvus_client_create_privilege_group_with_privilege_group_name_invalid_value_1(self, name, host, port):
"""
target: test milvus client api create privilege group with invalid name
method: create privilege group with invalid name
expected: raise exception
"""
client = self._client()
role_name = cf.gen_unique_str(role_pre)
self.create_role(client, role_name=role_name)
error_msg = f"privilege group name {name} can only contain numbers, letters and underscores: invalid parameter"
self.create_privilege_group(client, privilege_group=name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 1100, ct.err_msg: error_msg})
# cleanup
self.drop_role(client, role_name=role_name)
@pytest.mark.parametrize("name", [1, 1.0])
def test_milvus_client_drop_privilege_group_with_privilege_group_name_invalid_type(self, name, host, port):
"""
target: test milvus client api drop privilege group with invalid name
method: drop privilege group with invalid name
expected: raise exception
"""
client = self._client()
role_name = cf.gen_unique_str(role_pre)
self.create_role(client, role_name=role_name)
error_msg = f"`privilege_group` value {name} is illegal"
self.drop_privilege_group(client, privilege_group=name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: error_msg})
# cleanup
self.drop_role(client, role_name=role_name)
@pytest.mark.parametrize("name", [1, 1.0])
def test_milvus_client_add_privileges_to_group_with_privilege_group_name_invalid_type(self, name, host, port):
"""
target: test milvus client api add privilege group with invalid name
method: add privilege group with invalid name
expected: raise exception
"""
client = self._client()
role_name = cf.gen_unique_str(role_pre)
self.create_role(client, role_name=role_name)
error_msg = f"`privilege_group` value {name} is illegal"
self.add_privileges_to_group(client, privilege_group=name, privileges=["Insert"],
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: error_msg})
# cleanup
self.drop_role(client, role_name=role_name)
@pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"])
def test_milvus_client_add_privileges_to_group_with_privilege_group_name_invalid_value(self, name, host, port):
"""
target: test milvus client api add privilege group with invalid name
method: add privilege group with invalid name
expected: raise exception
"""
client = self._client()
role_name = cf.gen_unique_str(role_pre)
self.create_role(client, role_name=role_name)
error_msg = f"privilege group name {name} can only contain numbers, letters and underscores: invalid parameter"
self.add_privileges_to_group(client, privilege_group=name, privileges=["Insert"],
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1100, ct.err_msg: error_msg})
# cleanup
self.drop_role(client, role_name=role_name)
@pytest.mark.tags(CaseLabel.RBAC)
class TestMilvusClientRbacAdvance(TestMilvusClientV2Base):
@ -654,4 +743,642 @@ class TestMilvusClientRbacAdvance(TestMilvusClientV2Base):
self.revoke_privilege(client, role_name, "Collection", "Insert", collection_name, my_db)
self.drop_collection(client, collection_name)
self.using_database(client, 'default')
self.drop_collection(client, collection_name)
self.drop_collection(client, collection_name)
def test_milvus_client_drop_role_which_bind_user(self, host, port):
"""
target: test milvus client api drop_role when role is bound to user
method: create a role, bind user to the role, drop the role
expected: drop success
"""
client = self._client()
user_name = cf.gen_unique_str(user_pre)
role_name = cf.gen_unique_str(role_pre)
password = cf.gen_str_by_length(contain_numbers=True)
# create user and role
self.create_user(client, user_name=user_name, password=password)
self.create_role(client, role_name=role_name)
# bind user to role
self.grant_role(client, user_name=user_name, role_name=role_name)
# drop role that has user bound to it
self.drop_role(client, role_name=role_name)
# verify role is dropped
roles, _ = self.list_roles(client)
assert role_name not in roles
@pytest.mark.parametrize("role_name", ["admin", "public"])
def test_milvus_client_add_user_to_default_role(self, role_name, host, port):
"""
target: test milvus client api add user to default role (admin or public)
method: create a user, add user to admin role or public role
expected: add success
"""
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length(contain_numbers=True)
# create user
self.create_user(client, user_name=user_name, password=password)
# grant role to user (add user to role)
self.grant_role(client, user_name=user_name, role_name=role_name)
# grant role to user again to test idempotency
self.grant_role(client, user_name=user_name, role_name=role_name)
# verify user has the role
user_info, _ = self.describe_user(client, user_name=user_name)
assert user_info["user_name"] == user_name
assert role_name in user_info.get("roles", [])
def test_milvus_client_add_root_to_new_role(self, host, port):
"""
target: test milvus client api add root to new role
method: create a new role and add root user to the role
expected: add success
"""
client = self._client()
role_name = cf.gen_unique_str(role_pre)
# create new role
self.create_role(client, role_name=role_name)
# add root user to the new role
self.grant_role(client, user_name=ct.default_user, role_name=role_name)
# verify root user has the role
user_info, _ = self.describe_user(client, user_name=ct.default_user)
assert user_info["user_name"] == ct.default_user
assert role_name in user_info.get("roles", [])
# drop the role
self.drop_role(client, role_name=role_name)
def test_milvus_client_list_collection_grants_by_role_and_object(self, host, port):
"""
target: test milvus client api list grants by role and object
method: create a new role, grant role collection privilege, list grants by role and object
expected: list success
"""
client = self._client()
role_name = cf.gen_unique_str(role_pre)
collection_name = cf.gen_unique_str(prefix)
# create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
# create new role
self.create_role(client, role_name=role_name)
# grant collection privileges to role
self.grant_privilege(client, role_name, "Collection", "Search", collection_name)
self.grant_privilege(client, role_name, "Collection", "Insert", collection_name)
# wait for privilege to take effect
time.sleep(10)
# describe role to get privileges
role_info, _ = self.describe_role(client, role_name=role_name)
# verify grants
privileges = role_info.get("privileges", [])
assert len(privileges) == 2
privilege_names = [priv["privilege"] for priv in privileges]
assert "Search" in privilege_names
assert "Insert" in privilege_names
# verify object type and name
for priv in privileges:
assert priv["object_type"] == "Collection"
assert priv["object_name"] == collection_name
# revoke privileges
for priv in privileges:
self.revoke_privilege(client, role_name, priv["object_type"], priv["privilege"], priv["object_name"])
# drop role and collection
self.drop_role(client, role_name=role_name)
self.drop_collection(client, collection_name)
def test_milvus_client_list_global_grants_by_role_and_object(self, host, port):
"""
target: test milvus client api list grants by role and object for global privileges
method: create a new role, grant role global privilege, list grants by role and object
expected: list success
"""
client = self._client()
role_name = cf.gen_unique_str(role_pre)
# create new role
self.create_role(client, role_name=role_name)
# grant global privileges to role
self.grant_privilege(client, role_name, "Global", "CreateCollection", "*")
self.grant_privilege(client, role_name, "Global", "All", "*")
# wait for privilege to take effect
time.sleep(10)
# describe role to get privileges
role_info, _ = self.describe_role(client, role_name=role_name)
# verify grants
privileges = role_info.get("privileges", [])
assert len(privileges) == 2
privilege_names = [priv["privilege"] for priv in privileges]
assert "CreateCollection" in privilege_names
assert "All" in privilege_names
# verify object type and name
for priv in privileges:
assert priv["object_type"] == "Global"
assert priv["object_name"] == "*"
# revoke privileges
for priv in privileges:
self.revoke_privilege(client, role_name, priv["object_type"], priv["privilege"], priv["object_name"])
# wait for revoke to take effect
time.sleep(10)
# drop role
self.drop_role(client, role_name=role_name)
def test_milvus_client_verify_admin_role_privilege(self, host, port):
"""
target: test milvus client api verify admin role privilege
method: create a new user, bind to admin role, crud collection
expected: verify success
"""
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length(contain_numbers=True)
collection_name = cf.gen_unique_str(prefix)
# create user
self.create_user(client, user_name=user_name, password=password)
# grant admin role to user
self.grant_role(client, user_name=user_name, role_name="admin")
# wait for role to take effect
time.sleep(10)
# create new client with the user credentials
uri = f"http://{host}:{port}"
user_client, _ = self.init_milvus_client(uri=uri, user=user_name, password=password)
# test admin privileges by performing CRUD operations
# create collection
self.create_collection(user_client, collection_name, default_dim, consistency_level="Strong")
# insert data
rng = np.random.default_rng(seed=19530)
rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)]
self.insert(user_client, collection_name, rows)
# create index
index_params, _ = self.prepare_index_params(user_client)
index_params.add_index(field_name=default_vector_field_name)
self.create_index(user_client, collection_name, index_params)
# load collection
self.load_collection(user_client, collection_name)
# verify collection has data
self.flush(user_client, collection_name)
num_entities, _ = self.get_collection_stats(user_client, collection_name)
assert num_entities['row_count'] == default_nb
# release collection
self.release_collection(user_client, collection_name)
# drop collection
self.drop_collection(user_client, collection_name)
# drop user
self.drop_user(client, user_name=user_name)
@pytest.mark.parametrize("with_db", [False, True])
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/44843")
def test_milvus_client_verify_grant_collection_load_privilege(self, host, port, with_db):
"""
target: test milvus client api verify grant collection load privilege
method: verify grant collection load privilege
expected: verify success
"""
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length(contain_numbers=True)
collection_name = cf.gen_unique_str(prefix)
role_name = cf.gen_unique_str(role_pre)
# create user and role
self.create_user(client, user_name=user_name, password=password)
self.create_role(client, role_name=role_name)
self.grant_role(client, user_name=user_name, role_name=role_name)
# handle database parameter
if with_db:
db_name = cf.gen_unique_str(prefix)
self.create_database(client, db_name)
self.using_database(client, db_name)
else:
db_name = "default"
# create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
# grant collection load privileges to role
self.grant_privilege(client, role_name, "Collection", "Load", collection_name, db_name=db_name)
self.grant_privilege(client, role_name, "Collection", "GetLoadingProgress", collection_name, db_name=db_name)
# wait for privilege to take effect
time.sleep(10)
# create new client with the user credentials
uri = f"http://{host}:{port}"
user_client, _ = self.init_milvus_client(uri=uri, user=user_name, password=password)
# test load privilege
self.using_database(user_client, db_name)
self.load_collection(user_client, collection_name)
# cleanup
self.drop_collection(client, collection_name)
if with_db:
self.using_database(client, "default")
self.drop_database(client, db_name)
@pytest.mark.parametrize("with_db", [False, True])
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/44843")
def test_milvus_client_verify_grant_collection_release_privilege(self, host, port, with_db):
"""
target: test milvus client api verify grant collection release privilege
method: verify grant collection release privilege
expected: verify success
"""
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length(contain_numbers=True)
collection_name = cf.gen_unique_str(prefix)
role_name = cf.gen_unique_str(role_pre)
# create user and role
self.create_user(client, user_name=user_name, password=password)
self.create_role(client, role_name=role_name)
self.grant_role(client, user_name=user_name, role_name=role_name)
# handle database parameter
if with_db:
db_name = cf.gen_unique_str(prefix)
self.create_database(client, db_name)
self.using_database(client, db_name)
else:
db_name = "default"
# create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
# load collection
self.load_collection(client, collection_name)
# grant collection release privilege to role
self.grant_privilege(client, role_name, "Collection", "Release", collection_name, db_name=db_name)
# wait for privilege to take effect
time.sleep(10)
# create new client with the user credentials
uri = f"http://{host}:{port}"
user_client, _ = self.init_milvus_client(uri=uri, user=user_name, password=password)
# test release privilege
self.using_database(user_client, db_name)
self.release_collection(user_client, collection_name)
# cleanup
self.drop_collection(client, collection_name)
if with_db:
self.using_database(client, "default")
self.drop_database(client, db_name)
@pytest.mark.parametrize("with_db", [False, True])
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/44843")
def test_milvus_client_verify_grant_collection_insert_privilege(self, host, port, with_db):
"""
target: test milvus client api verify grant collection insert privilege
method: verify grant collection insert privilege
expected: verify success
"""
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length(contain_numbers=True)
collection_name = cf.gen_unique_str(prefix)
role_name = cf.gen_unique_str(role_pre)
# create user and role
self.create_user(client, user_name=user_name, password=password)
self.create_role(client, role_name=role_name)
self.grant_role(client, user_name=user_name, role_name=role_name)
# handle database parameter
if with_db:
db_name = cf.gen_unique_str(prefix)
self.create_database(client, db_name)
self.using_database(client, db_name)
else:
db_name = "default"
# create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
# grant collection insert privilege to role
self.grant_privilege(client, role_name, "Collection", "Insert", collection_name, db_name=db_name)
# wait for privilege to take effect
time.sleep(10)
# create new client with the user credentials
uri = f"http://{host}:{port}"
user_client, _ = self.init_milvus_client(uri=uri, user=user_name, password=password)
# test insert privilege
self.using_database(user_client, db_name)
rng = np.random.default_rng(seed=19530)
rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)]
self.insert(user_client, collection_name, rows)
# cleanup
self.drop_collection(client, collection_name)
if with_db:
self.using_database(client, "default")
self.drop_database(client, db_name)
@pytest.mark.parametrize("with_db", [False, True])
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/44843")
def test_milvus_client_verify_grant_collection_delete_privilege(self, host, port, with_db):
"""
target: test milvus client api verify grant collection delete privilege
method: verify grant collection delete privilege
expected: verify success
"""
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length(contain_numbers=True)
collection_name = cf.gen_unique_str(prefix)
role_name = cf.gen_unique_str(role_pre)
# create user and role
self.create_user(client, user_name=user_name, password=password)
self.create_role(client, role_name=role_name)
self.grant_role(client, user_name=user_name, role_name=role_name)
# handle database parameter
if with_db:
db_name = cf.gen_unique_str(prefix)
self.create_database(client, db_name)
self.using_database(client, db_name)
else:
db_name = "default"
# create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
# grant collection delete privilege to role
self.grant_privilege(client, role_name, "Collection", "Delete", collection_name, db_name=db_name)
# wait for privilege to take effect
time.sleep(10)
# create new client with the user credentials
uri = f"http://{host}:{port}"
user_client, _ = self.init_milvus_client(uri=uri, user=user_name, password=password)
# test delete privilege
self.using_database(user_client, db_name)
delete_expr = f'{default_primary_key_field_name} == 0'
self.delete(user_client, collection_name, filter=delete_expr)
# cleanup
self.drop_collection(client, collection_name)
if with_db:
self.using_database(client, "default")
self.drop_database(client, db_name)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/44843")
def test_milvus_client_new_user_default_owns_public_role_permission(self, host, port):
"""
target: test milvus client api new user owns public role privilege
method: create a role, verify its permission
expected: verify success
"""
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length(contain_numbers=True)
collection_name = cf.gen_unique_str(prefix)
collection_name_2 = cf.gen_unique_str(prefix)
role_name = cf.gen_unique_str(role_pre)
# create user
self.create_user(client, user_name=user_name, password=password)
# create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
# create index
index_params, _ = self.prepare_index_params(client)
index_params.add_index(field_name=default_vector_field_name)
self.create_index(client, collection_name, index_params)
# create new client with the user credentials
uri = f"http://{host}:{port}"
user_client, _ = self.init_milvus_client(uri=uri, user=user_name, password=password)
# wait for user to be created
time.sleep(10)
# test collection permission deny (new user has no privileges)
self.load_collection(user_client, collection_name, check_task=CheckTasks.check_permission_deny)
self.release_collection(user_client, collection_name, check_task=CheckTasks.check_permission_deny)
# test insert permission deny
rng = np.random.default_rng(seed=19530)
rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)]
self.insert(user_client, collection_name, rows, check_task=CheckTasks.check_permission_deny)
# test delete permission deny
delete_expr = f'{default_primary_key_field_name} == 0'
self.delete(user_client, collection_name, filter=delete_expr, check_task=CheckTasks.check_permission_deny)
# test search permission deny
vectors_to_search = rng.random((1, default_dim))
self.search(user_client, collection_name, vectors_to_search, check_task=CheckTasks.check_permission_deny)
# test query permission deny
query_expr = f'{default_primary_key_field_name} in [0, 1]'
self.query(user_client, collection_name, filter=query_expr, check_task=CheckTasks.check_permission_deny)
# test global permission deny
self.create_collection(user_client, collection_name_2, default_dim, consistency_level="Strong", check_task=CheckTasks.check_permission_deny)
self.drop_collection(user_client, collection_name, check_task=CheckTasks.check_permission_deny)
self.create_user(user_client, user_name=collection_name, password=password, check_task=CheckTasks.check_permission_deny)
self.create_role(user_client, role_name=role_name, check_task=CheckTasks.check_permission_deny)
self.drop_user(user_client, user_name=user_name, check_task=CheckTasks.check_permission_deny)
# cleanup
self.drop_collection(client, collection_name)
self.drop_user(client, user_name=user_name)
@pytest.mark.parametrize("role_name", ["admin", "public"])
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/44843")
def test_milvus_client_remove_user_from_default_role(self, role_name, host, port):
"""
target: test milvus client api remove user from default role (admin or public)
method: create a user, add user to admin role or public role, remove user from role
expected: remove success
"""
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length(contain_numbers=True)
# create user
self.create_user(client, user_name=user_name, password=password)
# grant role to user (add user to role)
self.grant_role(client, user_name=user_name, role_name=role_name)
# verify user has the role
user_info, _ = self.describe_user(client, user_name=user_name)
assert user_info["user_name"] == user_name
assert role_name in user_info.get("roles", [])
# revoke role from user (remove user from role)
self.revoke_role(client, user_name=user_name, role_name=role_name)
# verify user no longer has the role
user_info, _ = self.describe_user(client, user_name=user_name)
assert user_info["user_name"] == user_name
assert role_name not in user_info.get("roles", [])
# cleanup
self.drop_user(client, user_name=user_name)
@pytest.mark.parametrize("role_name", ["admin", "public"])
def test_milvus_client_drop_admin_and_public_role(self, role_name, host, port):
"""
target: test milvus client api drop admin and public role fail
method: drop admin and public role fail
expected: fail to drop
"""
client = self._client()
# verify role exists
roles, _ = self.list_roles(client)
assert role_name in roles
# try to drop default role (should fail)
error_msg = f"the role[{role_name}] is a default role, which can't be dropped"
self.drop_role(client, role_name=role_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 1401, ct.err_msg: error_msg})
# verify role still exists
roles, _ = self.list_roles(client)
assert role_name in roles
def test_milvus_client_add_user_not_exist_role(self, host, port):
"""
target: test milvus client api add user to not exist role
method: create a user, add user to not exist role
expected: fail to add
"""
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length(contain_numbers=True)
role_name = cf.gen_unique_str(role_pre)
# create user
self.create_user(client, user_name=user_name, password=password)
# verify role does not exist
roles, _ = self.list_roles(client)
assert role_name not in roles
# try to grant non-existent role to user (should fail)
error_msg = "not found the role, maybe the role isn't existed or internal system error"
self.grant_role(client, user_name=user_name, role_name=role_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 65535, ct.err_msg: error_msg})
# cleanup
self.drop_user(client, user_name=user_name)
def test_milvus_client_remove_user_from_empty_role(self, host, port):
"""
target: test milvus client api remove not exist user from role
method: create new role, remove not exist user from unbind role
expected: fail to remove
"""
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length(contain_numbers=True)
role_name = cf.gen_unique_str(role_pre)
# create user
self.create_user(client, user_name=user_name, password=password)
# create role
self.create_role(client, role_name=role_name)
# verify role does not have the user
role_info, _ = self.describe_role(client, role_name=role_name)
role_users = role_info.get("users", [])
assert user_name not in role_users
# try to revoke user from role (should fail since user is not in the role)
error_msg = "not found the role, maybe the role isn't existed or internal system error"
self.revoke_role(client, user_name=user_name, role_name=role_name)
# verify user is still not in the role
role_info, _ = self.describe_role(client, role_name=role_name)
role_users = role_info.get("users", [])
assert user_name not in role_users
# cleanup
self.drop_role(client, role_name=role_name)
self.drop_user(client, user_name=user_name)
def test_milvus_client_list_grant_by_not_exist_role(self, host, port):
"""
target: test milvus client api list grants by not exist role
method: list grants by not exist role
expected: fail to list
"""
client = self._client()
role_name = cf.gen_unique_str(role_pre)
# verify role does not exist
roles, _ = self.list_roles(client)
assert role_name not in roles
# try to describe non-existent role (should fail)
error_msg = "not found the role, maybe the role isn't existed or internal system error"
self.describe_role(client, role_name=role_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 65535, ct.err_msg: error_msg})

View File

@ -2732,6 +2732,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.role_list_grants(**db_kwargs, check_task=CheckTasks.check_permission_deny)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
def test_drop_role_which_bind_user(self, host, port):
"""
target: drop role which bind user
@ -2753,6 +2754,7 @@ class TestUtilityRBAC(TestcaseBase):
assert not self.utility_wrap.role_is_exist()[0]
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
@pytest.mark.parametrize("name", ["admin", "public"])
def test_add_user_to_default_role(self, name, host, port):
"""
@ -2776,6 +2778,7 @@ class TestUtilityRBAC(TestcaseBase):
assert user in users
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
def test_add_root_to_new_role(self, host, port):
"""
target: add root to new role
@ -2797,6 +2800,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
def test_list_collection_grands_by_role_and_object(self, host, port):
"""
target: list grants by role and object
@ -2827,6 +2831,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
def test_list_global_grants_by_role_and_object(self, host, port):
"""
target: list grants by role and object
@ -2855,6 +2860,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
def test_verify_admin_role_privilege(self, host, port):
"""
target: verify admin role privilege
@ -2885,6 +2891,7 @@ class TestUtilityRBAC(TestcaseBase):
collection_w.drop()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_grant_collection_load_privilege(self, host, port, with_db):
"""
@ -2924,6 +2931,7 @@ class TestUtilityRBAC(TestcaseBase):
collection_w.load()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_grant_collection_release_privilege(self, host, port, with_db):
"""
@ -2999,6 +3007,7 @@ class TestUtilityRBAC(TestcaseBase):
collection_w.compact()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_grant_collection_insert_privilege(self, host, port, with_db):
"""
@ -3034,6 +3043,7 @@ class TestUtilityRBAC(TestcaseBase):
mutation_res, _ = collection_w.insert(data=data)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_grant_collection_delete_privilege(self, host, port, with_db):
"""
@ -3691,6 +3701,7 @@ class TestUtilityRBAC(TestcaseBase):
collection_w.drop_index()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
def test_new_user_default_owns_public_role_permission(self, host, port):
"""
target: new user owns public role privilege
@ -3764,6 +3775,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.has_collection(c_name)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
@pytest.mark.parametrize("name", ["admin", "public"])
def test_remove_user_from_default_role(self, name, host, port):
"""
@ -4253,6 +4265,7 @@ class TestUtilityNegativeRbac(TestcaseBase):
assert self.utility_wrap.role_is_exist()[0]
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
def test_drop_role_which_not_exist(self, host, port):
"""
target: drop role which not exist fail
@ -4269,6 +4282,7 @@ class TestUtilityNegativeRbac(TestcaseBase):
self.utility_wrap.role_drop(check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
def test_add_user_not_exist_role(self, host, port):
"""
target: add user to not exist role
@ -4355,6 +4369,7 @@ class TestUtilityNegativeRbac(TestcaseBase):
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
def test_remove_user_from_empty_role(self, host, port):
"""
target: remove not exist user from role
@ -4421,6 +4436,7 @@ class TestUtilityNegativeRbac(TestcaseBase):
self.utility_wrap.role_drop(check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("removed")
def test_list_grant_by_not_exist_role(self, host, port):
"""
target: list grants by not exist role
@ -5093,6 +5109,7 @@ class TestUtilityNegativeRbacPrivilegeGroup(TestcaseBase):
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", [1, 1.0])
@pytest.mark.skip("removed")
def test_create_privilege_group_with_privilege_group_name_invalid_type(self, name, host, port):
"""
target: create privilege group with invalid name
@ -5108,6 +5125,7 @@ class TestUtilityNegativeRbacPrivilegeGroup(TestcaseBase):
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"])
@pytest.mark.skip("removed")
def test_create_privilege_group_with_privilege_group_name_invalid_value_1(self, name, host, port):
"""
target: create privilege group with invalid name
@ -5157,6 +5175,7 @@ class TestUtilityNegativeRbacPrivilegeGroup(TestcaseBase):
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", [1, 1.0])
@pytest.mark.skip("removed")
def test_drop_privilege_group_with_privilege_group_name_invalid_type(self, name, host, port):
"""
target: drop privilege group with invalid name
@ -5291,6 +5310,7 @@ class TestUtilityNegativeRbacPrivilegeGroup(TestcaseBase):
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", [1, 1.0])
@pytest.mark.skip("removed")
def test_add_privileges_to_group_with_privilege_group_name_invalid_type(self, name, host, port):
"""
target: add privilege group with invalid name