Mirror of https://gitee.com/milvus-io/milvus.git, synced 2025-12-08 01:58:34 +08:00
[skip ci] Update scale data node test (#12879)
Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
parent d8b2e84135
commit 55e62cc7ff
@@ -1,6 +1,6 @@
 # scale object
-IMAGE_REPOSITORY = "milvusdb/milvus-dev"  # repository of milvus image
-IMAGE_TAG = "master-latest"  # tag of milvus image
+IMAGE_REPOSITORY = "harbor.zilliz.cc/milvus/milvus"  # repository of milvus image
+IMAGE_TAG = "master-20211207-4cd314d"  # tag of milvus image
 NAMESPACE = "chaos-testing"  # namespace
 IF_NOT_PRESENT = "IfNotPresent"  # image pullPolicy IfNotPresent
 ALWAYS = "Always"  # image pullPolicy Always
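The constants above feed the operator config in the updated test: the image reference is built from the repository and tag, and NAMESPACE selects where the release is installed. A minimal sketch of that composition, using the same expressions the test below uses (illustration only):

# Minimal sketch: how the updated constants are consumed by the test below.
from scale import constants

image = f"{constants.IMAGE_REPOSITORY}:{constants.IMAGE_TAG}"
# e.g. "harbor.zilliz.cc/milvus/milvus:master-20211207-4cd314d"
print(image, constants.NAMESPACE)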
@@ -1,12 +1,16 @@
+import threading
+import time
+
 import pytest
 
 from base.collection_wrapper import ApiCollectionWrapper
 from common.common_type import CaseLabel
 from common import common_func as cf
-from common import common_type as ct
-from scale import constants, scale_common
-from scale.helm_env import HelmEnv
-from pymilvus import connections, utility
+from customize.milvus_operator import MilvusOperator
+from scale import constants
+from pymilvus import connections
+from utils.util_log import test_log as log
+from utils.util_k8s import wait_pods_ready
 
 prefix = "data_scale"
 default_schema = cf.gen_default_collection_schema()
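The import changes drop the Helm-based helpers (scale_common, HelmEnv) in favour of the operator-driven deployment (customize.milvus_operator.MilvusOperator), and pull in wait_pods_ready, which the test later uses to block until the rescaled dataNode pods are ready. As a rough, hypothetical sketch of what such a readiness wait typically does with the official kubernetes Python client (the repo's utils.util_k8s implementation may differ):

# Hypothetical sketch of a pod-readiness wait similar to utils.util_k8s.wait_pods_ready;
# not the repository's actual implementation. Assumes the `kubernetes` package and a
# reachable kubeconfig.
import time
from kubernetes import client, config

def wait_pods_ready(namespace, label_selector, timeout=600):
    config.load_kube_config()
    v1 = client.CoreV1Api()
    deadline = time.time() + timeout
    while time.time() < deadline:
        pods = v1.list_namespaced_pod(namespace, label_selector=label_selector).items
        ready = [p for p in pods
                 if p.status.phase == "Running"
                 and all(c.ready for c in (p.status.container_statuses or []))]
        if pods and len(ready) == len(pods):
            return True
        time.sleep(5)
    return False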
@@ -17,89 +21,76 @@ default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params":
 class TestDataNodeScale:
 
     @pytest.mark.tags(CaseLabel.L3)
-    def test_expand_data_node(self):
+    def test_scale_data_node(self):
         """
-        target: test create and insert api after expand dataNode pod
-        method: 1.create collection a and insert df
-                2.expand dataNode pod from 1 to 2
-                3.verify collection a property and verify create and insert of new collection
-        expected: two collection create and insert op are both correctly
+        target:
+        method:
+        expected:
         """
         release_name = "scale-data"
-        milvusOp, host, port = scale_common.deploy_default_milvus(release_name)
-
-        # connect
-        connections.add_connection(default={"host": host, "port": port})
-        connections.connect(alias='default')
-        # create
-        c_name = cf.gen_unique_str(prefix)
-        collection_w = ApiCollectionWrapper()
-        collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema())
-        # # insert
-        data = cf.gen_default_list_data()
-        mutation_res, _ = collection_w.insert(data)
-        assert mutation_res.insert_count == ct.default_nb
-        # scale dataNode to 2 pods
-        milvusOp.upgrade(release_name, {'spec.components.dataNode.replicas': 2}, constants.NAMESPACE)
-        milvusOp.wait_for_healthy(release_name, constants.NAMESPACE)
-
-        # after scale, assert data consistent
-        assert utility.has_collection(c_name)
-        assert collection_w.num_entities == ct.default_nb
-        # assert new operations
-        new_cname = cf.gen_unique_str(prefix)
-        new_collection_w = ApiCollectionWrapper()
-        new_collection_w.init_collection(name=new_cname, schema=cf.gen_default_collection_schema())
-        new_mutation_res, _ = new_collection_w.insert(data)
-        assert new_mutation_res.insert_count == ct.default_nb
-        assert new_collection_w.num_entities == ct.default_nb
-        # assert old collection ddl
-        mutation_res_2, _ = collection_w.insert(data)
-        assert mutation_res.insert_count == ct.default_nb
-        assert collection_w.num_entities == ct.default_nb*2
-
-        collection_w.drop()
-        new_collection_w.drop()
-
-        # milvusOp.uninstall(release_name, namespace=constants.NAMESPACE)
-
-    @pytest.mark.tags(CaseLabel.L3)
-    def test_shrink_data_node(self):
-        """
-        target: test shrink dataNode from 2 to 1
-        method: 1.create collection and insert df 2. shrink dataNode 3.insert df
-        expected: verify the property of collection which channel on shrink pod
-        """
-        release_name = "scale-data"
-        env = HelmEnv(release_name=release_name, dataNode=2)
-        host = env.helm_install_cluster_milvus(image_pull_policy=constants.IF_NOT_PRESENT)
+        image = f'{constants.IMAGE_REPOSITORY}:{constants.IMAGE_TAG}'
+        data_config = {
+            'metadata.namespace': constants.NAMESPACE,
+            'metadata.name': release_name,
+            'spec.components.image': image,
+            'spec.components.proxy.serviceType': 'LoadBalancer',
+            'spec.components.dataNode.replicas': 2,
+            'spec.config.dataCoord.enableCompaction': True,
+            'spec.config.dataCoord.enableGarbageCollection': True
+        }
+        mic = MilvusOperator()
+        mic.install(data_config)
+        healthy = mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1200)
+        log.info(f"milvus healthy: {healthy}")
+        host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
+        # host = '10.98.0.4'
 
         # connect
         connections.add_connection(default={"host": host, "port": 19530})
         connections.connect(alias='default')
 
-        c_name = "data_scale_one"
-        data = cf.gen_default_list_data(ct.default_nb)
+        # create
+        c_name = cf.gen_unique_str("scale_query")
+        # c_name = 'scale_query_DymS7kI4'
         collection_w = ApiCollectionWrapper()
-        collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema())
-        mutation_res, _ = collection_w.insert(data)
-        assert mutation_res.insert_count == ct.default_nb
-        assert collection_w.num_entities == ct.default_nb
+        collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema(), shards_num=5)
 
-        c_name_2 = "data_scale_two"
-        collection_w2 = ApiCollectionWrapper()
-        collection_w2.init_collection(name=c_name_2, schema=cf.gen_default_collection_schema())
-        mutation_res2, _ = collection_w2.insert(data)
-        assert mutation_res2.insert_count == ct.default_nb
-        assert collection_w2.num_entities == ct.default_nb
+        tmp_nb = 10000
 
-        env.helm_upgrade_cluster_milvus(dataNode=1)
+        def do_insert():
+            while True:
+                tmp_df = cf.gen_default_dataframe_data(tmp_nb)
+                collection_w.insert(tmp_df)
+                log.debug(collection_w.num_entities)
 
-        assert collection_w.num_entities == ct.default_nb
-        mutation_res2, _ = collection_w2.insert(data)
-        assert collection_w2.num_entities == ct.default_nb*2
-        collection_w.drop()
-        collection_w2.drop()
+        t_insert = threading.Thread(target=do_insert, args=(), daemon=True)
+        t_insert.start()
 
-        # env.helm_uninstall_cluster_milvus()
+        # scale dataNode to 5
+        mic.upgrade(release_name, {'spec.components.dataNode.replicas': 5}, constants.NAMESPACE)
+        time.sleep(300)
+        log.debug("Expand dataNode test finished")
+
+        # create new collection and insert
+        new_c_name = cf.gen_unique_str("scale_query")
+        collection_w_new = ApiCollectionWrapper()
+        collection_w_new.init_collection(name=new_c_name, schema=cf.gen_default_collection_schema(), shards_num=2)
+
+        def do_new_insert():
+            while True:
+                tmp_df = cf.gen_default_dataframe_data(tmp_nb)
+                collection_w_new.insert(tmp_df)
+                log.debug(collection_w_new.num_entities)
+
+        t_insert_new = threading.Thread(target=do_new_insert, args=(), daemon=True)
+        t_insert_new.start()
+
+        # scale dataNode to 3
+        mic.upgrade(release_name, {'spec.components.dataNode.replicas': 3}, constants.NAMESPACE)
+        wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
+
+        log.debug(collection_w.num_entities)
+        time.sleep(300)
+        log.debug("Shrink dataNode test finished")
+
+        # mic.uninstall(release_name, namespace=constants.NAMESPACE)
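The core pattern of the rewritten test is to keep a background writer running while the dataNode replica count is changed through the operator, so ingestion stays active across both the expand and the shrink. A condensed illustration of that loop, reusing the names from the diff (not the exact test code; mic, collection_w, release_name, constants, cf, log, and wait_pods_ready are assumed to be set up as above):

# Condensed illustration of the expand/shrink flow above (not the exact test code).
import threading
import time

def do_insert():
    # Keep inserting while the dataNode pool is resized, so writes are in flight
    # during both scale operations.
    while True:
        collection_w.insert(cf.gen_default_dataframe_data(10000))
        log.debug(collection_w.num_entities)

threading.Thread(target=do_insert, daemon=True).start()

# Expand dataNode to 5 replicas, then shrink to 3, waiting for pods each time.
for replicas in (5, 3):
    mic.upgrade(release_name, {'spec.components.dataNode.replicas': replicas}, constants.NAMESPACE)
    wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
    time.sleep(300)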