import threading
import pytest
import time
from time import sleep
from pathlib import Path
from pymilvus import connections
from common.cus_resource_opts import CustomResourceOperations as CusResource
from common.milvus_sys import MilvusSys
import logging as log
from utils.util_k8s import wait_pods_ready, get_milvus_instance_name, get_milvus_deploy_tool, find_activate_standby_coord_pod
from utils.util_common import update_key_value, update_key_name, gen_experiment_config
import constants


class TestChaosApply:
    """Apply a chaos experiment to the activated and/or standby coordinator pods.

    The test resolves which pods of ``target_component`` are activated or
    standby, injects the chaos object against the selected pods, removes it
    after ``chaos_duration`` and finally checks which pod is activated.
    """

    @pytest.fixture(scope="function", autouse=True)
    def init_env(self, host, port, user, password, milvus_ns):
        """Connect to Milvus and cache the deployment facts the test needs."""
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        # Set before anything can fail so that teardown can tell
        # "no chaos object was created" apart from a real config.
        self.chaos_config = None
        self.reconnect()
        self.milvus_sys = MilvusSys(alias='default')
        self.chaos_ns = constants.CHAOS_NAMESPACE
        self.milvus_ns = milvus_ns
        self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
        self.deploy_by = get_milvus_deploy_tool(self.milvus_ns, self.milvus_sys)

    def reconnect(self):
        """(Re)open the ``default`` connection; raise if it is not registered afterwards."""
        if self.user and self.password:
            connections.connect('default', host=self.host, port=self.port,
                                user=self.user,
                                password=self.password,
                                secure=True)
        else:
            connections.connect('default', host=self.host, port=self.port)
        if connections.has_connection("default") is False:
            raise Exception("no connections")

    def teardown(self):
        """Best-effort removal of the chaos object created by the test, if any."""
        chaos_config = getattr(self, "chaos_config", None)
        if not chaos_config:
            # The test failed before building a chaos config: nothing to delete.
            return
        meta_name = (chaos_config.get('metadata') or {}).get('name')
        if meta_name is None:
            return
        chaos_res = CusResource(kind=chaos_config['kind'],
                                group=constants.CHAOS_GROUP,
                                version=constants.CHAOS_VERSION,
                                namespace=constants.CHAOS_NAMESPACE)
        chaos_res.delete(meta_name, raise_ex=False)
        sleep(2)

    def test_chaos_apply(self, chaos_type, role_type, target_component, chaos_duration, chaos_interval):
        """Inject chaos into the coordinator pods selected by ``role_type`` and verify the outcome.

        ``role_type`` of ``"standby"`` or ``"activated"`` targets only those
        pods; any other value targets both groups.
        """
        log.info("*********************Chaos Test Start**********************")
        log.info(connections.get_connection_addr('default'))

        activate_pod_list, standby_pod_list = find_activate_standby_coord_pod(self.milvus_ns, self.release_name,
                                                                              target_component)
        log.info(f"activated pod list: {activate_pod_list}, standby pod list: {standby_pod_list}")
        if role_type == "standby":
            target_pod_list = standby_pod_list
        elif role_type == "activated":
            target_pod_list = activate_pod_list
        else:
            target_pod_list = activate_pod_list + standby_pod_list
        # Fail fast instead of creating a chaos object that selects nothing.
        assert target_pod_list, f"no {role_type} pod found for {target_component}"

        chaos_type = chaos_type.replace("_", "-")
        chaos_config = gen_experiment_config(f"{str(Path(__file__).absolute().parent)}/"
                                             f"chaos_objects/template/{chaos_type}-by-pod-list.yaml")
        chaos_config['metadata']['name'] = f"test-{target_component}-standby-{int(time.time())}"
        meta_name = chaos_config['metadata']['name']
        # NOTE(review): the selector key is the literal namespace 'chaos-testing';
        # confirm whether it should follow self.milvus_ns when Milvus runs elsewhere.
        chaos_config['spec']['selector']['pods']['chaos-testing'] = target_pod_list
        self.chaos_config = chaos_config
        # Convert e.g. "1h30m10s" into seconds by rewriting it as an arithmetic expression.
        # NOTE(review): eval() runs on a command-line supplied string; acceptable only
        # while the option is controlled by the test operator.
        chaos_duration = chaos_duration.replace('h', '*3600+').replace('m', '*60+').replace('s', '*1+') + '+0'
        chaos_duration = eval(chaos_duration)
        if self.deploy_by == "milvus-operator":
            update_key_name(chaos_config, "component", "app.kubernetes.io/component")
        self._chaos_config = chaos_config  # cache the chaos config for tear down
        log.info(f"chaos_config: {chaos_config}")

        # apply chaos object
        chaos_res = CusResource(kind=chaos_config['kind'],
                                group=constants.CHAOS_GROUP,
                                version=constants.CHAOS_VERSION,
                                namespace=constants.CHAOS_NAMESPACE)
        chaos_res.create(chaos_config)
        log.info("chaos injected")
        res = chaos_res.list_all()
        chaos_list = [r['metadata']['name'] for r in res['items']]
        assert meta_name in chaos_list
        res = chaos_res.get(meta_name)
        log.info(f"chaos inject result: {res['kind']}, {res['metadata']['name']}")
        sleep(chaos_duration)

        # delete chaos, then verify it is gone after a short grace period
        chaos_res.delete(meta_name)
        log.info("chaos deleted")
        sleep(10)
        res = chaos_res.list_all()
        chaos_list = [r['metadata']['name'] for r in res['items']]
        assert meta_name not in chaos_list

        # Wait for the Milvus pods of this release. The labels carry the release
        # name; the chaos object name (meta_name) never labels a Milvus pod.
        t0 = time.time()
        for label in (f"app.kubernetes.io/instance={self.release_name}", f"release={self.release_name}"):
            log.info(f"wait for pods in namespace {self.milvus_ns} with label {label}")
            wait_pods_ready(self.milvus_ns, label)
        log.info("all pods are ready")
        pods_ready_time = time.time() - t0
        log.info(f"pods ready time: {pods_ready_time}")

        # reconnect to test the service healthy; retry for up to 120 seconds
        start_time = time.time()
        end_time = start_time + 120
        while time.time() < end_time:
            try:
                self.reconnect()
                break
            except Exception as e:
                log.error(e)
                sleep(2)
        recovery_time = time.time() - start_time
        log.info(f"recovery time: {recovery_time}")
        time.sleep(30)

        activate_pod_list_after_chaos, standby_pod_list_after_chaos = find_activate_standby_coord_pod(
            self.milvus_ns, self.release_name, target_component)
        log.info(f"activated pod list: {activate_pod_list_after_chaos}, "
                 f"standby pod list: {standby_pod_list_after_chaos}")
        if role_type in ("standby", "activated"):
            assert activate_pod_list_after_chaos, f"no activated {target_component} pod found after chaos"
        if role_type == "standby":
            # if the standby pod is injected, the activated pod should not be changed
            assert activate_pod_list_after_chaos[0] == activate_pod_list[0]
        if role_type == "activated":
            # if the activated pod is injected, one of the standby pods should become activated
            assert activate_pod_list_after_chaos[0] in standby_pod_list
        log.info("*********************Chaos Test Completed**********************")
@pytest.mark.tags(CaseLabel.L1) def test_milvus_default(self, collection_name): + self._connect() # create name = collection_name if collection_name else cf.gen_unique_str("Checker_") t0 = time.time() - collection_w = self.init_collection_wrap(name=name, active_trace=True, enable_dynamic_field=False, - with_json=False) + schema = Collection(name=name).schema + collection_w = self.init_collection_wrap(name=name, schema=schema) tt = time.time() - t0 assert collection_w.name == name # compact collection before getting num_entities @@ -43,7 +44,11 @@ class TestAllCollection(TestcaseBase): # insert insert_batch = 3000 - data = cf.gen_default_list_data(start=-insert_batch, with_json=False) + with_json = False + for field in collection_w.schema.fields: + if field.dtype.name == "JSON": + with_json = True + data = cf.gen_default_list_data(start=-insert_batch, with_json=with_json) t0 = time.time() _, res = collection_w.insert(data) tt = time.time() - t0 @@ -89,7 +94,7 @@ class TestAllCollection(TestcaseBase): log.info(f"assert search: {tt}") assert len(res_1) == 1 # query - term_expr = f'{ct.default_int64_field_name} in {[i for i in range(-insert_batch,0)]}' + term_expr = f'{ct.default_int64_field_name} in {[i for i in range(-insert_batch, 0)]}' t0 = time.time() res, _ = collection_w.query(term_expr) tt = time.time() - t0 @@ -98,7 +103,7 @@ class TestAllCollection(TestcaseBase): collection_w.release() # insert data - d = cf.gen_default_list_data(with_json=False) + d = cf.gen_default_list_data(with_json=with_json) collection_w.insert(d) # load diff --git a/tests/python_client/chaos/testcases/test_single_request_operation.py b/tests/python_client/chaos/testcases/test_single_request_operation.py index 1aaccdc0d9..b154f10272 100644 --- a/tests/python_client/chaos/testcases/test_single_request_operation.py +++ b/tests/python_client/chaos/testcases/test_single_request_operation.py @@ -3,7 +3,7 @@ from time import sleep from pymilvus import connections from chaos.checker import 
(CreateChecker, InsertChecker, - FlushChecker, + FlushChecker, SearchChecker, QueryChecker, IndexChecker, @@ -17,6 +17,7 @@ from chaos.chaos_commons import assert_statistic from chaos import constants from delayed_assert import assert_expectations + class TestBase: expect_create = constants.SUCC expect_insert = constants.SUCC @@ -45,7 +46,7 @@ class TestOperations(TestBase): self.host = host self.port = port self.user = user - self.password = password + self.password = password def init_health_checkers(self, collection_name=None): c_name = collection_name @@ -57,7 +58,7 @@ class TestOperations(TestBase): Op.search: SearchChecker(collection_name=c_name), Op.query: QueryChecker(collection_name=c_name), Op.delete: DeleteChecker(collection_name=c_name), - Op.drop:DropChecker(collection_name=c_name) + Op.drop: DropChecker(collection_name=c_name) } self.health_checkers = checkers @@ -71,13 +72,13 @@ class TestOperations(TestBase): cc.start_monitor_threads(self.health_checkers) log.info("*********************Load Start**********************") # wait request_duration - request_duration = request_duration.replace("h","*3600+").replace("m","*60+").replace("s","") + request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "") if request_duration[-1] == "+": request_duration = request_duration[:-1] request_duration = eval(request_duration) for i in range(10): - sleep(request_duration//10) - for k,v in self.health_checkers.items(): + sleep(request_duration // 10) + for k, v in self.health_checkers.items(): v.check_result() if is_check: assert_statistic(self.health_checkers, succ_rate_threshold=0.98) diff --git a/tests/python_client/chaos/testcases/test_single_request_operation_for_standby.py b/tests/python_client/chaos/testcases/test_single_request_operation_for_standby.py new file mode 100644 index 0000000000..059b6db1bf --- /dev/null +++ b/tests/python_client/chaos/testcases/test_single_request_operation_for_standby.py @@ -0,0 +1,102 @@ 
import pytest
import threading
from time import sleep
from pymilvus import connections
from chaos.checker import (CreateChecker,
                           InsertChecker,
                           FlushChecker,
                           SearchChecker,
                           QueryChecker,
                           IndexChecker,
                           DeleteChecker,
                           Op)
from utils.util_log import test_log as log
from chaos import chaos_commons as cc
from common.common_type import CaseLabel
from common.milvus_sys import MilvusSys
from chaos.chaos_commons import assert_statistic
from chaos import constants
from delayed_assert import assert_expectations
from utils.util_k8s import (get_milvus_instance_name,
                            get_milvus_deploy_tool,
                            reset_healthy_checker_after_standby_activated)


class TestBase:
    """Class-level defaults shared by the operation tests."""
    # expected outcome of each operation
    expect_create = constants.SUCC
    expect_insert = constants.SUCC
    expect_flush = constants.SUCC
    expect_index = constants.SUCC
    expect_search = constants.SUCC
    expect_query = constants.SUCC
    # default connection endpoint
    host = '127.0.0.1'
    port = 19530
    _chaos_config = None
    health_checkers = {}


class TestOperations(TestBase):
    """Run the health checkers while a standby coordinator may be taking over."""

    @pytest.fixture(scope="function", autouse=True)
    def connection(self, host, port, user, password, milvus_ns):
        """Open the ``default`` connection and record deployment details on the instance."""
        conn_kwargs = {"host": host, "port": port}
        if user and password:
            # credentials are only sent (over a secure channel) when both are given
            conn_kwargs.update(user=user, password=password, secure=True)
        connections.connect('default', **conn_kwargs)
        if connections.has_connection("default") is False:
            raise Exception("no connections")
        log.info("connect to milvus successfully")
        self.host, self.port = host, port
        self.user, self.password = user, password
        self.milvus_sys = MilvusSys(alias='default')
        self.chaos_ns = constants.CHAOS_NAMESPACE
        self.milvus_ns = milvus_ns
        self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
        self.deploy_by = get_milvus_deploy_tool(self.milvus_ns, self.milvus_sys)

    def init_health_checkers(self, collection_name=None):
        """Create one checker per operation, all bound to ``collection_name``."""
        checker_classes = (
            (Op.create, CreateChecker),
            (Op.insert, InsertChecker),
            (Op.flush, FlushChecker),
            (Op.index, IndexChecker),
            (Op.search, SearchChecker),
            (Op.query, QueryChecker),
            (Op.delete, DeleteChecker),
        )
        self.health_checkers = {
            op: checker_cls(collection_name=collection_name)
            for op, checker_cls in checker_classes
        }

    @pytest.mark.tags(CaseLabel.L3)
    def test_operations(self, request_duration, target_component, is_check):
        """Drive the checkers for ``request_duration`` and reset them once a standby is activated."""
        log.info("*********************Test Start**********************")
        log.info(connections.get_connection_addr('default'))
        self.init_health_checkers(collection_name=None)
        # start the monitor threads to check the milvus ops
        cc.start_monitor_threads(self.health_checkers)
        log.info("*********************Load Start**********************")
        # turn a duration such as "1h30m" into an arithmetic expression in seconds
        for unit, expression in (("h", "*3600+"), ("m", "*60+"), ("s", "")):
            request_duration = request_duration.replace(unit, expression)
        if request_duration[-1] == "+":
            request_duration = request_duration[:-1]
        # NOTE(review): eval() is applied to a command-line supplied string
        request_duration = eval(request_duration)
        # background thread that resets the checkers' statistics when standby is activated
        reset_thread = threading.Thread(
            target=reset_healthy_checker_after_standby_activated,
            args=(self.milvus_ns, self.release_name, target_component, self.health_checkers),
            kwargs={"timeout": request_duration // 2},
            daemon=True,
        )
        reset_thread.start()
        log.info('start a thread to reset health_checkers when standby is activated')
        # report intermediate results ten times over the whole duration
        for _ in range(10):
            sleep(request_duration // 10)
            for checker in self.health_checkers.values():
                checker.check_result()
        if is_check:
            assert_statistic(self.health_checkers)
            assert_expectations()
        log.info("*********************Chaos Test Completed**********************")
def find_activate_standby_coord_pod(namespace, release_name, coord_type, root_path="by-dev"):
    """Split the pods of a coordinator into activated and standby ones.

    The address stored in etcd under ``{root_path}/meta/session/{coord_type}``
    is compared with the IP of every pod selected by the release and component
    labels.

    :param namespace: namespace holding the release's etcd service and pods
    :param release_name: release name; the etcd service is ``<release_name>-etcd``
    :param coord_type: coordinator component name, used in both the etcd key and the label selector
    :param root_path: etcd key prefix of the deployment (defaults to the previously hard-coded ``by-dev``)
    :return: ``(activate_pod_list, standby_pod_list)`` of pod names, in pod listing order
    :raises Exception: when etcd holds no session value for the coordinator
    """
    init_k8s_client_config()
    api_instance = client.CoreV1Api()
    etcd_service_name = release_name + "-etcd"
    service = api_instance.read_namespaced_service(name=etcd_service_name, namespace=namespace)
    etcd_cluster_ip = service.spec.cluster_ip
    etcd_port = service.spec.ports[0].port
    etcd = pyetcd.client(host=etcd_cluster_ip, port=etcd_port)
    session_key = f'{root_path}/meta/session/{coord_type}'
    v = etcd.get(session_key)
    log.info(f"coord_type: {coord_type}, etcd session value: {v}")
    # presumably get() yields an indexable result whose first item is the raw
    # value (None when the key is absent) -- verify against the etcd client in use
    if not v or v[0] is None:
        raise Exception(f"no session value found in etcd for key {session_key}")
    activated_pod_ip = json.loads(v[0])["Address"].split(":")[0]
    label_selector = f'app.kubernetes.io/instance={release_name}, component={coord_type}'
    items = get_pod_list(namespace, label_selector=label_selector)
    activate_pod_list = []
    standby_pod_list = []
    # A single pass keeps both lists in pod listing order.
    for item in items:
        if item.status.pod_ip == activated_pod_ip:
            activate_pod_list.append(item.metadata.name)
        else:
            standby_pod_list.append(item.metadata.name)
    return activate_pod_list, standby_pod_list


def reset_healthy_checker_after_standby_activated(namespace, release_name, coord_type, health_checkers, timeout=360):
    """Reset the checkers' counters once a former standby pod has become activated.

    Polls the activated/standby split every 10 seconds for up to ``timeout``
    seconds. When a pod that was standby at the start shows up as activated,
    waits 30 seconds, resets the counting of ``health_checkers`` and reports
    each checker's result. Otherwise only logs that no switch was observed.

    :param namespace: namespace of the Milvus release
    :param release_name: release name
    :param coord_type: coordinator component name
    :param health_checkers: mapping of operation to checker
    :param timeout: maximum time in seconds to wait for the switch
    """
    activate_pod_list_before, standby_pod_list_before = find_activate_standby_coord_pod(namespace, release_name,
                                                                                        coord_type)
    log.info(f"check standby switch: activate_pod_list_before {activate_pod_list_before}, "
             f"standby_pod_list_before {standby_pod_list_before}")
    standby_activated = False
    start_time = time.time()
    while time.time() - start_time < timeout:
        try:
            activate_pod_list_after, standby_pod_list_after = find_activate_standby_coord_pod(namespace, release_name,
                                                                                              coord_type)
            # The activated list is empty whenever no pod IP matches the etcd
            # session address; treat that as "not switched yet" and keep polling.
            if activate_pod_list_after and activate_pod_list_after[0] in standby_pod_list_before:
                standby_activated = True
                log.info(f"Standby {coord_type} pod {activate_pod_list_after[0]} activated")
                log.info(f"check standby switch: activate_pod_list_after {activate_pod_list_after}, "
                         f"standby_pod_list_after {standby_pod_list_after}")
                break
        except Exception as e:
            # best-effort poller: lookups may fail while the coordinator restarts
            log.error(f"Exception when check standby switch: {e}")
        time.sleep(10)
    if standby_activated:
        time.sleep(30)
        cc.reset_counting(health_checkers)
        log.info("reset health checkers")
        for checker in health_checkers.values():
            checker.check_result()
    else:
        log.info(f"Standby {coord_type} pod does not switch standby mode")