diff --git a/build/ci/jenkins/Scale.groovy b/build/ci/jenkins/Scale.groovy
index b1aef03782..dd2e667bf7 100644
--- a/build/ci/jenkins/Scale.groovy
+++ b/build/ci/jenkins/Scale.groovy
@@ -31,6 +31,7 @@ pipeline {
         TEST_TYPE = "scale-test"
         // SEMVER = "${BRANCH_NAME.contains('/') ? BRANCH_NAME.substring(BRANCH_NAME.lastIndexOf('/') + 1) : BRANCH_NAME}"
         ARTIFACTS = "${env.WORKSPACE}/_artifacts"
+        MILVUS_LOGS = "/tmp/milvus_logs/*"
     }

     stages {
@@ -77,7 +78,16 @@ pipeline {
                 script {
                     dir("${env.ARTIFACTS}") {
                         sh "tar -zcvf artifacts-${PROJECT_NAME}-${TEST_TYPE}-pytest-logs.tar.gz /tmp/ci_logs --remove-files || true"
-                        archiveArtifacts artifacts: "artifacts-${PROJECT_NAME}-${TEST_TYPE}-pytest-logs.tar.gz ", allowEmptyArchive: true
+                        archiveArtifacts artifacts: "artifacts-${PROJECT_NAME}-${TEST_TYPE}-pytest-logs.tar.gz", allowEmptyArchive: true
+                        DIR_LIST = sh(returnStdout: true, script: 'ls -d1 ${MILVUS_LOGS}').trim()
+                        for (d in DIR_LIST.tokenize("\n")) {
+                            sh "echo $d"
+                            def release_name = d.split('/')[-1]
+                            sh "tar -zcvf artifacts-${PROJECT_NAME}-${TEST_TYPE}-${release_name}-logs.tar.gz ${d} --remove-files || true"
+                            archiveArtifacts artifacts: "artifacts-${PROJECT_NAME}-${TEST_TYPE}-${release_name}-logs.tar.gz", allowEmptyArchive: true
+
+                        }
+
                     }
                 }
             }
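The loop above assumes each test run leaves its pod logs in a per-release directory matching `/tmp/milvus_logs/*` (created by the `export_pod_logs` change at the end of this patch), and derives the archive name from the directory name. A minimal Python sketch of the same derivation; the paths and the `milvus`/`scale-test` name parts are illustrative only:

```python
# Illustration of the Jenkins loop's name derivation, not part of the patch.
from pathlib import Path

milvus_logs = Path("/tmp/milvus_logs")
for d in sorted(milvus_logs.glob("*")):      # mirrors `ls -d1 /tmp/milvus_logs/*`
    release_name = d.name                    # mirrors `d.split('/')[-1]`
    archive = f"artifacts-milvus-scale-test-{release_name}-logs.tar.gz"
    print(archive)                           # one tarball per release directory
```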
diff --git a/tests/python_client/scale/test_data_node_scale.py b/tests/python_client/scale/test_data_node_scale.py
index a5f68e7e20..a1f15450ae 100644
--- a/tests/python_client/scale/test_data_node_scale.py
+++ b/tests/python_client/scale/test_data_node_scale.py
@@ -10,7 +10,7 @@ from customize.milvus_operator import MilvusOperator
 from scale import constants
 from pymilvus import connections
 from utils.util_log import test_log as log
-from utils.util_k8s import wait_pods_ready
+from utils.util_k8s import wait_pods_ready, export_pod_logs
 from utils.util_pymilvus import get_latest_tag
 
 prefix = "data_scale"
@@ -55,52 +55,61 @@ class TestDataNodeScale:
         host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
         # host = '10.98.0.4'
 
-        # connect
-        connections.add_connection(default={"host": host, "port": 19530})
-        connections.connect(alias='default')
+        try:
+            # connect
+            connections.add_connection(default={"host": host, "port": 19530})
+            connections.connect(alias='default')
 
-        # create
-        c_name = cf.gen_unique_str("scale_query")
-        # c_name = 'scale_query_DymS7kI4'
-        collection_w = ApiCollectionWrapper()
-        collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema(), shards_num=5)
+            # create
+            c_name = cf.gen_unique_str("scale_query")
+            # c_name = 'scale_query_DymS7kI4'
+            collection_w = ApiCollectionWrapper()
+            collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema(), shards_num=5)
 
-        tmp_nb = 10000
+            tmp_nb = 10000
 
-        def do_insert():
-            while True:
-                tmp_df = cf.gen_default_dataframe_data(tmp_nb)
-                collection_w.insert(tmp_df)
-                log.debug(collection_w.num_entities)
+            def do_insert():
+                while True:
+                    tmp_df = cf.gen_default_dataframe_data(tmp_nb)
+                    collection_w.insert(tmp_df)
+                    log.debug(collection_w.num_entities)
 
-        t_insert = threading.Thread(target=do_insert, args=(), daemon=True)
-        t_insert.start()
+            t_insert = threading.Thread(target=do_insert, args=(), daemon=True)
+            t_insert.start()
 
-        # scale dataNode to 5
-        mic.upgrade(release_name, {'spec.components.dataNode.replicas': 5}, constants.NAMESPACE)
-        time.sleep(300)
-        log.debug("Expand dataNode test finished")
+            # scale dataNode to 5
+            mic.upgrade(release_name, {'spec.components.dataNode.replicas': 5}, constants.NAMESPACE)
+            time.sleep(300)
+            log.debug("Expand dataNode test finished")
 
-        # create new collection and insert
-        new_c_name = cf.gen_unique_str("scale_query")
-        collection_w_new = ApiCollectionWrapper()
-        collection_w_new.init_collection(name=new_c_name, schema=cf.gen_default_collection_schema(), shards_num=2)
+            # create new collection and insert
+            new_c_name = cf.gen_unique_str("scale_query")
+            collection_w_new = ApiCollectionWrapper()
+            collection_w_new.init_collection(name=new_c_name, schema=cf.gen_default_collection_schema(), shards_num=2)
 
-        def do_new_insert():
-            while True:
-                tmp_df = cf.gen_default_dataframe_data(tmp_nb)
-                collection_w_new.insert(tmp_df)
-                log.debug(collection_w_new.num_entities)
+            def do_new_insert():
+                while True:
+                    tmp_df = cf.gen_default_dataframe_data(tmp_nb)
+                    collection_w_new.insert(tmp_df)
+                    log.debug(collection_w_new.num_entities)
 
-        t_insert_new = threading.Thread(target=do_new_insert, args=(), daemon=True)
-        t_insert_new.start()
+            t_insert_new = threading.Thread(target=do_new_insert, args=(), daemon=True)
+            t_insert_new.start()
 
-        # scale dataNode to 3
-        mic.upgrade(release_name, {'spec.components.dataNode.replicas': 3}, constants.NAMESPACE)
-        wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
+            # scale dataNode to 3
+            mic.upgrade(release_name, {'spec.components.dataNode.replicas': 3}, constants.NAMESPACE)
+            wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
 
-        log.debug(collection_w.num_entities)
-        time.sleep(300)
-        log.debug("Shrink dataNode test finished")
+            log.debug(collection_w.num_entities)
+            time.sleep(300)
+            log.debug("Shrink dataNode test finished")
 
-        mic.uninstall(release_name, namespace=constants.NAMESPACE)
+        except Exception as e:
+            raise Exception(str(e))
+
+        finally:
+            label = f"app.kubernetes.io/instance={release_name}"
+            log.info('Start to export milvus pod logs')
+            export_pod_logs(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
+
+            mic.uninstall(release_name, namespace=constants.NAMESPACE)
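The same try/except/finally shape is repeated in every scale test touched by this patch. A possible follow-up, not part of this change, would be to hoist it into a shared context manager; `milvus_logs_on_exit` is a hypothetical name:

```python
# Hypothetical refactor sketch; export_pod_logs is the repo helper updated below.
from contextlib import contextmanager

from utils.util_k8s import export_pod_logs


@contextmanager
def milvus_logs_on_exit(mic, release_name, namespace):
    """Always export pod logs and uninstall the release, even on failure."""
    try:
        yield
    finally:
        label = f"app.kubernetes.io/instance={release_name}"
        export_pod_logs(namespace=namespace, label_selector=label,
                        release_name=release_name)
        mic.uninstall(release_name, namespace=namespace)
```

Each test body would then run inside `with milvus_logs_on_exit(mic, release_name, constants.NAMESPACE):` instead of repeating the cleanup block.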
diff --git a/tests/python_client/scale/test_index_node_scale.py b/tests/python_client/scale/test_index_node_scale.py
index f0fb8ea5da..84e31893d5 100644
--- a/tests/python_client/scale/test_index_node_scale.py
+++ b/tests/python_client/scale/test_index_node_scale.py
@@ -10,6 +10,7 @@ from customize.milvus_operator import MilvusOperator
 from scale import constants
 from common import common_func as cf
 from common import common_type as ct
+from utils.util_k8s import export_pod_logs
 from utils.util_log import test_log as log
 from utils.util_pymilvus import get_latest_tag
 
@@ -97,6 +98,9 @@ class TestIndexNodeScale:
             raise Exception(str(e))
 
         finally:
+            label = f"app.kubernetes.io/instance={release_name}"
+            log.info('Start to export milvus pod logs')
+            export_pod_logs(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
             mic.uninstall(release_name, namespace=constants.NAMESPACE)
 
     @pytest.mark.tags(CaseLabel.L3)
@@ -175,4 +179,7 @@ class TestIndexNodeScale:
             raise Exception(str(e))
 
         finally:
+            label = f"app.kubernetes.io/instance={release_name}"
+            log.info('Start to export milvus pod logs')
+            export_pod_logs(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
             mic.uninstall(release_name, namespace=constants.NAMESPACE)
diff --git a/tests/python_client/scale/test_proxy_scale.py b/tests/python_client/scale/test_proxy_scale.py
index 196a6ef2f6..b176b646c8 100644
--- a/tests/python_client/scale/test_proxy_scale.py
+++ b/tests/python_client/scale/test_proxy_scale.py
@@ -6,7 +6,7 @@ from common import common_func as cf
 from common.common_type import CaseLabel
 from scale import scale_common as sc, constants
 from utils.util_log import test_log as log
-from utils.util_k8s import wait_pods_ready
+from utils.util_k8s import wait_pods_ready, export_pod_logs
 from utils.util_pymilvus import get_latest_tag
 
 prefix = "proxy_scale"
@@ -56,22 +56,30 @@ class TestProxyScale:
         host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
         # host = "10.98.0.7"
 
-        c_name = cf.gen_unique_str(prefix)
-        self.e2e_milvus_parallel(5, host, c_name)
-        log.info('Milvus test before expand')
+        try:
+            c_name = cf.gen_unique_str(prefix)
+            self.e2e_milvus_parallel(5, host, c_name)
+            log.info('Milvus test before expand')
 
-        # expand proxy replicas from 1 to 5
-        mic.upgrade(release_name, {'spec.components.proxy.replicas': 5}, constants.NAMESPACE)
-        wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
+            # expand proxy replicas from 1 to 5
+            mic.upgrade(release_name, {'spec.components.proxy.replicas': 5}, constants.NAMESPACE)
+            wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
 
-        self.e2e_milvus_parallel(5, host, c_name)
-        log.info('Milvus test after expand')
+            self.e2e_milvus_parallel(5, host, c_name)
+            log.info('Milvus test after expand')
 
-        # expand proxy replicas from 5 to 2
-        mic.upgrade(release_name, {'spec.components.proxy.replicas': 2}, constants.NAMESPACE)
-        wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
+            # shrink proxy replicas from 5 to 2
+            mic.upgrade(release_name, {'spec.components.proxy.replicas': 2}, constants.NAMESPACE)
+            wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
 
-        self.e2e_milvus_parallel(2, host, c_name)
-        log.info('Milvus test after shrink')
+            self.e2e_milvus_parallel(2, host, c_name)
+            log.info('Milvus test after shrink')
 
-        mic.uninstall(release_name, namespace=constants.NAMESPACE)
+        except Exception as e:
+            raise Exception(str(e))
+
+        finally:
+            label = f"app.kubernetes.io/instance={release_name}"
+            log.info('Start to export milvus pod logs')
+            export_pod_logs(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
+            mic.uninstall(release_name, namespace=constants.NAMESPACE)
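Note that `except Exception as e: raise Exception(str(e))` re-wraps the error and discards the original exception type and traceback. Since the `finally` block runs either way, the `except` clause could be dropped entirely; a runnable sketch with hypothetical stand-in helpers:

```python
# Sketch only: `finally` already guarantees cleanup, and a plain try/finally
# lets the original exception propagate with its type and traceback intact.
def run_scale_scenario():
    raise RuntimeError("boom")              # simulate a failing test body

def export_logs_and_uninstall():
    print("exporting logs / uninstalling")  # stand-in for the cleanup block

def run_with_cleanup():
    try:
        run_scale_scenario()
    finally:
        # cleanup always runs; the RuntimeError above still surfaces unchanged,
        # unlike `raise Exception(str(e))`, which erases type and traceback
        export_logs_and_uninstall()
```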
diff --git a/tests/python_client/scale/test_query_node_scale.py b/tests/python_client/scale/test_query_node_scale.py
index e72f66ad04..720c2eb57d 100644
--- a/tests/python_client/scale/test_query_node_scale.py
+++ b/tests/python_client/scale/test_query_node_scale.py
@@ -11,7 +11,7 @@ from common import common_type as ct
 from scale import constants
 from pymilvus import Index, connections
 from utils.util_log import test_log as log
-from utils.util_k8s import wait_pods_ready
+from utils.util_k8s import wait_pods_ready, export_pod_logs
 from utils.util_pymilvus import get_latest_tag
 
 prefix = "search_scale"
@@ -55,70 +55,78 @@ class TestQueryNodeScale:
         host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
         # host = "10.98.0.8"
 
-        # connect
-        connections.add_connection(default={"host": host, "port": 19530})
-        connections.connect(alias='default')
+        try:
+            # connect
+            connections.add_connection(default={"host": host, "port": 19530})
+            connections.connect(alias='default')
 
-        # create
-        c_name = cf.gen_unique_str("scale_query")
-        # c_name = 'scale_query_DymS7kI4'
-        collection_w = ApiCollectionWrapper()
-        collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema(), shards_num=2)
+            # create
+            c_name = cf.gen_unique_str("scale_query")
+            # c_name = 'scale_query_DymS7kI4'
+            collection_w = ApiCollectionWrapper()
+            collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema(), shards_num=2)
 
-        # insert two segments
-        for i in range(3):
-            df = cf.gen_default_dataframe_data(nb)
-            collection_w.insert(df)
-            log.debug(collection_w.num_entities)
+            # insert multiple segments
+            for i in range(3):
+                df = cf.gen_default_dataframe_data(nb)
+                collection_w.insert(df)
+                log.debug(collection_w.num_entities)
 
-        # create index
-        collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
-        assert collection_w.has_index()[0]
-        assert collection_w.index()[0] == Index(collection_w.collection, ct.default_float_vec_field_name,
-                                                default_index_params)
+            # create index
+            collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
+            assert collection_w.has_index()[0]
+            assert collection_w.index()[0] == Index(collection_w.collection, ct.default_float_vec_field_name,
                                                     default_index_params)
 
-        # load
-        collection_w.load()
+            # load
+            collection_w.load()
 
-        # scale queryNode to 5
-        mic.upgrade(release_name, {'spec.components.queryNode.replicas': 5}, constants.NAMESPACE)
+            # scale queryNode to 5
+            mic.upgrade(release_name, {'spec.components.queryNode.replicas': 5}, constants.NAMESPACE)
 
-        # continuously search
-        def do_search():
-            while True:
-                search_res, _ = collection_w.search(cf.gen_vectors(1, ct.default_dim),
-                                                    ct.default_float_vec_field_name,
-                                                    ct.default_search_params, ct.default_limit)
-                log.debug(search_res[0].ids)
-                assert len(search_res[0].ids) == ct.default_limit
+            # continuously search
+            def do_search():
+                while True:
+                    search_res, _ = collection_w.search(cf.gen_vectors(1, ct.default_dim),
                                                         ct.default_float_vec_field_name,
                                                         ct.default_search_params, ct.default_limit)
+                    log.debug(search_res[0].ids)
+                    assert len(search_res[0].ids) == ct.default_limit
 
-        t_search = threading.Thread(target=do_search, args=(), daemon=True)
-        t_search.start()
+            t_search = threading.Thread(target=do_search, args=(), daemon=True)
+            t_search.start()
 
-        # wait new QN running, continuously insert
-        # time.sleep(10)
-        healthy = mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1200)
-        log.info(f"milvus healthy after scale up: {healthy}")
-        # wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
+            # wait new QN running, continuously insert
+            # time.sleep(10)
+            healthy = mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1200)
+            log.info(f"milvus healthy after scale up: {healthy}")
+            # wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
 
-        def do_insert():
-            while True:
-                tmp_df = cf.gen_default_dataframe_data(1000)
-                collection_w.insert(tmp_df)
+            def do_insert():
+                while True:
+                    tmp_df = cf.gen_default_dataframe_data(1000)
+                    collection_w.insert(tmp_df)
 
-        t_insert = threading.Thread(target=do_insert, args=(), daemon=True)
-        t_insert.start()
+            t_insert = threading.Thread(target=do_insert, args=(), daemon=True)
+            t_insert.start()
 
-        log.debug(collection_w.num_entities)
-        time.sleep(20)
-        log.debug("Expand querynode test finished")
+            log.debug(collection_w.num_entities)
+            time.sleep(20)
+            log.debug("Expand querynode test finished")
 
-        mic.upgrade(release_name, {'spec.components.queryNode.replicas': 3}, constants.NAMESPACE)
-        time.sleep(60)
-        wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
+            mic.upgrade(release_name, {'spec.components.queryNode.replicas': 3}, constants.NAMESPACE)
+            time.sleep(60)
+            wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
 
-        log.debug(collection_w.num_entities)
-        time.sleep(60)
-        log.debug("Shrink querynode test finished")
+            log.debug(collection_w.num_entities)
+            time.sleep(60)
+            log.debug("Shrink querynode test finished")
 
-        mic.uninstall(release_name, namespace=constants.NAMESPACE)
\ No newline at end of file
+        except Exception as e:
+            raise Exception(str(e))
+
+        finally:
+            label = f"app.kubernetes.io/instance={release_name}"
+            log.info('Start to export milvus pod logs')
+            export_pod_logs(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
+            mic.uninstall(release_name, namespace=constants.NAMESPACE)
\ No newline at end of file
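The insert and search workers above run `while True` inside daemon threads, so they only stop when the test process exits. If a cleaner shutdown were ever wanted (for example, to stop inserts before `mic.uninstall`), a `threading.Event` is the usual shape; this is a sketch assuming the same `collection_w`/`cf` API, not part of this patch:

```python
# Sketch of a stoppable background writer using a threading.Event.
import threading

stop = threading.Event()

def do_insert(collection_w, gen_df, stop_event):
    # exits promptly once the main thread sets the event
    while not stop_event.is_set():
        collection_w.insert(gen_df(1000))

# t = threading.Thread(target=do_insert,
#                      args=(collection_w, cf.gen_default_dataframe_data, stop),
#                      daemon=True)
# t.start()
# ... scale operations ...
# stop.set()
# t.join(timeout=30)   # drain before exporting logs and uninstalling
```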
diff --git a/tests/python_client/utils/util_k8s.py b/tests/python_client/utils/util_k8s.py
index 449e7cf92e..2cb394f78b 100644
--- a/tests/python_client/utils/util_k8s.py
+++ b/tests/python_client/utils/util_k8s.py
@@ -82,7 +82,7 @@ def get_pod_list(namespace, label_selector):
         raise Exception(str(e))
 
 
-def export_pod_logs(namespace, label_selector):
+def export_pod_logs(namespace, label_selector, release_name=None):
     """
     export pod logs with label selector to '/tmp/milvus'
 
@@ -92,10 +92,19 @@ def export_pod_logs(namespace, label_selector):
     :param label_selector: labels to restrict which pods logs to export
     :type label_selector: str
 
+    :param release_name: use the release name as the server logs directory name
+    :type release_name: str
+
     :example:
             >>> export_pod_logs("chaos-testing", "app.kubernetes.io/instance=mic-milvus")
     """
-    pod_log_path = '/tmp/milvus_logs'
+    if release_name is not None:
+        if not isinstance(release_name, str):
+            raise TypeError("Got an unexpected non-string release_name")
+        if len(release_name.strip()) == 0:
+            raise ValueError("Got an unexpected empty or whitespace-only release_name")
+    pod_log_path = '/tmp/milvus_logs' if release_name is None else f'/tmp/milvus_logs/{release_name}'
+
     if not os.path.isdir(pod_log_path):
         os.makedirs(pod_log_path)
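Note the validation is guarded by `release_name is not None`: an unconditional `isinstance` check would raise `TypeError` for the default `None` and make the fallback branch of `pod_log_path` unreachable. For reference, how the updated helper is expected to be called; the namespace and release values below are illustrative:

```python
# Usage sketch for the updated helper; values are examples only.
from utils.util_k8s import export_pod_logs

# logs land in /tmp/milvus_logs/<release_name>/, which is exactly the layout
# the Jenkins loop above tars into one archive per release
export_pod_logs(namespace="chaos-testing",
                label_selector="app.kubernetes.io/instance=scale-proxy-abc123",
                release_name="scale-proxy-abc123")

# backwards compatible: omitting release_name keeps the old flat /tmp/milvus_logs/ layout
export_pod_logs(namespace="chaos-testing",
                label_selector="app.kubernetes.io/instance=mic-milvus")
```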