diff --git a/tests/python_client/chaos/chaos_objects/template/pod-failure-by-pod-list.yaml b/tests/python_client/chaos/chaos_objects/template/pod-failure-by-pod-list.yaml index 900e3ffca6..cd36a40d5d 100644 --- a/tests/python_client/chaos/chaos_objects/template/pod-failure-by-pod-list.yaml +++ b/tests/python_client/chaos/chaos_objects/template/pod-failure-by-pod-list.yaml @@ -1,17 +1,15 @@ apiVersion: chaos-mesh.org/v1alpha1 kind: PodChaos metadata: - name: test-querynode-pod-failure + name: test-datacoord-pod-failure namespace: chaos-testing spec: selector: pods: chaos-testing: - - milvus-multi-querynode-querynode-bcdc595d9-7vmcj - - milvus-multi-querynode-querynode-bcdc595d9-ccxls - - milvus-multi-querynode-querynode-bcdc595d9-dpwgp + - datacoord-standby-test-milvus-datacoord-b664b98df-c42d4 mode: all action: pod-failure - duration: 2m + duration: 3m gracePeriod: 0 \ No newline at end of file diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py index c6eb5bb42c..0805d7c143 100644 --- a/tests/python_client/chaos/checker.py +++ b/tests/python_client/chaos/checker.py @@ -75,7 +75,7 @@ def exception_handler(): e_str = str(e) log_e = e_str[0:log_row_length] + \ '......' if len(e_str) > log_row_length else e_str - log.error(log_e) + log.error(f"class: {self.__class__.__name__}, func name: {func.__name__}, error: {log_e}") return Error(e), False return inner_wrapper return wrapper diff --git a/tests/python_client/chaos/cluster-values.yaml b/tests/python_client/chaos/cluster-values.yaml index d610795e5e..b56f1c47d0 100644 --- a/tests/python_client/chaos/cluster-values.yaml +++ b/tests/python_client/chaos/cluster-values.yaml @@ -6,6 +6,27 @@ image: tag: master-latest pullPolicy: IfNotPresent +rootCoordinator: + replicas: 2 + activeStandby: + enabled: true # Enable active-standby when you set multiple replicas for root coordinator + +queryCoordinator: + replicas: 2 + activeStandby: + enabled: true # Enable active-standby when you set multiple replicas for root coordinator + +dataCoordinator: + replicas: 2 + activeStandby: + enabled: true # Enable active-standby when you set multiple replicas for root coordinator + +indexCoordinator: + replicas: 2 + activeStandby: + enabled: true # Enable active-standby when you set multiple replicas for root coordinator + + etcd: replicaCount: 3 image: diff --git a/tests/python_client/chaos/conftest.py b/tests/python_client/chaos/conftest.py index a62372b10e..4a56f0e9aa 100644 --- a/tests/python_client/chaos/conftest.py +++ b/tests/python_client/chaos/conftest.py @@ -3,6 +3,7 @@ import pytest def pytest_addoption(parser): parser.addoption("--chaos_type", action="store", default="pod_kill", help="chaos_type") + parser.addoption("--role_type", action="store", default="activated", help="role_type") parser.addoption("--target_component", action="store", default="querynode", help="target_component") parser.addoption("--chaos_duration", action="store", default="1m", help="chaos_duration") parser.addoption("--chaos_interval", action="store", default="10s", help="chaos_interval") @@ -15,6 +16,11 @@ def chaos_type(request): return request.config.getoption("--chaos_type") +@pytest.fixture +def role_type(request): + return request.config.getoption("--role_type") + + @pytest.fixture def target_component(request): return request.config.getoption("--target_component") diff --git a/tests/python_client/chaos/constants.py b/tests/python_client/chaos/constants.py index 11322d08b8..39e665a354 100644 --- a/tests/python_client/chaos/constants.py +++ 
b/tests/python_client/chaos/constants.py @@ -12,7 +12,7 @@ CHAOS_GROUP = 'chaos-mesh.org' # chao mesh group CHAOS_VERSION = 'v1alpha1' # chao mesh version SUCC = 'succ' FAIL = 'fail' -DELTA_PER_INS = 10 # entities per insert +DELTA_PER_INS = 300 # entities per insert ENTITIES_FOR_SEARCH = 3000 # entities for search_collection CHAOS_CONFIG_ENV = 'CHAOS_CONFIG_PATH' # env variables for chao path diff --git a/tests/python_client/chaos/test_chaos_apply_to_coord.py b/tests/python_client/chaos/test_chaos_apply_to_coord.py new file mode 100644 index 0000000000..b92363465d --- /dev/null +++ b/tests/python_client/chaos/test_chaos_apply_to_coord.py @@ -0,0 +1,139 @@ +import threading +import pytest +import time +from time import sleep +from pathlib import Path +from pymilvus import connections +from common.cus_resource_opts import CustomResourceOperations as CusResource +from common.milvus_sys import MilvusSys +import logging as log +from utils.util_k8s import wait_pods_ready, get_milvus_instance_name, get_milvus_deploy_tool, find_activate_standby_coord_pod +from utils.util_common import update_key_value, update_key_name, gen_experiment_config +import constants + + +class TestChaosApply: + + @pytest.fixture(scope="function", autouse=True) + def init_env(self, host, port, user, password, milvus_ns): + if user and password: + # log.info(f"connect to {host}:{port} with user {user} and password {password}") + connections.connect('default', host=host, port=port, user=user, password=password, secure=True) + else: + connections.connect('default', host=host, port=port) + if connections.has_connection("default") is False: + raise Exception("no connections") + # + self.host = host + self.port = port + self.user = user + self.password = password + self.milvus_sys = MilvusSys(alias='default') + self.chaos_ns = constants.CHAOS_NAMESPACE + self.milvus_ns = milvus_ns + self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys) + self.deploy_by = get_milvus_deploy_tool(self.milvus_ns, self.milvus_sys) + + def reconnect(self): + if self.user and self.password: + connections.connect('default', host=self.host, port=self.port, + user=self.user, + password=self.password, + secure=True) + else: + connections.connect('default', host=self.host, port=self.port) + if connections.has_connection("default") is False: + raise Exception("no connections") + + def teardown(self): + chaos_res = CusResource(kind=self.chaos_config['kind'], + group=constants.CHAOS_GROUP, + version=constants.CHAOS_VERSION, + namespace=constants.CHAOS_NAMESPACE) + meta_name = self.chaos_config.get('metadata', None).get('name', None) + chaos_res.delete(meta_name, raise_ex=False) + sleep(2) + + def test_chaos_apply(self, chaos_type, role_type, target_component, chaos_duration, chaos_interval): + # start the monitor threads to check the milvus ops + log.info("*********************Chaos Test Start**********************") + log.info(connections.get_connection_addr('default')) + + activate_pod_list, standby_pod_list = find_activate_standby_coord_pod(self.milvus_ns, self.release_name, + target_component) + log.info(f"activated pod list: {activate_pod_list}, standby pod list: {standby_pod_list}") + target_pod_list = activate_pod_list + standby_pod_list + if role_type == "standby": + target_pod_list = standby_pod_list + if role_type == "activated": + target_pod_list = activate_pod_list + chaos_type = chaos_type.replace("_", "-") + chaos_config = gen_experiment_config(f"{str(Path(__file__).absolute().parent)}/" + 
f"chaos_objects/template/{chaos_type}-by-pod-list.yaml") + chaos_config['metadata']['name'] = f"test-{target_component}-standby-{int(time.time())}" + + meta_name = chaos_config.get('metadata', None).get('name', None) + chaos_config['spec']['selector']['pods']['chaos-testing'] = target_pod_list + self.chaos_config = chaos_config + # update chaos_duration from string to int with unit second + chaos_duration = chaos_duration.replace('h', '*3600+').replace('m', '*60+').replace('s', '*1+') + '+0' + chaos_duration = eval(chaos_duration) + if self.deploy_by == "milvus-operator": + update_key_name(chaos_config, "component", "app.kubernetes.io/component") + self._chaos_config = chaos_config # cache the chaos config for tear down + log.info(f"chaos_config: {chaos_config}") + # apply chaos object + chaos_res = CusResource(kind=chaos_config['kind'], + group=constants.CHAOS_GROUP, + version=constants.CHAOS_VERSION, + namespace=constants.CHAOS_NAMESPACE) + chaos_res.create(chaos_config) + log.info("chaos injected") + res = chaos_res.list_all() + chaos_list = [r['metadata']['name'] for r in res['items']] + assert meta_name in chaos_list + res = chaos_res.get(meta_name) + log.info(f"chaos inject result: {res['kind']}, {res['metadata']['name']}") + sleep(chaos_duration) + # delete chaos + chaos_res.delete(meta_name) + log.info("chaos deleted") + res = chaos_res.list_all() + chaos_list = [r['metadata']['name'] for r in res['items']] + # verify the chaos is deleted + sleep(10) + res = chaos_res.list_all() + chaos_list = [r['metadata']['name'] for r in res['items']] + assert meta_name not in chaos_list + # wait all pods ready + t0 = time.time() + log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label app.kubernetes.io/instance={meta_name}") + wait_pods_ready(constants.CHAOS_NAMESPACE, f"app.kubernetes.io/instance={meta_name}") + log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label release={meta_name}") + wait_pods_ready(constants.CHAOS_NAMESPACE, f"release={meta_name}") + log.info("all pods are ready") + pods_ready_time = time.time() - t0 + log.info(f"pods ready time: {pods_ready_time}") + # reconnect to test the service healthy + start_time = time.time() + end_time = start_time + 120 + while time.time() < end_time: + try: + self.reconnect() + break + except Exception as e: + log.error(e) + sleep(2) + recovery_time = time.time() - start_time + log.info(f"recovery time: {recovery_time}") + time.sleep(30) + activate_pod_list_after_chaos, standby_pod_list_after_chaos = find_activate_standby_coord_pod(self.milvus_ns, self.release_name, + target_component) + log.info(f"activated pod list: {activate_pod_list_after_chaos}, standby pod list: {standby_pod_list_after_chaos}") + if role_type == "standby": + # if the standby pod is injected, the activated pod should not be changed + assert activate_pod_list_after_chaos[0] == activate_pod_list[0] + if role_type == "activated": + # if the activated pod is injected, the one of standby pods should be changed to activated + assert activate_pod_list_after_chaos[0] in standby_pod_list + log.info("*********************Chaos Test Completed**********************") diff --git a/tests/python_client/chaos/test_chaos_multi_replicas.py b/tests/python_client/chaos/test_chaos_multi_replicas.py index 47b0399e04..12230839ea 100644 --- a/tests/python_client/chaos/test_chaos_multi_replicas.py +++ b/tests/python_client/chaos/test_chaos_multi_replicas.py @@ -214,7 +214,7 @@ class TestChaos(TestChaosBase): sleep(2) # wait all pods ready 
log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label app.kubernetes.io/instance={release_name}") - ready_1 = wait_pods_ready(constants.CHAOS_NAMESPACE,f"app.kubernetes.io/instance={release_name}") + ready_1 = wait_pods_ready(constants.CHAOS_NAMESPACE, f"app.kubernetes.io/instance={release_name}") log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label release={release_name}") ready_2 = wait_pods_ready(constants.CHAOS_NAMESPACE, f"release={release_name}") if ready_1 and ready_2: diff --git a/tests/python_client/chaos/test_load_with_checker.py b/tests/python_client/chaos/test_load_with_checker.py index 419a38b42b..58e8b82183 100644 --- a/tests/python_client/chaos/test_load_with_checker.py +++ b/tests/python_client/chaos/test_load_with_checker.py @@ -69,6 +69,7 @@ class TestChaos(TestChaosBase): Op.load_balance: LoadBalanceChecker() } self.health_checkers = checkers + ms = MilvusSys() self.prepare_bulk_insert() def prepare_bulk_insert(self, nb=30000, row_based=True): diff --git a/tests/python_client/chaos/testcases/test_get_collections.py b/tests/python_client/chaos/testcases/test_get_collections.py index a14958577e..d906075ffa 100644 --- a/tests/python_client/chaos/testcases/test_get_collections.py +++ b/tests/python_client/chaos/testcases/test_get_collections.py @@ -13,12 +13,13 @@ from utils.util_log import test_log as log class TestGetCollections(TestcaseBase): """ Test case of getting all collections """ - - @pytest.mark.tags(CaseLabel.L1) + + @pytest.mark.tags(CaseLabel.L3) def test_get_collections_by_prefix(self,): self._connect() all_collections = self.utility_wrap.list_collections()[0] all_collections = [c_name for c_name in all_collections if "Checker" in c_name] + log.info(f"all_collections: {all_collections}") selected_collections_map = {} for c_name in all_collections: prefix = c_name.split("_")[0] @@ -31,6 +32,7 @@ class TestGetCollections(TestcaseBase): selected_collections = [] for value in selected_collections_map.values(): selected_collections.extend(value) + assert len(selected_collections) > 0 log.info(f"find {len(selected_collections)} collections:") log.info(selected_collections) data = { diff --git a/tests/python_client/chaos/testcases/test_single_request_operation.py b/tests/python_client/chaos/testcases/test_single_request_operation.py index ee429ce473..5c2d7d26a1 100644 --- a/tests/python_client/chaos/testcases/test_single_request_operation.py +++ b/tests/python_client/chaos/testcases/test_single_request_operation.py @@ -67,13 +67,13 @@ class TestOperations(TestBase): cc.start_monitor_threads(self.health_checkers) log.info("*********************Load Start**********************") # wait request_duration - request_duration = request_duration.replace("h","*3600+").replace("m","*60+").replace("s","") + request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "") if request_duration[-1] == "+": request_duration = request_duration[:-1] request_duration = eval(request_duration) for i in range(10): sleep(request_duration//10) - for k,v in self.health_checkers.items(): + for k, v in self.health_checkers.items(): v.check_result() if is_check: assert_statistic(self.health_checkers) diff --git a/tests/python_client/chaos/testcases/test_single_request_operation_for_standby.py b/tests/python_client/chaos/testcases/test_single_request_operation_for_standby.py new file mode 100644 index 0000000000..9af9592a13 --- /dev/null +++ 
b/tests/python_client/chaos/testcases/test_single_request_operation_for_standby.py @@ -0,0 +1,103 @@ +import pytest +import threading +from time import sleep +from pymilvus import connections +from chaos.checker import (CreateChecker, + InsertChecker, + FlushChecker, + SearchChecker, + QueryChecker, + IndexChecker, + DeleteChecker, + Op) +from utils.util_log import test_log as log +from chaos import chaos_commons as cc +from common.common_type import CaseLabel +from common.milvus_sys import MilvusSys +from chaos.chaos_commons import assert_statistic +from chaos import constants +from delayed_assert import assert_expectations +from utils.util_k8s import (get_milvus_instance_name, + get_milvus_deploy_tool, + reset_healthy_checker_after_standby_activated) + + + +class TestBase: + expect_create = constants.SUCC + expect_insert = constants.SUCC + expect_flush = constants.SUCC + expect_index = constants.SUCC + expect_search = constants.SUCC + expect_query = constants.SUCC + host = '127.0.0.1' + port = 19530 + _chaos_config = None + health_checkers = {} + + +class TestOperations(TestBase): + + @pytest.fixture(scope="function", autouse=True) + def connection(self, host, port, user, password, milvus_ns): + if user and password: + # log.info(f"connect to {host}:{port} with user {user} and password {password}") + connections.connect('default', host=host, port=port, user=user, password=password, secure=True) + else: + connections.connect('default', host=host, port=port) + if connections.has_connection("default") is False: + raise Exception("no connections") + log.info("connect to milvus successfully") + self.host = host + self.port = port + self.user = user + self.password = password + self.milvus_sys = MilvusSys(alias='default') + self.chaos_ns = constants.CHAOS_NAMESPACE + self.milvus_ns = milvus_ns + self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys) + self.deploy_by = get_milvus_deploy_tool(self.milvus_ns, self.milvus_sys) + + def init_health_checkers(self, collection_name=None): + c_name = collection_name + checkers = { + Op.create: CreateChecker(collection_name=c_name), + Op.insert: InsertChecker(collection_name=c_name), + Op.flush: FlushChecker(collection_name=c_name), + Op.index: IndexChecker(collection_name=c_name), + Op.search: SearchChecker(collection_name=c_name), + Op.query: QueryChecker(collection_name=c_name), + Op.delete: DeleteChecker(collection_name=c_name), + } + self.health_checkers = checkers + + @pytest.mark.tags(CaseLabel.L3) + def test_operations(self, request_duration, target_component, is_check): + # start the monitor threads to check the milvus ops + log.info("*********************Test Start**********************") + log.info(connections.get_connection_addr('default')) + c_name = None + self.init_health_checkers(collection_name=c_name) + cc.start_monitor_threads(self.health_checkers) + log.info("*********************Load Start**********************") + # wait request_duration + request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "") + if request_duration[-1] == "+": + request_duration = request_duration[:-1] + request_duration = eval(request_duration) + # start a thread to reset health_checkers when standby is activated. 
+ t = threading.Thread(target=reset_healthy_checker_after_standby_activated, + args=(self.milvus_ns, self.release_name, target_component, self.health_checkers), + kwargs={"timeout": request_duration//2}, + daemon=True) + t.start() + # t.join() + log.info('start a thread to reset health_checkers when standby is activated') + for i in range(10): + sleep(request_duration//10) + for k, v in self.health_checkers.items(): + v.check_result() + if is_check: + assert_statistic(self.health_checkers) + assert_expectations() + log.info("*********************Chaos Test Completed**********************") diff --git a/tests/python_client/requirements.txt b/tests/python_client/requirements.txt index 996be58380..0bdeb25a73 100644 --- a/tests/python_client/requirements.txt +++ b/tests/python_client/requirements.txt @@ -41,4 +41,7 @@ minio==7.1.5 h5py==3.1.0 # for log -loguru==0.5.3 \ No newline at end of file +loguru==0.5.3 + +# for standby test +etcd3==0.12.0 \ No newline at end of file diff --git a/tests/python_client/standby/cluster-values.yaml b/tests/python_client/standby/cluster-values.yaml new file mode 100644 index 0000000000..459f4d9305 --- /dev/null +++ b/tests/python_client/standby/cluster-values.yaml @@ -0,0 +1,148 @@ +cluster: + enabled: true +image: + all: + repository: milvusdb/milvus + tag: 2.2.0-latest + pullPolicy: IfNotPresent + +rootCoordinator: + replicas: 2 + activeStandby: + enabled: true # Enable active-standby when you set multiple replicas for root coordinator + +queryCoordinator: + replicas: 2 + activeStandby: + enabled: true # Enable active-standby when you set multiple replicas for root coordinator + +dataCoordinator: + replicas: 2 + activeStandby: + enabled: true # Enable active-standby when you set multiple replicas for root coordinator + +indexCoordinator: + replicas: 2 + activeStandby: + enabled: true # Enable active-standby when you set multiple replicas for root coordinator + + +etcd: + replicaCount: 3 + image: + repository: milvusdb/etcd + tag: 3.5.0-r7 + +minio: + resources: + requests: + memory: 256Mi + +kafka: + enabled: false + name: kafka + replicaCount: 3 + defaultReplicationFactor: 2 + +pulsar: + enabled: true + extra: + bastion: no + wsproxy: no + + autorecovery: + resources: + requests: + cpu: 0.1 + memory: 256Mi + proxy: + replicaCount: 1 + resources: + requests: + cpu: 0.1 + memory: 256Mi + wsResources: + requests: + memory: 256Mi + cpu: 0.1 + configData: + PULSAR_MEM: > + -Xms256m -Xmx256m + PULSAR_GC: > + -XX:MaxDirectMemorySize=512m + httpNumThreads: "50" + + bookkeeper: + replicaCount: 2 + resources: + requests: + cpu: 0.1 + memory: 512Mi + configData: + PULSAR_MEM: > + -Xms512m + -Xmx512m + -XX:MaxDirectMemorySize=1024m + PULSAR_GC: > + -Dio.netty.leakDetectionLevel=disabled + -Dio.netty.recycler.linkCapacity=1024 + -XX:+UseG1GC -XX:MaxGCPauseMillis=10 + -XX:+ParallelRefProcEnabled + -XX:+UnlockExperimentalVMOptions + -XX:+DoEscapeAnalysis + -XX:ParallelGCThreads=32 + -XX:ConcGCThreads=32 + -XX:G1NewSizePercent=50 + -XX:+DisableExplicitGC + -XX:-ResizePLAB + -XX:+ExitOnOutOfMemoryError + -XX:+PerfDisableSharedMem + -XX:+PrintGCDetails + nettyMaxFrameSizeBytes: "104867840" + zookeeper: + replicaCount: 1 + resources: + requests: + cpu: 0.1 + memory: 256Mi + configData: + PULSAR_MEM: > + -Xms512m + -Xmx512m + PULSAR_GC: > + -Dcom.sun.management.jmxremote + -Djute.maxbuffer=10485760 + -XX:+ParallelRefProcEnabled + -XX:+UnlockExperimentalVMOptions + -XX:+DoEscapeAnalysis + -XX:+DisableExplicitGC + -XX:+PerfDisableSharedMem + -Dzookeeper.forceSync=no + broker: + 
+    replicaCount: 1
+    resources:
+      requests:
+        cpu: 0.1
+        memory: 512Mi
+    configData:
+      PULSAR_MEM: >
+        -Xms512m
+        -Xmx512m
+        -XX:MaxDirectMemorySize=1024m
+      PULSAR_GC: >
+        -Dio.netty.leakDetectionLevel=disabled
+        -Dio.netty.recycler.linkCapacity=1024
+        -XX:+ParallelRefProcEnabled
+        -XX:+UnlockExperimentalVMOptions
+        -XX:+DoEscapeAnalysis
+        -XX:ParallelGCThreads=32
+        -XX:ConcGCThreads=32
+        -XX:G1NewSizePercent=50
+        -XX:+DisableExplicitGC
+        -XX:-ResizePLAB
+        -XX:+ExitOnOutOfMemoryError
+      maxMessageSize: "104857600"
+      defaultRetentionTimeInMinutes: "10080"
+      defaultRetentionSizeInMB: "8192"
+      backlogQuotaDefaultLimitGB: "8"
+      backlogQuotaDefaultRetentionPolicy: producer_exception
diff --git a/tests/python_client/standby/scripts/install_milvus.sh b/tests/python_client/standby/scripts/install_milvus.sh
new file mode 100644
index 0000000000..6e1dd51d7a
--- /dev/null
+++ b/tests/python_client/standby/scripts/install_milvus.sh
@@ -0,0 +1,17 @@
+
+release=${1:-"milvus-chaos"}
+milvus_mode=${2:-"cluster"}
+ns=${3:-"chaos-testing"}
+bash uninstall_milvus.sh ${release} ${ns}|| true
+
+helm repo add milvus https://milvus-io.github.io/milvus-helm/
+helm repo update
+if [[ ${milvus_mode} == "cluster" ]];
+then
+    helm install --wait --timeout 360s ${release} milvus/milvus -f ../cluster-values.yaml --set metrics.serviceMonitor.enabled=true -n=${ns}
+fi
+
+if [[ ${milvus_mode} == "standalone" ]];
+then
+    helm install --wait --timeout 360s ${release} milvus/milvus -f ../standalone-values.yaml --set metrics.serviceMonitor.enabled=true -n=${ns}
+fi
diff --git a/tests/python_client/standby/scripts/install_milvus_cluster.sh b/tests/python_client/standby/scripts/install_milvus_cluster.sh
new file mode 100644
index 0000000000..5632e16107
--- /dev/null
+++ b/tests/python_client/standby/scripts/install_milvus_cluster.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+set -e
+
+release=${1:-"milvus-chaos"}
+ns=${2:-"chaos-testing"}
+bash uninstall_milvus.sh ${release} ${ns}|| true
+
+echo "install cluster"
+helm install --wait --debug --timeout 600s ${RELEASE_NAME:-$release} milvus/milvus \
+          --set image.all.repository=${REPOSITORY:-"milvusdb/milvus"} \
+          --set image.all.tag=${IMAGE_TAG:-"master-latest"} \
+          --set metrics.serviceMonitor.enabled=true \
+          -f ../cluster-values.yaml -n=${ns}
\ No newline at end of file
diff --git a/tests/python_client/standby/scripts/install_milvus_standalone.sh b/tests/python_client/standby/scripts/install_milvus_standalone.sh
new file mode 100644
index 0000000000..b07cc46973
--- /dev/null
+++ b/tests/python_client/standby/scripts/install_milvus_standalone.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+set -e
+
+release=${1:-"milvus-chaos"}
+ns=${2:-"chaos-testing"}
+bash uninstall_milvus.sh ${release} ${ns}|| true
+echo "install standalone"
+helm install --wait --debug --timeout 600s ${RELEASE_NAME:-$release} milvus/milvus \
+          --set image.all.repository=${REPOSITORY:-"milvusdb/milvus"} \
+          --set image.all.tag=${IMAGE_TAG:-"master-latest"} \
+          --set metrics.serviceMonitor.enabled=true \
+          -f ../standalone-values.yaml -n=${ns}
\ No newline at end of file
diff --git a/tests/python_client/standby/scripts/uninstall_milvus.sh b/tests/python_client/standby/scripts/uninstall_milvus.sh
new file mode 100644
index 0000000000..914bdf9c5d
--- /dev/null
+++ b/tests/python_client/standby/scripts/uninstall_milvus.sh
@@ -0,0 +1,8 @@
+
+# Exit immediately for non-zero status
+set -e
+release=${1:-"milvus-chaos"}
+ns=${2:-"chaos-testing"}
+helm uninstall ${release} -n=${ns}
+kubectl delete pvc -l release=${release} -n=${ns}
+kubectl delete
pvc -l app.kubernetes.io/instance=${release} -n=${ns} diff --git a/tests/python_client/utils/util_common.py b/tests/python_client/utils/util_common.py index e2dcccb1d5..e66c047f02 100644 --- a/tests/python_client/utils/util_common.py +++ b/tests/python_client/utils/util_common.py @@ -1,5 +1,7 @@ from yaml import full_load import json +import requests +import unittest from utils.util_log import test_log as log def gen_experiment_config(yaml): @@ -24,6 +26,20 @@ def findkeys(node, kv): yield x +def find_value_by_key(node, k): + # refer to https://stackoverflow.com/questions/9807634/find-all-occurrences-of-a-key-in-nested-dictionaries-and-lists + if isinstance(node, list): + for i in node: + for x in find_value_by_key(i, k): + yield x + elif isinstance(node, dict): + if k in node: + yield node[k] + for j in node.values(): + for x in find_value_by_key(j, k): + yield x + + def update_key_value(node, modify_k, modify_v): # update the value of modify_k to modify_v if isinstance(node, list): @@ -63,18 +79,128 @@ def get_collections(): return collections -if __name__ == "__main__": - d = { "id" : "abcde", - "key1" : "blah", - "key2" : "blah blah", - "nestedlist" : [ - { "id" : "qwerty", - "nestednestedlist" : [ - { "id" : "xyz", "keyA" : "blah blah blah" }, - { "id" : "fghi", "keyZ" : "blah blah blah" }], - "anothernestednestedlist" : [ - { "id" : "asdf", "keyQ" : "blah blah" }, - { "id" : "yuiop", "keyW" : "blah" }] } ] } - print(list(findkeys(d, 'id'))) - update_key_value(d, "none_id", "ccc") - print(d) +def get_request_success_rate(url, api_key, body): + headers = { + 'Authorization': "Bearer " + api_key, + 'Content-Type': 'application/json' + } + rsp = requests.post(url, headers=headers, data=json.dumps(body)) + result = {} + if rsp.status_code == 200: + results = rsp.json()["results"] + frames = results["A"]["frames"] + for frame in frames: + schema = frame["schema"] + function_name = list(find_value_by_key(schema, "function_name"))[0] + data = frame["data"]["values"] # get the success rate value + result[function_name] = data + else: + log.error(f"Failed to get request success rate with status code {rsp.status_code}") + return result + + +def analyze_service_breakdown_time(result, chaos_ts, recovery_rate): + # analyze the service breakdown time + service_breakdown_time = {} + for service, value in result.items(): + ts = value[0] + success_rate = value[1] + chaos_inject_point = 0 + failed_point = 0 + failed_ts = ts[0] + recovery_ts = ts[0] + for i, t in enumerate(ts): + if t > chaos_ts: + chaos_inject_point = i - 1 + break + if t == chaos_ts: + chaos_inject_point = i + break + previous_rate = sum(success_rate[:chaos_inject_point+1]) / (chaos_inject_point+1) + for i in range(chaos_inject_point, len(ts)): + if success_rate[i] < recovery_rate * previous_rate: + failed_point = i + failed_ts = ts[i] + break + for i in range(failed_point, len(ts)): + if success_rate[i] >= recovery_rate * previous_rate: + recovery_ts = ts[i] + break + else: + # if the service is still down, + # set the recovery time to the last timestamp with another interval + recovery_ts = ts[-1] + (ts[-1] - ts[-2]) + + breakdown_time = recovery_ts - failed_ts + log.info(f"Service {service} breakdown time is {breakdown_time}") + service_breakdown_time[service] = breakdown_time + return service_breakdown_time + + +class TestUtilCommon(unittest.TestCase): + + def test_find_value_by_key(self): + test_dict = {"id": "abcde", + "key1": "blah", + "key2": "blah blah", + "nestedlist": [ + {"id": "qwerty", + "nestednestedlist": [ + {"id": 
"xyz", "keyA": "blah blah blah"}, + {"id": "fghi", "keyZ": "blah blah blah"}], + "anothernestednestedlist": [ + {"id": "asdf", "keyQ": "blah blah"}, + {"id": "yuiop", "keyW": "blah"}]}]} + self.assertEqual(list(find_value_by_key(test_dict, "id")), + ['abcde', 'qwerty', 'xyz', 'fghi', 'asdf', 'yuiop']) + + def test_analyze_service_breakdown_time(self): + result = { + "service1": [[1, 2, 3, 4, 5], [1, 0, 0, 0, 1]], + "service2": [[1, 2, 3, 4, 5], [1, 1, 0, 0, 1]], + "service3": [[1, 2, 3, 4, 5], [1, 1, 1, 0, 1]], + "service4": [[1, 2, 3, 4, 5], [1, 1, 1, 1, 0]], + } + chaos_ts = 2 + recovery_rate = 0.8 + service_breakdown_time = analyze_service_breakdown_time(result, chaos_ts, recovery_rate) + self.assertEqual(service_breakdown_time, {"service1": 3, "service2": 2, "service3": 1, "service4": 1}) + + def test_get_request_success_rate(self): + url = "https://xxx/api/ds/query" + api_key = "xxx" + body = { + "queries": [ + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "exemplar": True, + "expr": "sum(increase(milvus_proxy_req_count{app_kubernetes_io_instance=~\"datacoord-standby-test\", app_kubernetes_io_name=\"milvus\", namespace=\"chaos-testing\", status=\"success\"}[2m])/120) by(function_name, pod, node_id)", + "interval": "", + "legendFormat": "{{function_name}}-{{pod}}-{{node_id}}", + "queryType": "timeSeriesQuery", + "refId": "A", + "requestId": "123329A", + "utcOffsetSec": 28800, + "datasourceId": 1, + "intervalMs": 15000, + "maxDataPoints": 1070 + } + ], + "range": { + "from": "2023-01-06T04:24:48.549Z", + "to": "2023-01-06T07:24:48.549Z", + "raw": { + "from": "now-3h", + "to": "now" + } + }, + "from": "1672979088549", + "to": "1672989888549" + } + + rsp = get_request_success_rate(url, api_key, body) + self.assertEqual(isinstance(rsp, dict), True) + diff --git a/tests/python_client/utils/util_k8s.py b/tests/python_client/utils/util_k8s.py index 4a651a9693..a30bc66d1e 100644 --- a/tests/python_client/utils/util_k8s.py +++ b/tests/python_client/utils/util_k8s.py @@ -1,15 +1,21 @@ import json import os.path import time - +import threading import requests +import etcd3 from pymilvus import connections from kubernetes import client, config from kubernetes.client.rest import ApiException from common.milvus_sys import MilvusSys from utils.util_log import test_log as log +from chaos import chaos_commons as cc +from utils.util_common import find_value_by_key from common.common_type import in_cluster_env +import urllib3 +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + def init_k8s_client_config(): """ @@ -316,9 +322,79 @@ def get_metrics_querynode_sq_req_count(): raise Exception(-1, f"Failed to get metrics with status code {response.status_code}") +def get_pod_logs(namespace, pod_name): + init_k8s_client_config() + api_instance = client.CoreV1Api() + log.debug(f'Start to read {pod_name} log') + logs = api_instance.read_namespaced_pod_log(name=pod_name, namespace=namespace, async_req=True) + return logs + + +def find_activate_standby_coord_pod(namespace, release_name, coord_type): + init_k8s_client_config() + api_instance = client.CoreV1Api() + etcd_service_name = release_name + "-etcd" + service = api_instance.read_namespaced_service(name=etcd_service_name, namespace=namespace) + etcd_cluster_ip = service.spec.cluster_ip + etcd_port = service.spec.ports[0].port + etcd = etcd3.client(host=etcd_cluster_ip, port=etcd_port) + v = etcd.get(f'by-dev/meta/session/{coord_type}') + log.info(f"coord_type: {coord_type}, etcd session value: {v}") + 
activated_pod_ip = json.loads(v[0])["Address"].split(":")[0] + label_selector = f'app.kubernetes.io/instance={release_name}, component={coord_type}' + items = get_pod_list(namespace, label_selector=label_selector) + all_pod_list = [] + for item in items: + pod_name = item.metadata.name + all_pod_list.append(pod_name) + activate_pod_list = [] + standby_pod_list = [] + for item in items: + pod_name = item.metadata.name + ip = item.status.pod_ip + if ip == activated_pod_ip: + activate_pod_list.append(pod_name) + standby_pod_list = list(set(all_pod_list) - set(activate_pod_list)) + return activate_pod_list, standby_pod_list + + +def reset_healthy_checker_after_standby_activated(namespace, release_name, coord_type, health_checkers, timeout=360): + activate_pod_list_before, standby_pod_list_before = find_activate_standby_coord_pod(namespace, release_name, coord_type) + log.info(f"check standby switch: activate_pod_list_before {activate_pod_list_before}, " + f"standby_pod_list_before {standby_pod_list_before}") + standby_activated = False + start_time = time.time() + end_time = time.time() + while not standby_activated and end_time - start_time < timeout: + try: + activate_pod_list_after, standby_pod_list_after = find_activate_standby_coord_pod(namespace, release_name, coord_type) + if activate_pod_list_after[0] in standby_pod_list_before: + standby_activated = True + log.info(f"Standby {coord_type} pod {activate_pod_list_after[0]} activated") + log.info(f"check standby switch: activate_pod_list_after {activate_pod_list_after}, " + f"standby_pod_list_after {standby_pod_list_after}") + break + except Exception as e: + log.error(f"Exception when check standby switch: {e}") + time.sleep(10) + end_time = time.time() + if standby_activated: + time.sleep(30) + cc.reset_counting(health_checkers) + for k, v in health_checkers.items(): + log.info("reset health checkers") + v.check_result() + else: + log.info(f"Standby {coord_type} pod does not switch standby mode") + + if __name__ == '__main__': - label = "app.kubernetes.io/name=milvus, component=querynode" - instance_name = get_milvus_instance_name("chaos-testing", "10.96.250.111") - res = get_pod_list("chaos-testing", label_selector=label) - m = get_pod_ip_name_pairs("chaos-testing", label_selector=label) - export_pod_logs(namespace='chaos-testing', label_selector=label) + # for coord in ["indexcoord"]: + for coord in ["rootcoord", "datacoord", "indexcoord", "querycoord"]: + activate_pod_list, standby_pod_list = find_activate_standby_coord_pod("chaos-testing", "rootcoord-standby-test", coord) + print(f"activate pod {activate_pod_list}, standby pod {standby_pod_list}") + # label = "app.kubernetes.io/name=milvus, component=querynode" + # instance_name = get_milvus_instance_name("chaos-testing", "10.96.250.111") + # res = get_pod_list("chaos-testing", label_selector=label) + # m = get_pod_ip_name_pairs("chaos-testing", label_selector=label) + # export_pod_logs(namespace='chaos-testing', label_selector=label)
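
Usage note (not part of the patch): a minimal sketch of how the new coordinator failover test might be invoked, based only on the pytest options added in tests/python_client/chaos/conftest.py above. The concrete values (target component, durations) are illustrative assumptions, and the connection options from the existing chaos conftest (host, port, user, password, milvus_ns) are omitted here.

    # assumed invocation, run from tests/python_client/chaos:
    # inject pod-failure into the activated datacoord pod; the test maps
    # --chaos_type pod_failure to chaos_objects/template/pod-failure-by-pod-list.yaml
    pytest test_chaos_apply_to_coord.py --target_component datacoord \
        --chaos_type pod_failure --role_type activated --chaos_duration 3m

    # optionally, drive request load during the failover with the new standby testcase
    pytest testcases/test_single_request_operation_for_standby.py --target_component datacoord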