diff --git a/tests20/python_client/scale/constants.py b/tests20/python_client/scale/constants.py new file mode 100644 index 0000000000..c70b3eedad --- /dev/null +++ b/tests20/python_client/scale/constants.py @@ -0,0 +1,12 @@ +# scale object +IMAGE_REPOSITORY = "milvusdb/milvus-dev" +IMAGE_TAG = "master-latest" +IF_NOT_PRESENT = "IfNotPresent" +ALWAYS = "Always" +PROXY = "proxy" +DATA_NODE = "dataNode" +INDEX_NODE = "indexNode" +QUERY_NODE = "queryNode" + +# my values.yaml path +HELM_VALUES_PATH = '/home/zong/milvus-helm/charts/milvus' diff --git a/tests20/python_client/scale/helm_env.py b/tests20/python_client/scale/helm_env.py new file mode 100644 index 0000000000..25dd8b5157 --- /dev/null +++ b/tests20/python_client/scale/helm_env.py @@ -0,0 +1,90 @@ +import os + +from scale import constants +from utils.util_log import test_log as log +from common import common_func as cf + + +class HelmEnv: + def __init__(self, release_name=None, **kwargs): + self.release_name = release_name if release_name else cf.gen_unique_str("scale") + self.proxy = kwargs.get(constants.PROXY, 1) + self.data_node = kwargs.get(constants.DATA_NODE, 1) + self.index_node = kwargs.get(constants.INDEX_NODE, 1) + self.query_node = kwargs.get(constants.QUERY_NODE, 1) + + def helm_install_cluster_milvus(self, image_pull_policy=constants.IF_NOT_PRESENT): + """ + default deploy cluster milvus with only one xxxNode + helm install --wait --timeout 180s --set image.all.repository=milvusdb/milvus-dev --set image.all.tag=master-latest + --set cluster.enabled=true --set service.type=LoadBalancer + --set image.all=Always clu-zong . + :param image_pull_policy: image pullPolicy includes: IF_NOT_PRESENT and ALWAYS + :param kwargs: PROXY, DATA_NODE, INDEX_NODE, QUERY_NODE + :return: svc ip + """ + install_cmd = f'helm install --wait --timeout 360s ' \ + f'--set image.all.repository={constants.IMAGE_REPOSITORY} ' \ + f'--set image.all.tag={constants.IMAGE_TAG} ' \ + f'--set cluster.enabled=true ' \ + f'--set service.type=LoadBalancer ' \ + f'--set image.all.pullPolicy={image_pull_policy} ' \ + f'--set proxy.replicas={self.proxy} ' \ + f'--set dataNode.replicas={self.data_node} ' \ + f'--set indexNode.replicas={self.index_node} ' \ + f'--set queryNode.replicas={self.query_node} ' \ + f'{self.release_name} . ' + log.info(install_cmd) + os.system(f'cd {constants.HELM_VALUES_PATH} && {install_cmd}') + # raise Exception("Failed to deploy cluster milvus") + # todo + # return svc ip + + def helm_upgrade_cluster_milvus(self, **kwargs): + """ + scale milvus pod num by helm upgrade + when upgrading pod nums, other --set need to be the same as helm install + :param kwargs: PROXY, DATA_NODE, INDEX_NODE, QUERY_NODE + :return: None + """ + proxy = kwargs.get(constants.PROXY, self.proxy) + data_node = kwargs.get(constants.DATA_NODE, self.data_node) + index_node = kwargs.get(constants.INDEX_NODE, self.index_node) + query_node = kwargs.get(constants.QUERY_NODE, self.query_node) + upgrade_cmd = f'helm upgrade --install ' \ + f'--set image.all.repository={constants.IMAGE_REPOSITORY} ' \ + f'--set image.all.tag={constants.IMAGE_TAG} ' \ + f'--set cluster.enabled=true ' \ + f'--set service.type=LoadBalancer ' \ + f'--set proxy.replicas={proxy} ' \ + f'--set dataNode.replicas={data_node} ' \ + f'--set indexNode.replicas={index_node} ' \ + f'--set queryNode.replicas={query_node} ' \ + f'{self.release_name} . ' + log.info(upgrade_cmd) + if os.system(f'cd {constants.HELM_VALUES_PATH} && {upgrade_cmd}'): + raise Exception(f'Failed to upgrade cluster milvus with {kwargs}') + + def helm_uninstall_cluster_milvus(self): + """ + helm uninstall and delete etcd pvc + :return: None + """ + uninstall_cmd = f'helm uninstall {self.release_name}' + if os.system(uninstall_cmd): + raise Exception(f'Failed to uninstall {self.release_name}') + # delete etcd pvc + delete_pvc_cmd = f'kubectl delete pvc data-{self.release_name}-etcd-0' + if os.system(delete_pvc_cmd): + raise Exception(f'Failed to delete {self.release_name} etcd pvc') + # delete plusar + # delete_pvc_plusar_cmd = "kubectl delete pvc scale-test-milvus-pulsar" + + +if __name__ == '__main__': + # default deploy q replicas + release_name = "scale-test" + env = HelmEnv(release_name=release_name) + # env.helm_install_cluster_milvus() + # env.helm_upgrade_cluster_milvus(queryNode=2) + env.helm_uninstall_cluster_milvus() diff --git a/tests20/python_client/scale/test_data_node_scale.py b/tests20/python_client/scale/test_data_node_scale.py new file mode 100644 index 0000000000..8820fa3fcb --- /dev/null +++ b/tests20/python_client/scale/test_data_node_scale.py @@ -0,0 +1,105 @@ +import random + +import pytest + +from base.collection_wrapper import ApiCollectionWrapper +from utils.util_log import test_log as log +from common import common_func as cf +from common import common_type as ct +from scale import constants +from scale.helm_env import HelmEnv +from pymilvus_orm import connections, utility + +prefix = "data_scale" +default_schema = cf.gen_default_collection_schema() +default_search_exp = "int64 >= 0" +default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}} + + +class TestDataNodeScale: + + def test_expand_data_node(self): + """ + target: test create and insert api after expand dataNode pod + method: 1.create collection a and insert df + 2.expand dataNode pod from 1 to 2 + 3.verify collection a property and verify create and insert of new collection + expected: two collection create and insert op are both correctly + """ + # deploy all nodes one pod cluster milvus with helm + release_name = "scale-test" + env = HelmEnv(release_name=release_name) + env.helm_install_cluster_milvus() + + # connect + connections.add_connection(default={"host": '10.98.0.8', "port": 19530}) + connections.connect(alias='default') + # create + c_name = cf.gen_unique_str(prefix) + collection_w = ApiCollectionWrapper() + collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema()) + # # insert + data = cf.gen_default_list_data(ct.default_nb) + mutation_res, _ = collection_w.insert(data) + assert mutation_res.insert_count == ct.default_nb + # scale dataNode to 2 pods + env.helm_upgrade_cluster_milvus(dataNode=2) + # after scale, assert data consistent + assert utility.has_collection(c_name) + assert collection_w.num_entities == ct.default_nb + # assert new operations + new_cname = cf.gen_unique_str(prefix) + new_collection_w = ApiCollectionWrapper() + new_collection_w.init_collection(name=new_cname, schema=cf.gen_default_collection_schema()) + new_mutation_res, _ = new_collection_w.insert(data) + assert new_mutation_res.insert_count == ct.default_nb + assert new_collection_w.num_entities == ct.default_nb + # assert old collection ddl + mutation_res_2, _ = collection_w.insert(data) + assert mutation_res.insert_count == ct.default_nb + assert collection_w.num_entities == ct.default_nb*2 + + collection_w.drop() + new_collection_w.drop() + # env.helm_uninstall_cluster_milvus() + + def test_shrink_data_node(self): + """ + target: test shrink dataNode from 2 to 1 + method: 1.create collection and insert df 2. shrink dataNode 3.insert df + expected: verify the property of collection which channel on shrink pod + """ + release_name = "scale-test" + env = HelmEnv(release_name=release_name, dataNode=2) + env.helm_install_cluster_milvus(image_pull_policy=constants.IF_NOT_PRESENT) + + # connect + connections.add_connection(default={"host": '10.98.0.8', "port": 19530}) + connections.connect(alias='default') + + c_name = "data_scale_one" + data = cf.gen_default_list_data(ct.default_nb) + collection_w = ApiCollectionWrapper() + collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema()) + mutation_res, _ = collection_w.insert(data) + assert mutation_res.insert_count == ct.default_nb + assert collection_w.num_entities == ct.default_nb + + c_name_2 = "data_scale_two" + collection_w2 = ApiCollectionWrapper() + collection_w2.init_collection(name=c_name_2, schema=cf.gen_default_collection_schema()) + mutation_res2, _ = collection_w2.insert(data) + assert mutation_res2.insert_count == ct.default_nb + assert collection_w2.num_entities == ct.default_nb + + env.helm_upgrade_cluster_milvus(dataNode=1) + + assert collection_w.num_entities == ct.default_nb + mutation_res2, _ = collection_w2.insert(data) + assert collection_w2.num_entities == ct.default_nb*2 + collection_w.drop() + collection_w2.drop() + + # env.helm_uninstall_cluster_milvus() + + diff --git a/tests20/python_client/scale/test_query_node_scale.py b/tests20/python_client/scale/test_query_node_scale.py new file mode 100644 index 0000000000..d5c0f2fce4 --- /dev/null +++ b/tests20/python_client/scale/test_query_node_scale.py @@ -0,0 +1,57 @@ +import random + +from base.collection_wrapper import ApiCollectionWrapper +from scale.helm_env import HelmEnv +from utils.util_log import test_log as log +from common import common_func as cf +from common import common_type as ct +from pymilvus_orm import Index, connections + +prefix = "search_scale" +nb = 5000 +default_schema = cf.gen_default_collection_schema() +default_search_exp = "int64 >= 0" +default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}} + + +class TestSearchScale: + def test_search_scale(self): + release_name = "scale-test" + env = HelmEnv(release_name=release_name) + env.helm_install_cluster_milvus() + + # connect + connections.add_connection(default={"host": '10.98.0.8', "port": 19530}) + connections.connect(alias='default') + + # create + c_name = "data_scale_one" + collection_w = ApiCollectionWrapper() + collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema()) + # insert + data = cf.gen_default_list_data(ct.default_nb) + mutation_res, _ = collection_w.insert(data) + assert mutation_res.insert_count == ct.default_nb + # # create index + # collection_w.create_index(ct.default_float_vec_field_name, default_index_params) + # assert collection_w.has_index() + # assert collection_w.index()[0] == Index(collection_w.collection, ct.default_float_vec_field_name, + # default_index_params) + collection_w.load() + # vectors = [[random.random() for _ in range(ct.default_dim)] for _ in range(5)] + res1, _ = collection_w.search(data[-1][:5], ct.default_float_vec_field_name, + ct.default_search_params, ct.default_limit) + + # scale queryNode pod + env.helm_upgrade_cluster_milvus(queryNode=2) + + c_name_2 = "data_scale_two" + collection_w2 = ApiCollectionWrapper() + collection_w2.init_collection(name=c_name_2, schema=cf.gen_default_collection_schema()) + collection_w2.insert(data) + assert collection_w2.num_entities == ct.default_nb + collection_w2.load() + res2, _ = collection_w2.search(data[-1][:5], ct.default_float_vec_field_name, + ct.default_search_params, ct.default_limit) + + assert res1[0].ids == res2[0].ids