diff --git a/tests/python_client/scale/constants.py b/tests/python_client/scale/constants.py index 0cdbee8e2f..76c58068ae 100644 --- a/tests/python_client/scale/constants.py +++ b/tests/python_client/scale/constants.py @@ -1,6 +1,6 @@ # scale object -IMAGE_REPOSITORY = "milvusdb/milvus-dev" # repository of milvus image -IMAGE_TAG = "master-latest" # tag of milvus image +IMAGE_REPOSITORY = "harbor.zilliz.cc/milvus/milvus" # repository of milvus image +IMAGE_TAG = "master-20211207-4cd314d" # tag of milvus image NAMESPACE = "chaos-testing" # namespace IF_NOT_PRESENT = "IfNotPresent" # image pullPolicy IfNotPresent ALWAYS = "Always" # image pullPolicy Always diff --git a/tests/python_client/scale/test_data_node_scale.py b/tests/python_client/scale/test_data_node_scale.py index 121a3a8259..bdb23c6108 100644 --- a/tests/python_client/scale/test_data_node_scale.py +++ b/tests/python_client/scale/test_data_node_scale.py @@ -1,12 +1,16 @@ +import threading +import time + import pytest from base.collection_wrapper import ApiCollectionWrapper from common.common_type import CaseLabel from common import common_func as cf -from common import common_type as ct -from scale import constants, scale_common -from scale.helm_env import HelmEnv -from pymilvus import connections, utility +from customize.milvus_operator import MilvusOperator +from scale import constants +from pymilvus import connections +from utils.util_log import test_log as log +from utils.util_k8s import wait_pods_ready prefix = "data_scale" default_schema = cf.gen_default_collection_schema() @@ -17,89 +21,76 @@ default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": class TestDataNodeScale: @pytest.mark.tags(CaseLabel.L3) - def test_expand_data_node(self): + def test_scale_data_node(self): """ - target: test create and insert api after expand dataNode pod - method: 1.create collection a and insert df - 2.expand dataNode pod from 1 to 2 - 3.verify collection a property and verify create and insert of new collection - expected: two collection create and insert op are both correctly + target: + method: + expected: """ release_name = "scale-data" - milvusOp, host, port = scale_common.deploy_default_milvus(release_name) - - - # connect - connections.add_connection(default={"host": host, "port": port}) - connections.connect(alias='default') - # create - c_name = cf.gen_unique_str(prefix) - collection_w = ApiCollectionWrapper() - collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema()) - # # insert - data = cf.gen_default_list_data() - mutation_res, _ = collection_w.insert(data) - assert mutation_res.insert_count == ct.default_nb - # scale dataNode to 2 pods - milvusOp.upgrade(release_name, {'spec.components.dataNode.replicas': 2}, constants.NAMESPACE) - milvusOp.wait_for_healthy(release_name, constants.NAMESPACE) - - # after scale, assert data consistent - assert utility.has_collection(c_name) - assert collection_w.num_entities == ct.default_nb - # assert new operations - new_cname = cf.gen_unique_str(prefix) - new_collection_w = ApiCollectionWrapper() - new_collection_w.init_collection(name=new_cname, schema=cf.gen_default_collection_schema()) - new_mutation_res, _ = new_collection_w.insert(data) - assert new_mutation_res.insert_count == ct.default_nb - assert new_collection_w.num_entities == ct.default_nb - # assert old collection ddl - mutation_res_2, _ = collection_w.insert(data) - assert mutation_res.insert_count == ct.default_nb - assert collection_w.num_entities == ct.default_nb*2 - - collection_w.drop() - new_collection_w.drop() - - # milvusOp.uninstall(release_name, namespace=constants.NAMESPACE) - - @pytest.mark.tags(CaseLabel.L3) - def test_shrink_data_node(self): - """ - target: test shrink dataNode from 2 to 1 - method: 1.create collection and insert df 2. shrink dataNode 3.insert df - expected: verify the property of collection which channel on shrink pod - """ - release_name = "scale-data" - env = HelmEnv(release_name=release_name, dataNode=2) - host = env.helm_install_cluster_milvus(image_pull_policy=constants.IF_NOT_PRESENT) + image = f'{constants.IMAGE_REPOSITORY}:{constants.IMAGE_TAG}' + data_config = { + 'metadata.namespace': constants.NAMESPACE, + 'metadata.name': release_name, + 'spec.components.image': image, + 'spec.components.proxy.serviceType': 'LoadBalancer', + 'spec.components.dataNode.replicas': 2, + 'spec.config.dataCoord.enableCompaction': True, + 'spec.config.dataCoord.enableGarbageCollection': True + } + mic = MilvusOperator() + mic.install(data_config) + healthy = mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1200) + log.info(f"milvus healthy: {healthy}") + host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0] + # host = '10.98.0.4' # connect connections.add_connection(default={"host": host, "port": 19530}) connections.connect(alias='default') - c_name = "data_scale_one" - data = cf.gen_default_list_data(ct.default_nb) + # create + c_name = cf.gen_unique_str("scale_query") + # c_name = 'scale_query_DymS7kI4' collection_w = ApiCollectionWrapper() - collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema()) - mutation_res, _ = collection_w.insert(data) - assert mutation_res.insert_count == ct.default_nb - assert collection_w.num_entities == ct.default_nb + collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema(), shards_num=5) - c_name_2 = "data_scale_two" - collection_w2 = ApiCollectionWrapper() - collection_w2.init_collection(name=c_name_2, schema=cf.gen_default_collection_schema()) - mutation_res2, _ = collection_w2.insert(data) - assert mutation_res2.insert_count == ct.default_nb - assert collection_w2.num_entities == ct.default_nb + tmp_nb = 10000 - env.helm_upgrade_cluster_milvus(dataNode=1) + def do_insert(): + while True: + tmp_df = cf.gen_default_dataframe_data(tmp_nb) + collection_w.insert(tmp_df) + log.debug(collection_w.num_entities) - assert collection_w.num_entities == ct.default_nb - mutation_res2, _ = collection_w2.insert(data) - assert collection_w2.num_entities == ct.default_nb*2 - collection_w.drop() - collection_w2.drop() + t_insert = threading.Thread(target=do_insert, args=(), daemon=True) + t_insert.start() - # env.helm_uninstall_cluster_milvus() + # scale dataNode to 5 + mic.upgrade(release_name, {'spec.components.dataNode.replicas': 5}, constants.NAMESPACE) + time.sleep(300) + log.debug("Expand dataNode test finished") + + # create new collection and insert + new_c_name = cf.gen_unique_str("scale_query") + collection_w_new = ApiCollectionWrapper() + collection_w_new.init_collection(name=new_c_name, schema=cf.gen_default_collection_schema(), shards_num=2) + + def do_new_insert(): + while True: + tmp_df = cf.gen_default_dataframe_data(tmp_nb) + collection_w_new.insert(tmp_df) + log.debug(collection_w_new.num_entities) + + t_insert_new = threading.Thread(target=do_new_insert, args=(), daemon=True) + t_insert_new.start() + + # scale dataNode to 3 + mic.upgrade(release_name, {'spec.components.dataNode.replicas': 3}, constants.NAMESPACE) + wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}") + + log.debug(collection_w.num_entities) + time.sleep(300) + log.debug("Shrink dataNode test finished") + + # mic.uninstall(release_name, namespace=constants.NAMESPACE)