mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Add part scale test cases (#6504)
* add deploy local with helm Signed-off-by: ThreadDao <yufen.zong@zilliz.com> * add dataNode scale test Signed-off-by: ThreadDao <yufen.zong@zilliz.com> * add queryNode scale test Signed-off-by: ThreadDao <yufen.zong@zilliz.com> * [skip ci] akip ci Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
This commit is contained in:
parent
1c29b42434
commit
5ba96dc96f
12
tests20/python_client/scale/constants.py
Normal file
12
tests20/python_client/scale/constants.py
Normal file
@ -0,0 +1,12 @@
|
||||
# scale object
|
||||
IMAGE_REPOSITORY = "milvusdb/milvus-dev"
|
||||
IMAGE_TAG = "master-latest"
|
||||
IF_NOT_PRESENT = "IfNotPresent"
|
||||
ALWAYS = "Always"
|
||||
PROXY = "proxy"
|
||||
DATA_NODE = "dataNode"
|
||||
INDEX_NODE = "indexNode"
|
||||
QUERY_NODE = "queryNode"
|
||||
|
||||
# my values.yaml path
|
||||
HELM_VALUES_PATH = '/home/zong/milvus-helm/charts/milvus'
|
||||
90
tests20/python_client/scale/helm_env.py
Normal file
90
tests20/python_client/scale/helm_env.py
Normal file
@ -0,0 +1,90 @@
|
||||
import os
|
||||
|
||||
from scale import constants
|
||||
from utils.util_log import test_log as log
|
||||
from common import common_func as cf
|
||||
|
||||
|
||||
class HelmEnv:
|
||||
def __init__(self, release_name=None, **kwargs):
|
||||
self.release_name = release_name if release_name else cf.gen_unique_str("scale")
|
||||
self.proxy = kwargs.get(constants.PROXY, 1)
|
||||
self.data_node = kwargs.get(constants.DATA_NODE, 1)
|
||||
self.index_node = kwargs.get(constants.INDEX_NODE, 1)
|
||||
self.query_node = kwargs.get(constants.QUERY_NODE, 1)
|
||||
|
||||
def helm_install_cluster_milvus(self, image_pull_policy=constants.IF_NOT_PRESENT):
|
||||
"""
|
||||
default deploy cluster milvus with only one xxxNode
|
||||
helm install --wait --timeout 180s --set image.all.repository=milvusdb/milvus-dev --set image.all.tag=master-latest
|
||||
--set cluster.enabled=true --set service.type=LoadBalancer
|
||||
--set image.all=Always clu-zong .
|
||||
:param image_pull_policy: image pullPolicy includes: IF_NOT_PRESENT and ALWAYS
|
||||
:param kwargs: PROXY, DATA_NODE, INDEX_NODE, QUERY_NODE
|
||||
:return: svc ip
|
||||
"""
|
||||
install_cmd = f'helm install --wait --timeout 360s ' \
|
||||
f'--set image.all.repository={constants.IMAGE_REPOSITORY} ' \
|
||||
f'--set image.all.tag={constants.IMAGE_TAG} ' \
|
||||
f'--set cluster.enabled=true ' \
|
||||
f'--set service.type=LoadBalancer ' \
|
||||
f'--set image.all.pullPolicy={image_pull_policy} ' \
|
||||
f'--set proxy.replicas={self.proxy} ' \
|
||||
f'--set dataNode.replicas={self.data_node} ' \
|
||||
f'--set indexNode.replicas={self.index_node} ' \
|
||||
f'--set queryNode.replicas={self.query_node} ' \
|
||||
f'{self.release_name} . '
|
||||
log.info(install_cmd)
|
||||
os.system(f'cd {constants.HELM_VALUES_PATH} && {install_cmd}')
|
||||
# raise Exception("Failed to deploy cluster milvus")
|
||||
# todo
|
||||
# return svc ip
|
||||
|
||||
def helm_upgrade_cluster_milvus(self, **kwargs):
|
||||
"""
|
||||
scale milvus pod num by helm upgrade
|
||||
when upgrading pod nums, other --set need to be the same as helm install
|
||||
:param kwargs: PROXY, DATA_NODE, INDEX_NODE, QUERY_NODE
|
||||
:return: None
|
||||
"""
|
||||
proxy = kwargs.get(constants.PROXY, self.proxy)
|
||||
data_node = kwargs.get(constants.DATA_NODE, self.data_node)
|
||||
index_node = kwargs.get(constants.INDEX_NODE, self.index_node)
|
||||
query_node = kwargs.get(constants.QUERY_NODE, self.query_node)
|
||||
upgrade_cmd = f'helm upgrade --install ' \
|
||||
f'--set image.all.repository={constants.IMAGE_REPOSITORY} ' \
|
||||
f'--set image.all.tag={constants.IMAGE_TAG} ' \
|
||||
f'--set cluster.enabled=true ' \
|
||||
f'--set service.type=LoadBalancer ' \
|
||||
f'--set proxy.replicas={proxy} ' \
|
||||
f'--set dataNode.replicas={data_node} ' \
|
||||
f'--set indexNode.replicas={index_node} ' \
|
||||
f'--set queryNode.replicas={query_node} ' \
|
||||
f'{self.release_name} . '
|
||||
log.info(upgrade_cmd)
|
||||
if os.system(f'cd {constants.HELM_VALUES_PATH} && {upgrade_cmd}'):
|
||||
raise Exception(f'Failed to upgrade cluster milvus with {kwargs}')
|
||||
|
||||
def helm_uninstall_cluster_milvus(self):
|
||||
"""
|
||||
helm uninstall and delete etcd pvc
|
||||
:return: None
|
||||
"""
|
||||
uninstall_cmd = f'helm uninstall {self.release_name}'
|
||||
if os.system(uninstall_cmd):
|
||||
raise Exception(f'Failed to uninstall {self.release_name}')
|
||||
# delete etcd pvc
|
||||
delete_pvc_cmd = f'kubectl delete pvc data-{self.release_name}-etcd-0'
|
||||
if os.system(delete_pvc_cmd):
|
||||
raise Exception(f'Failed to delete {self.release_name} etcd pvc')
|
||||
# delete plusar
|
||||
# delete_pvc_plusar_cmd = "kubectl delete pvc scale-test-milvus-pulsar"
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# default deploy q replicas
|
||||
release_name = "scale-test"
|
||||
env = HelmEnv(release_name=release_name)
|
||||
# env.helm_install_cluster_milvus()
|
||||
# env.helm_upgrade_cluster_milvus(queryNode=2)
|
||||
env.helm_uninstall_cluster_milvus()
|
||||
105
tests20/python_client/scale/test_data_node_scale.py
Normal file
105
tests20/python_client/scale/test_data_node_scale.py
Normal file
@ -0,0 +1,105 @@
|
||||
import random
|
||||
|
||||
import pytest
|
||||
|
||||
from base.collection_wrapper import ApiCollectionWrapper
|
||||
from utils.util_log import test_log as log
|
||||
from common import common_func as cf
|
||||
from common import common_type as ct
|
||||
from scale import constants
|
||||
from scale.helm_env import HelmEnv
|
||||
from pymilvus_orm import connections, utility
|
||||
|
||||
prefix = "data_scale"
|
||||
default_schema = cf.gen_default_collection_schema()
|
||||
default_search_exp = "int64 >= 0"
|
||||
default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
|
||||
|
||||
|
||||
class TestDataNodeScale:
|
||||
|
||||
def test_expand_data_node(self):
|
||||
"""
|
||||
target: test create and insert api after expand dataNode pod
|
||||
method: 1.create collection a and insert df
|
||||
2.expand dataNode pod from 1 to 2
|
||||
3.verify collection a property and verify create and insert of new collection
|
||||
expected: two collection create and insert op are both correctly
|
||||
"""
|
||||
# deploy all nodes one pod cluster milvus with helm
|
||||
release_name = "scale-test"
|
||||
env = HelmEnv(release_name=release_name)
|
||||
env.helm_install_cluster_milvus()
|
||||
|
||||
# connect
|
||||
connections.add_connection(default={"host": '10.98.0.8', "port": 19530})
|
||||
connections.connect(alias='default')
|
||||
# create
|
||||
c_name = cf.gen_unique_str(prefix)
|
||||
collection_w = ApiCollectionWrapper()
|
||||
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema())
|
||||
# # insert
|
||||
data = cf.gen_default_list_data(ct.default_nb)
|
||||
mutation_res, _ = collection_w.insert(data)
|
||||
assert mutation_res.insert_count == ct.default_nb
|
||||
# scale dataNode to 2 pods
|
||||
env.helm_upgrade_cluster_milvus(dataNode=2)
|
||||
# after scale, assert data consistent
|
||||
assert utility.has_collection(c_name)
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
# assert new operations
|
||||
new_cname = cf.gen_unique_str(prefix)
|
||||
new_collection_w = ApiCollectionWrapper()
|
||||
new_collection_w.init_collection(name=new_cname, schema=cf.gen_default_collection_schema())
|
||||
new_mutation_res, _ = new_collection_w.insert(data)
|
||||
assert new_mutation_res.insert_count == ct.default_nb
|
||||
assert new_collection_w.num_entities == ct.default_nb
|
||||
# assert old collection ddl
|
||||
mutation_res_2, _ = collection_w.insert(data)
|
||||
assert mutation_res.insert_count == ct.default_nb
|
||||
assert collection_w.num_entities == ct.default_nb*2
|
||||
|
||||
collection_w.drop()
|
||||
new_collection_w.drop()
|
||||
# env.helm_uninstall_cluster_milvus()
|
||||
|
||||
def test_shrink_data_node(self):
|
||||
"""
|
||||
target: test shrink dataNode from 2 to 1
|
||||
method: 1.create collection and insert df 2. shrink dataNode 3.insert df
|
||||
expected: verify the property of collection which channel on shrink pod
|
||||
"""
|
||||
release_name = "scale-test"
|
||||
env = HelmEnv(release_name=release_name, dataNode=2)
|
||||
env.helm_install_cluster_milvus(image_pull_policy=constants.IF_NOT_PRESENT)
|
||||
|
||||
# connect
|
||||
connections.add_connection(default={"host": '10.98.0.8', "port": 19530})
|
||||
connections.connect(alias='default')
|
||||
|
||||
c_name = "data_scale_one"
|
||||
data = cf.gen_default_list_data(ct.default_nb)
|
||||
collection_w = ApiCollectionWrapper()
|
||||
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema())
|
||||
mutation_res, _ = collection_w.insert(data)
|
||||
assert mutation_res.insert_count == ct.default_nb
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
|
||||
c_name_2 = "data_scale_two"
|
||||
collection_w2 = ApiCollectionWrapper()
|
||||
collection_w2.init_collection(name=c_name_2, schema=cf.gen_default_collection_schema())
|
||||
mutation_res2, _ = collection_w2.insert(data)
|
||||
assert mutation_res2.insert_count == ct.default_nb
|
||||
assert collection_w2.num_entities == ct.default_nb
|
||||
|
||||
env.helm_upgrade_cluster_milvus(dataNode=1)
|
||||
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
mutation_res2, _ = collection_w2.insert(data)
|
||||
assert collection_w2.num_entities == ct.default_nb*2
|
||||
collection_w.drop()
|
||||
collection_w2.drop()
|
||||
|
||||
# env.helm_uninstall_cluster_milvus()
|
||||
|
||||
|
||||
57
tests20/python_client/scale/test_query_node_scale.py
Normal file
57
tests20/python_client/scale/test_query_node_scale.py
Normal file
@ -0,0 +1,57 @@
|
||||
import random
|
||||
|
||||
from base.collection_wrapper import ApiCollectionWrapper
|
||||
from scale.helm_env import HelmEnv
|
||||
from utils.util_log import test_log as log
|
||||
from common import common_func as cf
|
||||
from common import common_type as ct
|
||||
from pymilvus_orm import Index, connections
|
||||
|
||||
prefix = "search_scale"
|
||||
nb = 5000
|
||||
default_schema = cf.gen_default_collection_schema()
|
||||
default_search_exp = "int64 >= 0"
|
||||
default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
|
||||
|
||||
|
||||
class TestSearchScale:
|
||||
def test_search_scale(self):
|
||||
release_name = "scale-test"
|
||||
env = HelmEnv(release_name=release_name)
|
||||
env.helm_install_cluster_milvus()
|
||||
|
||||
# connect
|
||||
connections.add_connection(default={"host": '10.98.0.8', "port": 19530})
|
||||
connections.connect(alias='default')
|
||||
|
||||
# create
|
||||
c_name = "data_scale_one"
|
||||
collection_w = ApiCollectionWrapper()
|
||||
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema())
|
||||
# insert
|
||||
data = cf.gen_default_list_data(ct.default_nb)
|
||||
mutation_res, _ = collection_w.insert(data)
|
||||
assert mutation_res.insert_count == ct.default_nb
|
||||
# # create index
|
||||
# collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
||||
# assert collection_w.has_index()
|
||||
# assert collection_w.index()[0] == Index(collection_w.collection, ct.default_float_vec_field_name,
|
||||
# default_index_params)
|
||||
collection_w.load()
|
||||
# vectors = [[random.random() for _ in range(ct.default_dim)] for _ in range(5)]
|
||||
res1, _ = collection_w.search(data[-1][:5], ct.default_float_vec_field_name,
|
||||
ct.default_search_params, ct.default_limit)
|
||||
|
||||
# scale queryNode pod
|
||||
env.helm_upgrade_cluster_milvus(queryNode=2)
|
||||
|
||||
c_name_2 = "data_scale_two"
|
||||
collection_w2 = ApiCollectionWrapper()
|
||||
collection_w2.init_collection(name=c_name_2, schema=cf.gen_default_collection_schema())
|
||||
collection_w2.insert(data)
|
||||
assert collection_w2.num_entities == ct.default_nb
|
||||
collection_w2.load()
|
||||
res2, _ = collection_w2.search(data[-1][:5], ct.default_float_vec_field_name,
|
||||
ct.default_search_params, ct.default_limit)
|
||||
|
||||
assert res1[0].ids == res2[0].ids
|
||||
Loading…
x
Reference in New Issue
Block a user