diff --git a/tests/python_client/milvus_client/test_milvus_client_rbac.py b/tests/python_client/milvus_client/test_milvus_client_rbac.py index 8c8d0d8cc9..cd20db3437 100644 --- a/tests/python_client/milvus_client/test_milvus_client_rbac.py +++ b/tests/python_client/milvus_client/test_milvus_client_rbac.py @@ -26,10 +26,7 @@ default_search_params = ct.default_search_params default_primary_key_field_name = "id" default_vector_field_name = "vector" default_float_field_name = ct.default_float_field_name -default_bool_field_name = ct.default_bool_field_name default_string_field_name = ct.default_string_field_name -default_int32_array_field_name = ct.default_int32_array_field_name -default_string_array_field_name = ct.default_string_array_field_name @pytest.mark.tags(CaseLabel.RBAC) @@ -573,6 +570,98 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base): ct.err_msg: "not found the role, maybe the role " "isn't existed or internal system error"}) + @pytest.mark.parametrize("name", [1, 1.0]) + def test_milvus_client_create_privilege_group_with_privilege_group_name_invalid_type(self, name, host, port): + """ + target: test milvus client api create privilege group with invalid name + method: create privilege group with invalid name + expected: raise exception + """ + client = self._client() + role_name = cf.gen_unique_str(role_pre) + self.create_role(client, role_name=role_name) + + error_msg = f"`privilege_group` value {name} is illegal" + self.create_privilege_group(client, privilege_group=name, check_task=CheckTasks.err_res, + check_items={ct.err_code: 1, ct.err_msg: error_msg}) + + # cleanup + self.drop_role(client, role_name=role_name) + + @pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"]) + def test_milvus_client_create_privilege_group_with_privilege_group_name_invalid_value_1(self, name, host, port): + """ + target: test milvus client api create privilege group with invalid name + method: create privilege group with invalid name + expected: raise exception + """ + client = self._client() + role_name = cf.gen_unique_str(role_pre) + self.create_role(client, role_name=role_name) + + error_msg = f"privilege group name {name} can only contain numbers, letters and underscores: invalid parameter" + self.create_privilege_group(client, privilege_group=name, check_task=CheckTasks.err_res, + check_items={ct.err_code: 1100, ct.err_msg: error_msg}) + + # cleanup + self.drop_role(client, role_name=role_name) + + @pytest.mark.parametrize("name", [1, 1.0]) + def test_milvus_client_drop_privilege_group_with_privilege_group_name_invalid_type(self, name, host, port): + """ + target: test milvus client api drop privilege group with invalid name + method: drop privilege group with invalid name + expected: raise exception + """ + client = self._client() + role_name = cf.gen_unique_str(role_pre) + self.create_role(client, role_name=role_name) + + error_msg = f"`privilege_group` value {name} is illegal" + self.drop_privilege_group(client, privilege_group=name, check_task=CheckTasks.err_res, + check_items={ct.err_code: 1, ct.err_msg: error_msg}) + + # cleanup + self.drop_role(client, role_name=role_name) + + @pytest.mark.parametrize("name", [1, 1.0]) + def test_milvus_client_add_privileges_to_group_with_privilege_group_name_invalid_type(self, name, host, port): + """ + target: test milvus client api add privilege group with invalid name + method: add privilege group with invalid name + expected: raise exception + """ + client = self._client() + role_name = cf.gen_unique_str(role_pre) + self.create_role(client, role_name=role_name) + + error_msg = f"`privilege_group` value {name} is illegal" + self.add_privileges_to_group(client, privilege_group=name, privileges=["Insert"], + check_task=CheckTasks.err_res, + check_items={ct.err_code: 1, ct.err_msg: error_msg}) + + # cleanup + self.drop_role(client, role_name=role_name) + + @pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"]) + def test_milvus_client_add_privileges_to_group_with_privilege_group_name_invalid_value(self, name, host, port): + """ + target: test milvus client api add privilege group with invalid name + method: add privilege group with invalid name + expected: raise exception + """ + client = self._client() + role_name = cf.gen_unique_str(role_pre) + self.create_role(client, role_name=role_name) + + error_msg = f"privilege group name {name} can only contain numbers, letters and underscores: invalid parameter" + self.add_privileges_to_group(client, privilege_group=name, privileges=["Insert"], + check_task=CheckTasks.err_res, + check_items={ct.err_code: 1100, ct.err_msg: error_msg}) + + # cleanup + self.drop_role(client, role_name=role_name) + @pytest.mark.tags(CaseLabel.RBAC) class TestMilvusClientRbacAdvance(TestMilvusClientV2Base): @@ -654,4 +743,642 @@ class TestMilvusClientRbacAdvance(TestMilvusClientV2Base): self.revoke_privilege(client, role_name, "Collection", "Insert", collection_name, my_db) self.drop_collection(client, collection_name) self.using_database(client, 'default') - self.drop_collection(client, collection_name) \ No newline at end of file + self.drop_collection(client, collection_name) + + def test_milvus_client_drop_role_which_bind_user(self, host, port): + """ + target: test milvus client api drop_role when role is bound to user + method: create a role, bind user to the role, drop the role + expected: drop success + """ + client = self._client() + user_name = cf.gen_unique_str(user_pre) + role_name = cf.gen_unique_str(role_pre) + password = cf.gen_str_by_length(contain_numbers=True) + + # create user and role + self.create_user(client, user_name=user_name, password=password) + self.create_role(client, role_name=role_name) + + # bind user to role + self.grant_role(client, user_name=user_name, role_name=role_name) + + # drop role that has user bound to it + self.drop_role(client, role_name=role_name) + + # verify role is dropped + roles, _ = self.list_roles(client) + assert role_name not in roles + + @pytest.mark.parametrize("role_name", ["admin", "public"]) + def test_milvus_client_add_user_to_default_role(self, role_name, host, port): + """ + target: test milvus client api add user to default role (admin or public) + method: create a user, add user to admin role or public role + expected: add success + """ + client = self._client() + user_name = cf.gen_unique_str(user_pre) + password = cf.gen_str_by_length(contain_numbers=True) + + # create user + self.create_user(client, user_name=user_name, password=password) + + # grant role to user (add user to role) + self.grant_role(client, user_name=user_name, role_name=role_name) + + # grant role to user again to test idempotency + self.grant_role(client, user_name=user_name, role_name=role_name) + + # verify user has the role + user_info, _ = self.describe_user(client, user_name=user_name) + assert user_info["user_name"] == user_name + assert role_name in user_info.get("roles", []) + + def test_milvus_client_add_root_to_new_role(self, host, port): + """ + target: test milvus client api add root to new role + method: create a new role and add root user to the role + expected: add success + """ + client = self._client() + role_name = cf.gen_unique_str(role_pre) + + # create new role + self.create_role(client, role_name=role_name) + + # add root user to the new role + self.grant_role(client, user_name=ct.default_user, role_name=role_name) + + # verify root user has the role + user_info, _ = self.describe_user(client, user_name=ct.default_user) + assert user_info["user_name"] == ct.default_user + assert role_name in user_info.get("roles", []) + + # drop the role + self.drop_role(client, role_name=role_name) + + def test_milvus_client_list_collection_grants_by_role_and_object(self, host, port): + """ + target: test milvus client api list grants by role and object + method: create a new role, grant role collection privilege, list grants by role and object + expected: list success + """ + client = self._client() + role_name = cf.gen_unique_str(role_pre) + collection_name = cf.gen_unique_str(prefix) + + # create collection + self.create_collection(client, collection_name, default_dim, consistency_level="Strong") + + # create new role + self.create_role(client, role_name=role_name) + + # grant collection privileges to role + self.grant_privilege(client, role_name, "Collection", "Search", collection_name) + self.grant_privilege(client, role_name, "Collection", "Insert", collection_name) + + # wait for privilege to take effect + time.sleep(10) + + # describe role to get privileges + role_info, _ = self.describe_role(client, role_name=role_name) + + # verify grants + privileges = role_info.get("privileges", []) + assert len(privileges) == 2 + + privilege_names = [priv["privilege"] for priv in privileges] + assert "Search" in privilege_names + assert "Insert" in privilege_names + + # verify object type and name + for priv in privileges: + assert priv["object_type"] == "Collection" + assert priv["object_name"] == collection_name + + # revoke privileges + for priv in privileges: + self.revoke_privilege(client, role_name, priv["object_type"], priv["privilege"], priv["object_name"]) + + # drop role and collection + self.drop_role(client, role_name=role_name) + self.drop_collection(client, collection_name) + + def test_milvus_client_list_global_grants_by_role_and_object(self, host, port): + """ + target: test milvus client api list grants by role and object for global privileges + method: create a new role, grant role global privilege, list grants by role and object + expected: list success + """ + client = self._client() + role_name = cf.gen_unique_str(role_pre) + + # create new role + self.create_role(client, role_name=role_name) + + # grant global privileges to role + self.grant_privilege(client, role_name, "Global", "CreateCollection", "*") + self.grant_privilege(client, role_name, "Global", "All", "*") + + # wait for privilege to take effect + time.sleep(10) + + # describe role to get privileges + role_info, _ = self.describe_role(client, role_name=role_name) + + # verify grants + privileges = role_info.get("privileges", []) + assert len(privileges) == 2 + + privilege_names = [priv["privilege"] for priv in privileges] + assert "CreateCollection" in privilege_names + assert "All" in privilege_names + + # verify object type and name + for priv in privileges: + assert priv["object_type"] == "Global" + assert priv["object_name"] == "*" + + # revoke privileges + for priv in privileges: + self.revoke_privilege(client, role_name, priv["object_type"], priv["privilege"], priv["object_name"]) + + # wait for revoke to take effect + time.sleep(10) + + # drop role + self.drop_role(client, role_name=role_name) + + def test_milvus_client_verify_admin_role_privilege(self, host, port): + """ + target: test milvus client api verify admin role privilege + method: create a new user, bind to admin role, crud collection + expected: verify success + """ + client = self._client() + user_name = cf.gen_unique_str(user_pre) + password = cf.gen_str_by_length(contain_numbers=True) + collection_name = cf.gen_unique_str(prefix) + + # create user + self.create_user(client, user_name=user_name, password=password) + # grant admin role to user + self.grant_role(client, user_name=user_name, role_name="admin") + # wait for role to take effect + time.sleep(10) + # create new client with the user credentials + uri = f"http://{host}:{port}" + user_client, _ = self.init_milvus_client(uri=uri, user=user_name, password=password) + + # test admin privileges by performing CRUD operations + # create collection + self.create_collection(user_client, collection_name, default_dim, consistency_level="Strong") + + # insert data + rng = np.random.default_rng(seed=19530) + rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]), + default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)] + self.insert(user_client, collection_name, rows) + + # create index + index_params, _ = self.prepare_index_params(user_client) + index_params.add_index(field_name=default_vector_field_name) + self.create_index(user_client, collection_name, index_params) + + # load collection + self.load_collection(user_client, collection_name) + + # verify collection has data + self.flush(user_client, collection_name) + num_entities, _ = self.get_collection_stats(user_client, collection_name) + assert num_entities['row_count'] == default_nb + + # release collection + self.release_collection(user_client, collection_name) + + # drop collection + self.drop_collection(user_client, collection_name) + + # drop user + self.drop_user(client, user_name=user_name) + + @pytest.mark.parametrize("with_db", [False, True]) + @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/44843") + def test_milvus_client_verify_grant_collection_load_privilege(self, host, port, with_db): + """ + target: test milvus client api verify grant collection load privilege + method: verify grant collection load privilege + expected: verify success + """ + client = self._client() + user_name = cf.gen_unique_str(user_pre) + password = cf.gen_str_by_length(contain_numbers=True) + collection_name = cf.gen_unique_str(prefix) + role_name = cf.gen_unique_str(role_pre) + + # create user and role + self.create_user(client, user_name=user_name, password=password) + self.create_role(client, role_name=role_name) + self.grant_role(client, user_name=user_name, role_name=role_name) + + # handle database parameter + if with_db: + db_name = cf.gen_unique_str(prefix) + self.create_database(client, db_name) + self.using_database(client, db_name) + else: + db_name = "default" + + # create collection + self.create_collection(client, collection_name, default_dim, consistency_level="Strong") + + # grant collection load privileges to role + self.grant_privilege(client, role_name, "Collection", "Load", collection_name, db_name=db_name) + self.grant_privilege(client, role_name, "Collection", "GetLoadingProgress", collection_name, db_name=db_name) + + # wait for privilege to take effect + time.sleep(10) + + # create new client with the user credentials + uri = f"http://{host}:{port}" + user_client, _ = self.init_milvus_client(uri=uri, user=user_name, password=password) + + # test load privilege + self.using_database(user_client, db_name) + self.load_collection(user_client, collection_name) + + # cleanup + self.drop_collection(client, collection_name) + if with_db: + self.using_database(client, "default") + self.drop_database(client, db_name) + + @pytest.mark.parametrize("with_db", [False, True]) + @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/44843") + def test_milvus_client_verify_grant_collection_release_privilege(self, host, port, with_db): + """ + target: test milvus client api verify grant collection release privilege + method: verify grant collection release privilege + expected: verify success + """ + client = self._client() + user_name = cf.gen_unique_str(user_pre) + password = cf.gen_str_by_length(contain_numbers=True) + collection_name = cf.gen_unique_str(prefix) + role_name = cf.gen_unique_str(role_pre) + + # create user and role + self.create_user(client, user_name=user_name, password=password) + self.create_role(client, role_name=role_name) + self.grant_role(client, user_name=user_name, role_name=role_name) + + # handle database parameter + if with_db: + db_name = cf.gen_unique_str(prefix) + self.create_database(client, db_name) + self.using_database(client, db_name) + else: + db_name = "default" + + # create collection + self.create_collection(client, collection_name, default_dim, consistency_level="Strong") + + # load collection + self.load_collection(client, collection_name) + + # grant collection release privilege to role + self.grant_privilege(client, role_name, "Collection", "Release", collection_name, db_name=db_name) + + # wait for privilege to take effect + time.sleep(10) + + # create new client with the user credentials + uri = f"http://{host}:{port}" + user_client, _ = self.init_milvus_client(uri=uri, user=user_name, password=password) + + # test release privilege + self.using_database(user_client, db_name) + self.release_collection(user_client, collection_name) + + # cleanup + self.drop_collection(client, collection_name) + if with_db: + self.using_database(client, "default") + self.drop_database(client, db_name) + + @pytest.mark.parametrize("with_db", [False, True]) + @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/44843") + def test_milvus_client_verify_grant_collection_insert_privilege(self, host, port, with_db): + """ + target: test milvus client api verify grant collection insert privilege + method: verify grant collection insert privilege + expected: verify success + """ + client = self._client() + user_name = cf.gen_unique_str(user_pre) + password = cf.gen_str_by_length(contain_numbers=True) + collection_name = cf.gen_unique_str(prefix) + role_name = cf.gen_unique_str(role_pre) + + # create user and role + self.create_user(client, user_name=user_name, password=password) + self.create_role(client, role_name=role_name) + self.grant_role(client, user_name=user_name, role_name=role_name) + + # handle database parameter + if with_db: + db_name = cf.gen_unique_str(prefix) + self.create_database(client, db_name) + self.using_database(client, db_name) + else: + db_name = "default" + + # create collection + self.create_collection(client, collection_name, default_dim, consistency_level="Strong") + + # grant collection insert privilege to role + self.grant_privilege(client, role_name, "Collection", "Insert", collection_name, db_name=db_name) + + # wait for privilege to take effect + time.sleep(10) + + # create new client with the user credentials + uri = f"http://{host}:{port}" + user_client, _ = self.init_milvus_client(uri=uri, user=user_name, password=password) + + # test insert privilege + self.using_database(user_client, db_name) + rng = np.random.default_rng(seed=19530) + rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]), + default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)] + self.insert(user_client, collection_name, rows) + + # cleanup + self.drop_collection(client, collection_name) + if with_db: + self.using_database(client, "default") + self.drop_database(client, db_name) + + @pytest.mark.parametrize("with_db", [False, True]) + @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/44843") + def test_milvus_client_verify_grant_collection_delete_privilege(self, host, port, with_db): + """ + target: test milvus client api verify grant collection delete privilege + method: verify grant collection delete privilege + expected: verify success + """ + client = self._client() + user_name = cf.gen_unique_str(user_pre) + password = cf.gen_str_by_length(contain_numbers=True) + collection_name = cf.gen_unique_str(prefix) + role_name = cf.gen_unique_str(role_pre) + + # create user and role + self.create_user(client, user_name=user_name, password=password) + self.create_role(client, role_name=role_name) + self.grant_role(client, user_name=user_name, role_name=role_name) + + # handle database parameter + if with_db: + db_name = cf.gen_unique_str(prefix) + self.create_database(client, db_name) + self.using_database(client, db_name) + else: + db_name = "default" + + # create collection + self.create_collection(client, collection_name, default_dim, consistency_level="Strong") + + # grant collection delete privilege to role + self.grant_privilege(client, role_name, "Collection", "Delete", collection_name, db_name=db_name) + + # wait for privilege to take effect + time.sleep(10) + + # create new client with the user credentials + uri = f"http://{host}:{port}" + user_client, _ = self.init_milvus_client(uri=uri, user=user_name, password=password) + + # test delete privilege + self.using_database(user_client, db_name) + delete_expr = f'{default_primary_key_field_name} == 0' + self.delete(user_client, collection_name, filter=delete_expr) + + # cleanup + self.drop_collection(client, collection_name) + if with_db: + self.using_database(client, "default") + self.drop_database(client, db_name) + + @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/44843") + def test_milvus_client_new_user_default_owns_public_role_permission(self, host, port): + """ + target: test milvus client api new user owns public role privilege + method: create a role, verify its permission + expected: verify success + """ + client = self._client() + user_name = cf.gen_unique_str(user_pre) + password = cf.gen_str_by_length(contain_numbers=True) + collection_name = cf.gen_unique_str(prefix) + collection_name_2 = cf.gen_unique_str(prefix) + role_name = cf.gen_unique_str(role_pre) + + # create user + self.create_user(client, user_name=user_name, password=password) + + # create collection + self.create_collection(client, collection_name, default_dim, consistency_level="Strong") + + # create index + index_params, _ = self.prepare_index_params(client) + index_params.add_index(field_name=default_vector_field_name) + self.create_index(client, collection_name, index_params) + + # create new client with the user credentials + uri = f"http://{host}:{port}" + user_client, _ = self.init_milvus_client(uri=uri, user=user_name, password=password) + + # wait for user to be created + time.sleep(10) + + # test collection permission deny (new user has no privileges) + self.load_collection(user_client, collection_name, check_task=CheckTasks.check_permission_deny) + self.release_collection(user_client, collection_name, check_task=CheckTasks.check_permission_deny) + + # test insert permission deny + rng = np.random.default_rng(seed=19530) + rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]), + default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)] + self.insert(user_client, collection_name, rows, check_task=CheckTasks.check_permission_deny) + + # test delete permission deny + delete_expr = f'{default_primary_key_field_name} == 0' + self.delete(user_client, collection_name, filter=delete_expr, check_task=CheckTasks.check_permission_deny) + + # test search permission deny + vectors_to_search = rng.random((1, default_dim)) + self.search(user_client, collection_name, vectors_to_search, check_task=CheckTasks.check_permission_deny) + + # test query permission deny + query_expr = f'{default_primary_key_field_name} in [0, 1]' + self.query(user_client, collection_name, filter=query_expr, check_task=CheckTasks.check_permission_deny) + + # test global permission deny + self.create_collection(user_client, collection_name_2, default_dim, consistency_level="Strong", check_task=CheckTasks.check_permission_deny) + self.drop_collection(user_client, collection_name, check_task=CheckTasks.check_permission_deny) + self.create_user(user_client, user_name=collection_name, password=password, check_task=CheckTasks.check_permission_deny) + self.create_role(user_client, role_name=role_name, check_task=CheckTasks.check_permission_deny) + self.drop_user(user_client, user_name=user_name, check_task=CheckTasks.check_permission_deny) + + # cleanup + self.drop_collection(client, collection_name) + self.drop_user(client, user_name=user_name) + + @pytest.mark.parametrize("role_name", ["admin", "public"]) + @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/44843") + def test_milvus_client_remove_user_from_default_role(self, role_name, host, port): + """ + target: test milvus client api remove user from default role (admin or public) + method: create a user, add user to admin role or public role, remove user from role + expected: remove success + """ + client = self._client() + user_name = cf.gen_unique_str(user_pre) + password = cf.gen_str_by_length(contain_numbers=True) + + # create user + self.create_user(client, user_name=user_name, password=password) + + # grant role to user (add user to role) + self.grant_role(client, user_name=user_name, role_name=role_name) + + # verify user has the role + user_info, _ = self.describe_user(client, user_name=user_name) + assert user_info["user_name"] == user_name + assert role_name in user_info.get("roles", []) + + # revoke role from user (remove user from role) + self.revoke_role(client, user_name=user_name, role_name=role_name) + + # verify user no longer has the role + user_info, _ = self.describe_user(client, user_name=user_name) + assert user_info["user_name"] == user_name + assert role_name not in user_info.get("roles", []) + + # cleanup + self.drop_user(client, user_name=user_name) + + @pytest.mark.parametrize("role_name", ["admin", "public"]) + def test_milvus_client_drop_admin_and_public_role(self, role_name, host, port): + """ + target: test milvus client api drop admin and public role fail + method: drop admin and public role fail + expected: fail to drop + """ + client = self._client() + + # verify role exists + roles, _ = self.list_roles(client) + assert role_name in roles + + # try to drop default role (should fail) + error_msg = f"the role[{role_name}] is a default role, which can't be dropped" + self.drop_role(client, role_name=role_name, check_task=CheckTasks.err_res, + check_items={ct.err_code: 1401, ct.err_msg: error_msg}) + + # verify role still exists + roles, _ = self.list_roles(client) + assert role_name in roles + + def test_milvus_client_add_user_not_exist_role(self, host, port): + """ + target: test milvus client api add user to not exist role + method: create a user, add user to not exist role + expected: fail to add + """ + client = self._client() + user_name = cf.gen_unique_str(user_pre) + password = cf.gen_str_by_length(contain_numbers=True) + role_name = cf.gen_unique_str(role_pre) + + # create user + self.create_user(client, user_name=user_name, password=password) + + # verify role does not exist + roles, _ = self.list_roles(client) + assert role_name not in roles + + # try to grant non-existent role to user (should fail) + error_msg = "not found the role, maybe the role isn't existed or internal system error" + self.grant_role(client, user_name=user_name, role_name=role_name, check_task=CheckTasks.err_res, + check_items={ct.err_code: 65535, ct.err_msg: error_msg}) + + # cleanup + self.drop_user(client, user_name=user_name) + + def test_milvus_client_remove_user_from_empty_role(self, host, port): + """ + target: test milvus client api remove not exist user from role + method: create new role, remove not exist user from unbind role + expected: fail to remove + """ + client = self._client() + user_name = cf.gen_unique_str(user_pre) + password = cf.gen_str_by_length(contain_numbers=True) + role_name = cf.gen_unique_str(role_pre) + + # create user + self.create_user(client, user_name=user_name, password=password) + + # create role + self.create_role(client, role_name=role_name) + + # verify role does not have the user + role_info, _ = self.describe_role(client, role_name=role_name) + role_users = role_info.get("users", []) + assert user_name not in role_users + + # try to revoke user from role (should fail since user is not in the role) + error_msg = "not found the role, maybe the role isn't existed or internal system error" + self.revoke_role(client, user_name=user_name, role_name=role_name) + + # verify user is still not in the role + role_info, _ = self.describe_role(client, role_name=role_name) + role_users = role_info.get("users", []) + assert user_name not in role_users + + # cleanup + self.drop_role(client, role_name=role_name) + self.drop_user(client, user_name=user_name) + + def test_milvus_client_list_grant_by_not_exist_role(self, host, port): + """ + target: test milvus client api list grants by not exist role + method: list grants by not exist role + expected: fail to list + """ + client = self._client() + role_name = cf.gen_unique_str(role_pre) + + # verify role does not exist + roles, _ = self.list_roles(client) + assert role_name not in roles + + # try to describe non-existent role (should fail) + error_msg = "not found the role, maybe the role isn't existed or internal system error" + self.describe_role(client, role_name=role_name, check_task=CheckTasks.err_res, + check_items={ct.err_code: 65535, ct.err_msg: error_msg}) + + + + + + + + + + + + diff --git a/tests/python_client/testcases/test_utility.py b/tests/python_client/testcases/test_utility.py index bcbf528d3c..ba98a26881 100644 --- a/tests/python_client/testcases/test_utility.py +++ b/tests/python_client/testcases/test_utility.py @@ -2732,6 +2732,7 @@ class TestUtilityRBAC(TestcaseBase): self.utility_wrap.role_list_grants(**db_kwargs, check_task=CheckTasks.check_permission_deny) @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") def test_drop_role_which_bind_user(self, host, port): """ target: drop role which bind user @@ -2753,6 +2754,7 @@ class TestUtilityRBAC(TestcaseBase): assert not self.utility_wrap.role_is_exist()[0] @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") @pytest.mark.parametrize("name", ["admin", "public"]) def test_add_user_to_default_role(self, name, host, port): """ @@ -2776,6 +2778,7 @@ class TestUtilityRBAC(TestcaseBase): assert user in users @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") def test_add_root_to_new_role(self, host, port): """ target: add root to new role @@ -2797,6 +2800,7 @@ class TestUtilityRBAC(TestcaseBase): self.utility_wrap.role_drop() @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") def test_list_collection_grands_by_role_and_object(self, host, port): """ target: list grants by role and object @@ -2827,6 +2831,7 @@ class TestUtilityRBAC(TestcaseBase): self.utility_wrap.role_drop() @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") def test_list_global_grants_by_role_and_object(self, host, port): """ target: list grants by role and object @@ -2855,6 +2860,7 @@ class TestUtilityRBAC(TestcaseBase): self.utility_wrap.role_drop() @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") def test_verify_admin_role_privilege(self, host, port): """ target: verify admin role privilege @@ -2885,6 +2891,7 @@ class TestUtilityRBAC(TestcaseBase): collection_w.drop() @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") @pytest.mark.parametrize("with_db", [False, True]) def test_verify_grant_collection_load_privilege(self, host, port, with_db): """ @@ -2924,6 +2931,7 @@ class TestUtilityRBAC(TestcaseBase): collection_w.load() @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") @pytest.mark.parametrize("with_db", [False, True]) def test_verify_grant_collection_release_privilege(self, host, port, with_db): """ @@ -2999,6 +3007,7 @@ class TestUtilityRBAC(TestcaseBase): collection_w.compact() @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") @pytest.mark.parametrize("with_db", [False, True]) def test_verify_grant_collection_insert_privilege(self, host, port, with_db): """ @@ -3034,6 +3043,7 @@ class TestUtilityRBAC(TestcaseBase): mutation_res, _ = collection_w.insert(data=data) @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") @pytest.mark.parametrize("with_db", [False, True]) def test_verify_grant_collection_delete_privilege(self, host, port, with_db): """ @@ -3691,6 +3701,7 @@ class TestUtilityRBAC(TestcaseBase): collection_w.drop_index() @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") def test_new_user_default_owns_public_role_permission(self, host, port): """ target: new user owns public role privilege @@ -3764,6 +3775,7 @@ class TestUtilityRBAC(TestcaseBase): self.utility_wrap.has_collection(c_name) @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") @pytest.mark.parametrize("name", ["admin", "public"]) def test_remove_user_from_default_role(self, name, host, port): """ @@ -4253,6 +4265,7 @@ class TestUtilityNegativeRbac(TestcaseBase): assert self.utility_wrap.role_is_exist()[0] @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") def test_drop_role_which_not_exist(self, host, port): """ target: drop role which not exist fail @@ -4269,6 +4282,7 @@ class TestUtilityNegativeRbac(TestcaseBase): self.utility_wrap.role_drop(check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") def test_add_user_not_exist_role(self, host, port): """ target: add user to not exist role @@ -4355,6 +4369,7 @@ class TestUtilityNegativeRbac(TestcaseBase): self.utility_wrap.role_drop() @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") def test_remove_user_from_empty_role(self, host, port): """ target: remove not exist user from role @@ -4421,6 +4436,7 @@ class TestUtilityNegativeRbac(TestcaseBase): self.utility_wrap.role_drop(check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip("removed") def test_list_grant_by_not_exist_role(self, host, port): """ target: list grants by not exist role @@ -5093,6 +5109,7 @@ class TestUtilityNegativeRbacPrivilegeGroup(TestcaseBase): @pytest.mark.tags(CaseLabel.RBAC) @pytest.mark.parametrize("name", [1, 1.0]) + @pytest.mark.skip("removed") def test_create_privilege_group_with_privilege_group_name_invalid_type(self, name, host, port): """ target: create privilege group with invalid name @@ -5108,6 +5125,7 @@ class TestUtilityNegativeRbacPrivilegeGroup(TestcaseBase): @pytest.mark.tags(CaseLabel.RBAC) @pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"]) + @pytest.mark.skip("removed") def test_create_privilege_group_with_privilege_group_name_invalid_value_1(self, name, host, port): """ target: create privilege group with invalid name @@ -5157,6 +5175,7 @@ class TestUtilityNegativeRbacPrivilegeGroup(TestcaseBase): @pytest.mark.tags(CaseLabel.RBAC) @pytest.mark.parametrize("name", [1, 1.0]) + @pytest.mark.skip("removed") def test_drop_privilege_group_with_privilege_group_name_invalid_type(self, name, host, port): """ target: drop privilege group with invalid name @@ -5291,6 +5310,7 @@ class TestUtilityNegativeRbacPrivilegeGroup(TestcaseBase): @pytest.mark.tags(CaseLabel.RBAC) @pytest.mark.parametrize("name", [1, 1.0]) + @pytest.mark.skip("removed") def test_add_privileges_to_group_with_privilege_group_name_invalid_type(self, name, host, port): """ target: add privilege group with invalid name