From 42e60f591e617d67ea5c1c858d6565dcd92a893f Mon Sep 17 00:00:00 2001
From: ThreadDao
Date: Tue, 21 Feb 2023 09:50:27 +0800
Subject: [PATCH] Add chaos case to test resource group (#22308)

Signed-off-by: ThreadDao
---
 .../test_all_collections_after_chaos.py       |   6 +-
 .../testcases/test_chaos_resource_group.py    | 185 ++++++++++++++++++
 .../chaos/testcases/test_data_persistence.py  |   5 +-
 3 files changed, 191 insertions(+), 5 deletions(-)
 create mode 100644 tests/python_client/chaos/testcases/test_chaos_resource_group.py

diff --git a/tests/python_client/chaos/testcases/test_all_collections_after_chaos.py b/tests/python_client/chaos/testcases/test_all_collections_after_chaos.py
index 236ad95fa6..5c1bcf9101 100644
--- a/tests/python_client/chaos/testcases/test_all_collections_after_chaos.py
+++ b/tests/python_client/chaos/testcases/test_all_collections_after_chaos.py
@@ -8,8 +8,10 @@ from common.common_type import CaseLabel
 from utils.util_log import test_log as log
 from utils.util_common import get_collections
 
+
 class TestAllCollection(TestcaseBase):
     """ Test case of end to end"""
+
     @pytest.fixture(scope="function", params=get_collections())
     def collection_name(self, request):
         if request.param == [] or request.param == "":
@@ -22,7 +24,6 @@ class TestAllCollection(TestcaseBase):
                  method.__name__)
         log.info("skip drop collection")
 
-
     @pytest.mark.tags(CaseLabel.L1)
     def test_milvus_default(self, collection_name):
         # create
@@ -67,7 +68,7 @@ class TestAllCollection(TestcaseBase):
                                                  index_name=cf.gen_unique_str())
         tt = time.time() - t0
         log.info(f"assert index: {tt}")
-
+
         # show index infos
         index_infos = [index.to_dict() for index in collection_w.indexes]
         log.info(f"index info: {index_infos}")
@@ -117,4 +118,3 @@ class TestAllCollection(TestcaseBase):
         tt = time.time() - t0
         log.info(f"assert query result {len(res)}: {tt}")
         assert len(res) >= 4
-
diff --git a/tests/python_client/chaos/testcases/test_chaos_resource_group.py b/tests/python_client/chaos/testcases/test_chaos_resource_group.py
new file mode 100644
index 0000000000..0b2749af7e
--- /dev/null
+++ b/tests/python_client/chaos/testcases/test_chaos_resource_group.py
@@ -0,0 +1,185 @@
+import time
+import pytest
+
+from base.client_base import TestcaseBase
+from common import common_func as cf
+from common import common_type as ct
+from common.common_type import CaseLabel
+from utils.util_log import test_log as log
+
+# custom resource groups
+rg_name_0 = "RG_0"
+rg_name_1 = "RG_1"
+
+# collection names
+coll_name_1 = "ResourceGroup_111"
+coll_name_2 = "ResourceGroup_222"
+
+# expected resource group info for a cluster with 4 query nodes
+resource_group_info = [
+    {"name": rg_name_0, "available_node": 1, "capacity": 1, "loaded_replica": {coll_name_1: 1}},
+    {"name": rg_name_1, "available_node": 1, "capacity": 1, "loaded_replica": {coll_name_1: 1}},
+    {"name": ct.default_resource_group_name, "available_node": 2,
+     "capacity": ct.default_resource_group_capacity, "loaded_replica": {coll_name_2: 2}}
+]
+
+
+class TestChaosRG(TestcaseBase):
+    """ Test case of end to end"""
+
+    def teardown_method(self, method):
+        log.info(("*" * 35) + " teardown " + ("*" * 35))
+        log.info("[teardown_method] Start teardown test case %s..." %
+                 method.__name__)
+        log.info("skip drop collection")
+
+    @pytest.mark.tags(CaseLabel.L3)
+    def test_milvus_resource_group(self):
+        nb = 10000
+        # collection rg map
+        collection_rg_map = {
+            coll_name_1: {"resource_groups": [rg_name_0, rg_name_1], "replica_number": 2},
+            coll_name_2: {"resource_groups": [ct.default_resource_group_name], "replica_number": 2}
+        }
+
+        self._connect()
+        # create RG_0, RG_1, transfer 1 node to RG_0, 1 node to RG_1
+
+        for rg_info in resource_group_info:
+            rg_name = rg_info["name"]
+            if rg_name != ct.default_resource_group_name:
+                _, create_rg_res = self.utility_wrap.create_resource_group(rg_name)
+                assert create_rg_res
+                log.info(f"[ResourceGroup] Create rg {rg_name} done")
+                self.utility_wrap.transfer_node(source=ct.default_resource_group_name, target=rg_name,
+                                                num_node=rg_info["available_node"])
+                log.info(f'[ResourceGroup] Transfer {rg_info["available_node"]} nodes from {ct.default_resource_group_name} to {rg_name} done')
+
+        # verify RGs
+        resource_groups, _ = self.utility_wrap.list_resource_groups()
+        assert len(resource_groups) == len(resource_group_info)
+        assert all([rg_info["name"] in resource_groups for rg_info in resource_group_info])
+        for rg_info in resource_group_info:
+            rg_info = {"name": rg_info["name"],
+                       "capacity": rg_info["capacity"],
+                       "num_available_node": rg_info["available_node"],
+                       "num_loaded_replica": {},
+                       "num_outgoing_node": {},
+                       "num_incoming_node": {}
+                       }
+            desc_rg_info, _ = self.utility_wrap.describe_resource_group(name=rg_info["name"],
+                                                                        check_task=ct.CheckTasks.check_rg_property,
+                                                                        check_items=rg_info)
+            log.info(f'[ResourceGroup] Rg of {rg_info["name"]} info is: {desc_rg_info}')
+
+        # prepare collections coll_name_1 and coll_name_2
+        # generate insert data and index params
+        data = cf.gen_default_dataframe_data(nb=nb)
+        index_params = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
+
+        for coll_name in coll_name_1, coll_name_2:
+            # create
+            collection_w = self.init_collection_wrap(name=coll_name, active_trace=True)
+            log.info(f"create collection {collection_w.name} done")
+            entities = collection_w.num_entities
+
+            # insert
+            _, res = collection_w.insert(data)
+            assert res
+            log.info(f"insert {nb} entities done")
+
+            # flush
+            _, check_result = collection_w.flush(timeout=180)
+            assert check_result
+            assert collection_w.num_entities == nb + entities
+            entities = collection_w.num_entities
+            log.info(f"flush done with entities: {entities}")
+
+            # index
+            index, _ = collection_w.create_index(field_name=ct.default_float_vec_field_name,
+                                                 index_params=index_params,
+                                                 index_name=cf.gen_unique_str())
+            index, _ = collection_w.create_index(field_name=ct.default_string_field_name,
+                                                 index_params={},
+                                                 index_name=cf.gen_unique_str())
+            index_infos = [index.to_dict() for index in collection_w.indexes]
+            log.info(f"index info: {index_infos}")
+
+            # load coll_name_1 with 2 replicas -> RG_0, RG_1
+            # load coll_name_2 with 2 replicas -> default RG
+            collection_w.load(replica_number=collection_rg_map[coll_name]["replica_number"],
+                              _resource_groups=collection_rg_map[coll_name]["resource_groups"])
+
+            # show query segment info
+            segment_info, _ = self.utility_wrap.get_query_segment_info(collection_w.name)
+            log.info(f"{collection_w.name} segment info: {segment_info}")
+
+            # show replicas info
+            replicas, _ = collection_w.get_replicas()
+            log.info(f"{collection_w.name} replica info: {replicas}")
+
+            # search
+            search_vectors = cf.gen_vectors(ct.default_nq, ct.default_dim)
+            search_params = {"metric_type": "L2", "params": {"ef": 64}}
+            search_res, _ = collection_w.search(data=search_vectors,
+                                                anns_field=ct.default_float_vec_field_name,
+                                                param=search_params, limit=ct.default_limit, expr="int64 >= 0")
+            assert len(search_res) == ct.default_nq
+            assert len(search_res[0]) == ct.default_limit
+
+            # query and delete
+            term_expr = f'{ct.default_int64_field_name} < 100'
+            query_res, _ = collection_w.query(term_expr)
+            assert len(query_res) == 100
+
+            delete_expr = f'{ct.default_int64_field_name} in {[i for i in range(100)]}'
+            collection_w.delete(delete_expr)
+            collection_w.query(term_expr, check_task=ct.CheckTasks.check_query_empty)
+
+        # verify rg replica info
+        for rg_info in resource_group_info:
+            rg_info = {"name": rg_info["name"],
+                       "capacity": rg_info["capacity"],
+                       "num_available_node": rg_info["available_node"],
+                       "num_loaded_replica": rg_info["loaded_replica"],
+                       "num_outgoing_node": {},
+                       "num_incoming_node": {}
+                       }
+            desc_rg_info_2, _ = self.utility_wrap.describe_resource_group(name=rg_info["name"],
+                                                                          check_task=ct.CheckTasks.check_rg_property,
+                                                                          check_items=rg_info)
+            log.info(f'[ResourceGroup] Rg of {rg_info["name"]} info is: {desc_rg_info_2}')
+
+    @pytest.mark.tags(CaseLabel.L3)
+    def test_verify_milvus_resource_group(self):
+        self._connect()
+
+        # verify collection exist
+        all_collections, _ = self.utility_wrap.list_collections()
+        assert all(coll_name in all_collections for coll_name in [coll_name_1, coll_name_2])
+
+        # verify resource groups
+        for rg_info in resource_group_info:
+            rg_info = {"name": rg_info["name"],
+                       "capacity": rg_info["capacity"],
+                       "num_available_node": rg_info["available_node"],
+                       "num_loaded_replica": rg_info["loaded_replica"],
+                       "num_outgoing_node": {},
+                       "num_incoming_node": {}
+                       }
+            desc_rg_info, _ = self.utility_wrap.describe_resource_group(name=rg_info["name"],
+                                                                        check_task=ct.CheckTasks.check_rg_property,
+                                                                        check_items=rg_info)
+            log.info(f'[ResourceGroup] Rg of {rg_info["name"]} info is: {desc_rg_info}')
+
+        # search
+        for coll_name in coll_name_1, coll_name_2:
+            collection_w = self.init_collection_wrap(name=coll_name, active_trace=True)
+            for i in range(10):
+                search_vectors = cf.gen_vectors(ct.default_nq, ct.default_dim)
+                search_params = {"metric_type": "L2", "params": {"ef": 64}}
+                search_res, _ = collection_w.search(data=search_vectors,
+                                                    anns_field=ct.default_float_vec_field_name,
+                                                    param=search_params, limit=ct.default_limit, expr="int64 >= 0")
+                assert len(search_res) == ct.default_nq
+                assert len(search_res[0]) == ct.default_limit
\ No newline at end of file
diff --git a/tests/python_client/chaos/testcases/test_data_persistence.py b/tests/python_client/chaos/testcases/test_data_persistence.py
index 2d18f0a2db..47acfaf5ee 100644
--- a/tests/python_client/chaos/testcases/test_data_persistence.py
+++ b/tests/python_client/chaos/testcases/test_data_persistence.py
@@ -8,8 +8,9 @@ from common.common_type import CaseLabel
 from utils.util_log import test_log as log
 
 
-class TestDataPersistence(TestcaseBase):
+class TestDataPersistence(TestcaseBase):
     """ Test case of end to end"""
+
    def teardown_method(self, method):
         log.info(("*" * 35) + " teardown " + ("*" * 35))
         log.info("[teardown_method] Start teardown test case %s..." %
@@ -100,4 +101,4 @@ class TestDataPersistence(TestcaseBase):
         t0 = time.time()
         res, _ = collection_w.query(term_expr)
         tt = time.time() - t0
-        log.info(f"assert query result {len(res)}: {tt}")
+        log.info(f"assert query result {len(res)}: {tt}")
\ No newline at end of file