diff --git a/tests/python_client/chaos/conftest.py b/tests/python_client/chaos/conftest.py new file mode 100644 index 0000000000..8f34c78ae9 --- /dev/null +++ b/tests/python_client/chaos/conftest.py @@ -0,0 +1,33 @@ +import pytest + + +def pytest_addoption(parser): + parser.addoption("--milvus_ns", action="store", default="chaos-testing", help="milvus_ns") + parser.addoption("--chaos_type", action="store", default="pod_kill", help="chaos_type") + parser.addoption("--target_component", action="store", default="querynode", help="target_component") + parser.addoption("--chaos_duration", action="store", default="1m", help="chaos_duration") + parser.addoption("--chaos_interval", action="store", default="10s", help="chaos_interval") + + +@pytest.fixture +def milvus_ns(request): + return request.config.getoption("--milvus_ns") + +@pytest.fixture +def chaos_type(request): + return request.config.getoption("--chaos_type") + + +@pytest.fixture +def target_component(request): + return request.config.getoption("--target_component") + + +@pytest.fixture +def chaos_duration(request): + return request.config.getoption("--chaos_duration") + + +@pytest.fixture +def chaos_interval(request): + return request.config.getoption("--chaos_interval") diff --git a/tests/python_client/chaos/test_chaos_apply.py b/tests/python_client/chaos/test_chaos_apply.py new file mode 100644 index 0000000000..88c55df035 --- /dev/null +++ b/tests/python_client/chaos/test_chaos_apply.py @@ -0,0 +1,113 @@ +import threading +import pytest +import time +from time import sleep +from pathlib import Path +from pymilvus import connections +from common.cus_resource_opts import CustomResourceOperations as CusResource +from common.milvus_sys import MilvusSys +import logging as log +from utils.util_k8s import wait_pods_ready, get_milvus_instance_name, get_milvus_deploy_tool +from utils.util_common import update_key_value, update_key_name, gen_experiment_config +import constants + + +class TestChaosApply: + + @pytest.fixture(scope="function", autouse=True) + def init_env(self, host, port, user, password, milvus_ns): + if user and password: + # log.info(f"connect to {host}:{port} with user {user} and password {password}") + connections.connect('default', host=host, port=port, user=user, password=password, secure=True) + else: + connections.connect('default', host=host, port=port) + if connections.has_connection("default") is False: + raise Exception("no connections") + # + self.host = host + self.port = port + self.user = user + self.password = password + self.milvus_sys = MilvusSys(alias='default') + self.chaos_ns = constants.CHAOS_NAMESPACE + self.milvus_ns = milvus_ns + self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys) + self.deploy_by = get_milvus_deploy_tool(self.milvus_ns, self.milvus_sys) + + def reconnect(self): + if self.user and self.password: + connections.connect('default', host=self.host, port=self.port, + user=self.user, + password=self.password, + secure=True) + else: + connections.connect('default', host=self.host, port=self.port) + if connections.has_connection("default") is False: + raise Exception("no connections") + + def teardown(self): + chaos_res = CusResource(kind=self.chaos_config['kind'], + group=constants.CHAOS_GROUP, + version=constants.CHAOS_VERSION, + namespace=constants.CHAOS_NAMESPACE) + meta_name = self.chaos_config.get('metadata', None).get('name', None) + chaos_res.delete(meta_name, raise_ex=False) + sleep(2) + + def test_chaos_apply(self, chaos_type, target_component, chaos_duration, chaos_interval): + # start the monitor threads to check the milvus ops + log.info("*********************Chaos Test Start**********************") + log.info(connections.get_connection_addr('default')) + release_name = self.release_name + chaos_config = gen_experiment_config( + f"{str(Path(__file__).absolute().parent)}/chaos_objects/{chaos_type}/chaos_{target_component}_{chaos_type}.yaml") + chaos_config['metadata']['name'] = f"test-{target_component}-{chaos_type.replace('_','-')}-{int(time.time())}" + chaos_config['metadata']['namespace'] = self.chaos_ns + meta_name = chaos_config.get('metadata', None).get('name', None) + update_key_value(chaos_config, "release", release_name) + update_key_value(chaos_config, "app.kubernetes.io/instance", release_name) + update_key_value(chaos_config, "namespaces", [self.milvus_ns]) + self.chaos_config = chaos_config + if "s" in chaos_interval: + schedule = f"*/{chaos_interval[:-1]} * * * * *" + if "m" in chaos_interval: + schedule = f"00 */{chaos_interval[:-1]} * * * *" + update_key_value(chaos_config, "schedule", schedule) + # update chaos_duration from string to int with unit second + chaos_duration = chaos_duration.replace('h', '*3600+').replace('m', '*60+').replace('s', '*1+') + '+0' + chaos_duration = eval(chaos_duration) + if self.deploy_by == "milvus-operator": + update_key_name(chaos_config, "component", "app.kubernetes.io/component") + self._chaos_config = chaos_config # cache the chaos config for tear down + log.info(f"chaos_config: {chaos_config}") + # apply chaos object + chaos_res = CusResource(kind=chaos_config['kind'], + group=constants.CHAOS_GROUP, + version=constants.CHAOS_VERSION, + namespace=constants.CHAOS_NAMESPACE) + chaos_res.create(chaos_config) + log.info("chaos injected") + res = chaos_res.list_all() + chaos_list = [r['metadata']['name'] for r in res['items']] + assert meta_name in chaos_list + res = chaos_res.get(meta_name) + log.info(f"chaos inject result: {res['kind']}, {res['metadata']['name']}") + sleep(chaos_duration) + # delete chaos + chaos_res.delete(meta_name) + log.info("chaos deleted") + res = chaos_res.list_all() + chaos_list = [r['metadata']['name'] for r in res['items']] + # verify the chaos is deleted + assert meta_name not in chaos_list + sleep(2) + # wait all pods ready + log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label app.kubernetes.io/instance={meta_name}") + wait_pods_ready(constants.CHAOS_NAMESPACE, f"app.kubernetes.io/instance={meta_name}") + log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label release={meta_name}") + wait_pods_ready(constants.CHAOS_NAMESPACE, f"release={meta_name}") + log.info("all pods are ready") + # reconnect to test the service healthy + sleep(20) + self.reconnect() + log.info("*********************Chaos Test Completed**********************") diff --git a/tests/python_client/utils/util_common.py b/tests/python_client/utils/util_common.py index 2c95b38c99..2b21048106 100644 --- a/tests/python_client/utils/util_common.py +++ b/tests/python_client/utils/util_common.py @@ -1,3 +1,12 @@ +from yaml import full_load + + +def gen_experiment_config(yaml): + """load the yaml file of chaos experiment""" + with open(yaml) as f: + _config = full_load(f) + f.close() + return _config def findkeys(node, kv): @@ -27,6 +36,21 @@ def update_key_value(node, modify_k, modify_v): return node +def update_key_name(node, modify_k, modify_k_new): + # update the name of modify_k to modify_k_new + if isinstance(node, list): + for i in node: + update_key_name(i, modify_k, modify_k_new) + elif isinstance(node, dict): + if modify_k in node: + value_backup = node[modify_k] + del node[modify_k] + node[modify_k_new] = value_backup + for j in node.values(): + update_key_name(j, modify_k, modify_k_new) + return node + + if __name__ == "__main__": d = { "id" : "abcde", "key1" : "blah", diff --git a/tests/python_client/utils/util_k8s.py b/tests/python_client/utils/util_k8s.py index a173279be2..0e7021f29e 100644 --- a/tests/python_client/utils/util_k8s.py +++ b/tests/python_client/utils/util_k8s.py @@ -26,6 +26,12 @@ def init_k8s_client_config(): raise Exception(e) +def get_current_namespace(): + config.load_kube_config() + ns = config.list_kube_config_contexts()[1]["context"]["namespace"] + return ns + + def wait_pods_ready(namespace, label_selector, expected_num=None, timeout=360): """ wait pods with label selector all ready @@ -155,7 +161,7 @@ def get_querynode_id_pod_pairs(namespace, label_selector): return querynode_id_pod_pair -def get_milvus_instance_name(namespace, host, port="19530"): +def get_milvus_instance_name(namespace, host="127.0.0.1", port="19530", milvus_sys=None): """ get milvus instance name after connection @@ -172,20 +178,17 @@ def get_milvus_instance_name(namespace, host, port="19530"): "milvus-multi-querynode" """ - connections.add_connection(_default={"host": host, "port": port}) - connections.connect(alias='_default') - ms = MilvusSys() + if milvus_sys is None: + connections.add_connection(_default={"host": host, "port": port}) + connections.connect(alias='_default') + ms = MilvusSys() + else: + ms = milvus_sys query_node_ip = ms.query_nodes[0]["infos"]['hardware_infos']["ip"].split(":")[0] - pod_name = "" - if ms.deploy_mode == "STANDALONE": - # get all pods which label is app.kubernetes.io/name=milvus and component=standalone - ip_name_pairs = get_pod_ip_name_pairs(namespace, "app.kubernetes.io/name=milvus, component=standalone") - pod_name = ip_name_pairs[query_node_ip] - if ms.deploy_mode == "DISTRIBUTED": - # get all pods which label is app.kubernetes.io/name=milvus and component=querynode - ip_name_pairs = get_pod_ip_name_pairs(namespace, "app.kubernetes.io/name=milvus, component=querynode") - pod_name = ip_name_pairs[query_node_ip] - init_k8s_client_config() + ip_name_pairs = get_pod_ip_name_pairs(namespace, "app.kubernetes.io/name=milvus") + pod_name = ip_name_pairs[query_node_ip] + + config.load_kube_config() api_instance = client.CoreV1Api() try: api_response = api_instance.read_namespaced_pod(namespace=namespace, name=pod_name) @@ -196,6 +199,36 @@ def get_milvus_instance_name(namespace, host, port="19530"): return milvus_instance_name +def get_milvus_deploy_tool(namespace, milvus_sys): + """ + get milvus instance name after connection + :param namespace: the namespace where the release + :type namespace: str + :param milvus_sys: milvus_sys + :type namespace: MilvusSys + :example: + >>> deploy_tool = get_milvus_deploy_tool("chaos-testing", milvus_sys) + "helm" + """ + ms = milvus_sys + query_node_ip = ms.query_nodes[0]["infos"]['hardware_infos']["ip"].split(":")[0] + ip_name_pairs = get_pod_ip_name_pairs(namespace, "app.kubernetes.io/name=milvus") + pod_name = ip_name_pairs[query_node_ip] + config.load_kube_config() + api_instance = client.CoreV1Api() + try: + api_response = api_instance.read_namespaced_pod(namespace=namespace, name=pod_name) + except ApiException as e: + log.error("Exception when calling CoreV1Api->list_namespaced_pod: %s\n" % e) + raise Exception(str(e)) + if ("app.kubernetes.io/managed-by" in api_response.metadata.labels and + api_response.metadata.labels["app.kubernetes.io/managed-by"] == "milvus-operator"): + deploy_tool = "milvus-operator" + else: + deploy_tool = "helm" + return deploy_tool + + def export_pod_logs(namespace, label_selector, release_name=None): """ export pod logs with label selector to '/tmp/milvus'