From 3336b91ce6a174b1ecefd2480bc1c6e349089791 Mon Sep 17 00:00:00 2001 From: zhuwenxing Date: Fri, 31 May 2024 13:55:52 +0800 Subject: [PATCH] test: add channel exclusive balance test and resource group test (#33093) Signed-off-by: zhuwenxing --- tests/python_client/base/client_base.py | 10 +- tests/python_client/chaos/checker.py | 4 +- tests/python_client/common/milvus_sys.py | 17 + .../customize/milvus_operator.py | 58 ++ .../customize/template/default.yaml | 109 +- tests/python_client/deploy/milvus_crd.yaml | 13 +- tests/python_client/pytest.ini | 4 +- tests/python_client/requirements.txt | 1 + .../python_client/resource_group/conftest.py | 11 + .../test_channel_exclusive_balance.py | 446 +++++++++ .../resource_group/test_resource_group.py | 944 ++++++++++++++++++ tests/python_client/utils/util_birdwatcher.py | 79 ++ tests/python_client/utils/util_k8s.py | 2 + 13 files changed, 1682 insertions(+), 16 deletions(-) create mode 100644 tests/python_client/resource_group/conftest.py create mode 100644 tests/python_client/resource_group/test_channel_exclusive_balance.py create mode 100644 tests/python_client/resource_group/test_resource_group.py create mode 100644 tests/python_client/utils/util_birdwatcher.py diff --git a/tests/python_client/base/client_base.py b/tests/python_client/base/client_base.py index e5b3cfd2e6..0b52845885 100644 --- a/tests/python_client/base/client_base.py +++ b/tests/python_client/base/client_base.py @@ -1,4 +1,3 @@ -from numpy.core.fromnumeric import _partition_dispatcher import pytest import sys from pymilvus import DefaultConfig @@ -33,7 +32,7 @@ class Base: collection_object_list = [] resource_group_list = [] high_level_api_wrap = None - + skip_connection = False def setup_class(self): log.info("[setup_class] Start setup class...") @@ -128,6 +127,9 @@ class TestcaseBase(Base): def _connect(self, enable_milvus_client_api=False): """ Add a connection and create the connect """ + if self.skip_connection: + return None + if enable_milvus_client_api: if cf.param_info.param_uri: uri = cf.param_info.param_uri @@ -252,8 +254,8 @@ class TestcaseBase(Base): insert_ids = [] time_stamp = 0 # 1 create collection - default_schema = cf.gen_default_collection_schema(auto_id=auto_id, dim=dim, primary_field=primary_field, - enable_dynamic_field=enable_dynamic_field, + default_schema = cf.gen_default_collection_schema(auto_id=auto_id, dim=dim, primary_field=primary_field, + enable_dynamic_field=enable_dynamic_field, with_json=with_json, multiple_dim_array=multiple_dim_array, is_partition_key=is_partition_key, vector_data_type=vector_data_type) diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py index 5eb2569777..66cd25475d 100644 --- a/tests/python_client/chaos/checker.py +++ b/tests/python_client/chaos/checker.py @@ -1331,10 +1331,10 @@ class QueryChecker(Checker): class DeleteChecker(Checker): """check delete operations in a dependent thread""" - def __init__(self, collection_name=None, schema=None): + def __init__(self, collection_name=None, schema=None, shards_num=2): if collection_name is None: collection_name = cf.gen_unique_str("DeleteChecker_") - super().__init__(collection_name=collection_name, schema=schema) + super().__init__(collection_name=collection_name, schema=schema, shards_num=shards_num) res, result = self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, timeout=timeout, diff --git a/tests/python_client/common/milvus_sys.py b/tests/python_client/common/milvus_sys.py index 
f8f2e3e472..7db540bb72 100644 --- a/tests/python_client/common/milvus_sys.py +++ b/tests/python_client/common/milvus_sys.py @@ -3,6 +3,7 @@ import json from pymilvus.grpc_gen import milvus_pb2 as milvus_types from pymilvus import connections from utils.util_log import test_log as log +from utils.util_log import test_log as log sys_info_req = ujson.dumps({"metric_type": "system_info"}) sys_statistics_req = ujson.dumps({"metric_type": "system_statistics"}) sys_logs_req = ujson.dumps({"metric_type": "system_logs"}) @@ -17,9 +18,24 @@ class MilvusSys: # TODO: for now it only supports non_orm style API for getMetricsRequest req = milvus_types.GetMetricsRequest(request=sys_info_req) + self.sys_info = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=None) + # req = milvus_types.GetMetricsRequest(request=sys_statistics_req) + # self.sys_statistics = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=None) + # req = milvus_types.GetMetricsRequest(request=sys_logs_req) + # self.sys_logs = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=None) self.sys_info = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=60) log.debug(f"sys_info: {self.sys_info}") + def refresh(self): + req = milvus_types.GetMetricsRequest(request=sys_info_req) + self.sys_info = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=None) + # req = milvus_types.GetMetricsRequest(request=sys_statistics_req) + # self.sys_statistics = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=None) + # req = milvus_types.GetMetricsRequest(request=sys_logs_req) + # self.sys_logs = self.handler._stub.GetMetrics(req, wait_for_ready=True, timeout=None) + log.debug(f"sys info response: {self.sys_info.response}") + + @property def build_version(self): """get the first node's build version as milvus build version""" @@ -84,6 +100,7 @@ class MilvusSys: @property def nodes(self): """get all the nodes in Milvus deployment""" + self.refresh() all_nodes = json.loads(self.sys_info.response).get('nodes_info') online_nodes = [node for node in all_nodes if node["infos"]["has_error"] is False] return online_nodes diff --git a/tests/python_client/customize/milvus_operator.py b/tests/python_client/customize/milvus_operator.py index 1140ff08f0..658cbc4334 100644 --- a/tests/python_client/customize/milvus_operator.py +++ b/tests/python_client/customize/milvus_operator.py @@ -3,6 +3,7 @@ import os import time from benedict import benedict from utils.util_log import test_log as log +from utils.util_k8s import get_pod_ip_name_pairs from common.cus_resource_opts import CustomResourceOperations as CusResource template_yaml = os.path.join(os.path.dirname(__file__), 'template/default.yaml') @@ -81,11 +82,13 @@ class MilvusOperator(object): if delete_depends: del_configs = {'spec.dependencies.etcd.inCluster.deletionPolicy': 'Delete', 'spec.dependencies.pulsar.inCluster.deletionPolicy': 'Delete', + 'spec.dependencies.kafka.inCluster.deletionPolicy': 'Delete', 'spec.dependencies.storage.inCluster.deletionPolicy': 'Delete' } if delete_pvc: del_configs.update({'spec.dependencies.etcd.inCluster.pvcDeletion': True, 'spec.dependencies.pulsar.inCluster.pvcDeletion': True, + 'spec.dependencies.kafka.inCluster.pvcDeletion': True, 'spec.dependencies.storage.inCluster.pvcDeletion': True }) if delete_depends or delete_pvc: @@ -113,6 +116,40 @@ class MilvusOperator(object): version=self.version, namespace=namespace) log.debug(f"upgrade milvus with configs: {d_configs}") cus_res.patch(release_name, 
d_configs) + self.wait_for_healthy(release_name, namespace=namespace) + + def rolling_update(self, release_name, new_image_name, namespace='default'): + """ + Method: patch custom resource object to rolling update milvus + Params: + release_name: release name of milvus + namespace: namespace that the milvus is running in + """ + cus_res = CusResource(kind=self.plural, group=self.group, + version=self.version, namespace=namespace) + rolling_configs = {'spec.components.enableRollingUpdate': True, + 'spec.components.imageUpdateMode': "rollingUpgrade", + 'spec.components.image': new_image_name} + log.debug(f"rolling update milvus with configs: {rolling_configs}") + cus_res.patch(release_name, rolling_configs) + self.wait_for_healthy(release_name, namespace=namespace) + + def scale(self, release_name, component, replicas, namespace='default'): + """ + Method: scale milvus components by replicas + Params: + release_name: release name of milvus + replicas: the number of replicas to scale + component: the component to scale, e.g: dataNode, queryNode, indexNode, proxy + namespace: namespace that the milvus is running in + """ + cus_res = CusResource(kind=self.plural, group=self.group, + version=self.version, namespace=namespace) + component = component.replace('node', 'Node') + scale_configs = {f'spec.components.{component}.replicas': replicas} + log.info(f"scale milvus with configs: {scale_configs}") + self.upgrade(release_name, scale_configs, namespace=namespace) + self.wait_for_healthy(release_name, namespace=namespace) def wait_for_healthy(self, release_name, namespace='default', timeout=600): """ @@ -152,3 +189,24 @@ class MilvusOperator(object): endpoint = res_object['status']['endpoint'] return endpoint + + def etcd_endpoints(self, release_name, namespace='default'): + """ + Method: get etcd endpoints by name and namespace + Return: a string type etcd endpoints. 
e.g: host:port + """ + etcd_endpoints = None + cus_res = CusResource(kind=self.plural, group=self.group, + version=self.version, namespace=namespace) + res_object = cus_res.get(release_name) + try: + etcd_endpoints = res_object['spec']['dependencies']['etcd']['endpoints'] + except KeyError: + log.info("etcd endpoints not found") + # get pod ip by pod name + label_selector = f"app.kubernetes.io/instance={release_name}-etcd, app.kubernetes.io/name=etcd" + res = get_pod_ip_name_pairs(namespace, label_selector) + if res: + etcd_endpoints = [f"{pod_ip}:2379" for pod_ip in res.keys()] + return etcd_endpoints[0] + diff --git a/tests/python_client/customize/template/default.yaml b/tests/python_client/customize/template/default.yaml index 507fe56193..d3f71a8bbe 100644 --- a/tests/python_client/customize/template/default.yaml +++ b/tests/python_client/customize/template/default.yaml @@ -13,6 +13,7 @@ spec: simdType: avx components: {} dependencies: + msgStreamType: kafka etcd: inCluster: deletionPolicy: Delete @@ -21,6 +22,113 @@ spec: metrics: podMonitor: enabled: true + kafka: + inCluster: + deletionPolicy: Retain + pvcDeletion: false + values: + replicaCount: 3 + defaultReplicationFactor: 2 + metrics: + kafka: + enabled: true + serviceMonitor: + enabled: true + jmx: + enabled: true + pulsar: + inCluster: + deletionPolicy: Retain + pvcDeletion: false + values: + components: + autorecovery: false + functions: false + toolset: false + pulsar_manager: false + monitoring: + prometheus: false + grafana: false + node_exporter: false + alert_manager: false + proxy: + replicaCount: 1 + resources: + requests: + cpu: 0.01 + memory: 256Mi + configData: + PULSAR_MEM: > + -Xms256m -Xmx256m + PULSAR_GC: > + -XX:MaxDirectMemorySize=256m + bookkeeper: + replicaCount: 2 + resources: + requests: + cpu: 0.01 + memory: 256Mi + configData: + PULSAR_MEM: > + -Xms256m + -Xmx256m + -XX:MaxDirectMemorySize=256m + PULSAR_GC: > + -Dio.netty.leakDetectionLevel=disabled + -Dio.netty.recycler.linkCapacity=1024 + -XX:+UseG1GC -XX:MaxGCPauseMillis=10 + -XX:+ParallelRefProcEnabled + -XX:+UnlockExperimentalVMOptions + -XX:+DoEscapeAnalysis + -XX:ParallelGCThreads=32 + -XX:ConcGCThreads=32 + -XX:G1NewSizePercent=50 + -XX:+DisableExplicitGC + -XX:-ResizePLAB + -XX:+ExitOnOutOfMemoryError + -XX:+PerfDisableSharedMem + -XX:+PrintGCDetails + zookeeper: + replicaCount: 1 + resources: + requests: + cpu: 0.01 + memory: 256Mi + configData: + PULSAR_MEM: > + -Xms256m + -Xmx256m + PULSAR_GC: > + -Dcom.sun.management.jmxremote + -Djute.maxbuffer=10485760 + -XX:+ParallelRefProcEnabled + -XX:+UnlockExperimentalVMOptions + -XX:+DoEscapeAnalysis -XX:+DisableExplicitGC + -XX:+PerfDisableSharedMem + -Dzookeeper.forceSync=no + broker: + replicaCount: 1 + resources: + requests: + cpu: 0.01 + memory: 256Mi + configData: + PULSAR_MEM: > + -Xms256m + -Xmx256m + PULSAR_GC: > + -XX:MaxDirectMemorySize=256m + -Dio.netty.leakDetectionLevel=disabled + -Dio.netty.recycler.linkCapacity=1024 + -XX:+ParallelRefProcEnabled + -XX:+UnlockExperimentalVMOptions + -XX:+DoEscapeAnalysis + -XX:ParallelGCThreads=32 + -XX:ConcGCThreads=32 + -XX:G1NewSizePercent=50 + -XX:+DisableExplicitGC + -XX:-ResizePLAB + -XX:+ExitOnOutOfMemoryError storage: inCluster: deletionPolicy: Delete @@ -29,4 +137,3 @@ spec: metrics: podMonitor: enabled: true - \ No newline at end of file diff --git a/tests/python_client/deploy/milvus_crd.yaml b/tests/python_client/deploy/milvus_crd.yaml index 41cab33511..d078b76463 100644 --- a/tests/python_client/deploy/milvus_crd.yaml +++ 
b/tests/python_client/deploy/milvus_crd.yaml @@ -7,11 +7,11 @@ metadata: labels: app: milvus spec: - mode: standalone + mode: cluster config: dataNode: memory: - forceSyncEnable: false + forceSyncEnable: false rootCoord: enableActiveStandby: true dataCoord: @@ -29,7 +29,7 @@ spec: components: enableRollingUpdate: true imageUpdateMode: rollingUpgrade - image: milvusdb/milvus:2.2.0-20230208-2e4d64ec + image: harbor.milvus.io/milvus/milvus:master-20240426-4fb8044a-amd64 disableMetric: false dataNode: replicas: 3 @@ -45,7 +45,7 @@ spec: pvcDeletion: false values: replicaCount: 3 - kafka: + kafka: inCluster: deletionPolicy: Retain pvcDeletion: false @@ -58,13 +58,13 @@ spec: serviceMonitor: enabled: true jmx: - enabled: true + enabled: true pulsar: inCluster: deletionPolicy: Retain pvcDeletion: false values: - components: + components: autorecovery: false functions: false toolset: false @@ -158,4 +158,3 @@ spec: pvcDeletion: false values: mode: distributed - \ No newline at end of file diff --git a/tests/python_client/pytest.ini b/tests/python_client/pytest.ini index 122b5e8bf6..1c90a7f2fd 100644 --- a/tests/python_client/pytest.ini +++ b/tests/python_client/pytest.ini @@ -1,7 +1,7 @@ [pytest] -addopts = --host localhost --html=/tmp/ci_logs/report.html --self-contained-html -v +addopts = --host 10.104.21.154 --minio_host 10.104.21.153 --html=/tmp/ci_logs/report.html --self-contained-html -v --log-cli-level=INFO --capture=no # python3 -W ignore -m pytest log_format = [%(asctime)s - %(levelname)s - %(name)s]: %(message)s (%(filename)s:%(lineno)s) @@ -9,4 +9,4 @@ log_date_format = %Y-%m-%d %H:%M:%S filterwarnings = - ignore::DeprecationWarning \ No newline at end of file + ignore::DeprecationWarning diff --git a/tests/python_client/requirements.txt b/tests/python_client/requirements.txt index 99ad4f62c9..6b62783758 100644 --- a/tests/python_client/requirements.txt +++ b/tests/python_client/requirements.txt @@ -46,6 +46,7 @@ loguru==0.7.0 psutil==5.9.4 pandas==1.5.3 tenacity==8.1.0 +rich==13.7.0 # for standby test etcd-sdk-python==0.0.4 deepdiff==6.7.1 diff --git a/tests/python_client/resource_group/conftest.py b/tests/python_client/resource_group/conftest.py new file mode 100644 index 0000000000..7e56a38456 --- /dev/null +++ b/tests/python_client/resource_group/conftest.py @@ -0,0 +1,11 @@ +import pytest + + +def pytest_addoption(parser): + parser.addoption("--image_tag", action="store", default="master-20240514-89a7c34c", help="image_tag") + + +@pytest.fixture +def image_tag(request): + return request.config.getoption("--image_tag") + diff --git a/tests/python_client/resource_group/test_channel_exclusive_balance.py b/tests/python_client/resource_group/test_channel_exclusive_balance.py new file mode 100644 index 0000000000..f916014fde --- /dev/null +++ b/tests/python_client/resource_group/test_channel_exclusive_balance.py @@ -0,0 +1,446 @@ +import pytest +import time +from pymilvus import connections, utility, Collection +from utils.util_log import test_log as log +from base.client_base import TestcaseBase +from chaos.checker import (InsertChecker, + FlushChecker, + UpsertChecker, + DeleteChecker, + Op, + ResultAnalyzer + ) +from chaos import chaos_commons as cc +from common import common_func as cf +from utils.util_k8s import get_querynode_id_pod_pairs +from utils.util_birdwatcher import BirdWatcher +from customize.milvus_operator import MilvusOperator +from common.milvus_sys import MilvusSys +from common.common_type import CaseLabel +from chaos.chaos_commons import assert_statistic + +namespace 
= 'chaos-testing' +prefix = "test_rg" + +from rich.table import Table +from rich.console import Console + + +def display_segment_distribution_info(collection_name, release_name, segment_info=None): + table = Table(title=f"{collection_name} Segment Distribution Info") + table.width = 200 + table.add_column("Segment ID", style="cyan") + table.add_column("Collection ID", style="cyan") + table.add_column("Partition ID", style="cyan") + table.add_column("Num Rows", style="cyan") + table.add_column("State", style="cyan") + table.add_column("Channel", style="cyan") + table.add_column("Node ID", style="cyan") + table.add_column("Node Name", style="cyan") + res = utility.get_query_segment_info(collection_name) + log.info(f"segment info: {res}") + label = f"app.kubernetes.io/instance={release_name}, app.kubernetes.io/component=querynode" + querynode_id_pod_pair = get_querynode_id_pod_pairs("chaos-testing", label) + for r in res: + channel = "unknown" + if segment_info and str(r.segmentID) in segment_info: + channel = segment_info[str(r.segmentID)]["Insert Channel"] + table.add_row( + str(r.segmentID), + str(r.collectionID), + str(r.partitionID), + str(r.num_rows), + str(r.state), + str(channel), + str(r.nodeIds), + str([querynode_id_pod_pair.get(node_id) for node_id in r.nodeIds]) + ) + console = Console() + console.width = 300 + console.print(table) + + +def display_channel_on_qn_distribution_info(collection_name, release_name, segment_info=None): + """ + node id, node name, channel, segment id + 1, rg-test-613938-querynode-0, [rg-test-613938-rootcoord-dml_3_449617770820133536v0], [449617770820133655] + 2, rg-test-613938-querynode-1, [rg-test-613938-rootcoord-dml_3_449617770820133537v0], [449617770820133656] + + """ + m = {} + res = utility.get_query_segment_info(collection_name) + for r in res: + if r.nodeIds: + for node_id in r.nodeIds: + if node_id not in m: + m[node_id] = { + "node_name": "", + "channel": [], + "segment_id": [] + } + m[node_id]["segment_id"].append(r.segmentID) + # get channel info + for node_id in m.keys(): + for seg in m[node_id]["segment_id"]: + if segment_info and str(seg) in segment_info: + m[node_id]["channel"].append(segment_info[str(seg)]["Insert Channel"]) + + # get node name + label = f"app.kubernetes.io/instance={release_name}, app.kubernetes.io/component=querynode" + querynode_id_pod_pair = get_querynode_id_pod_pairs("chaos-testing", label) + for node_id in m.keys(): + m[node_id]["node_name"] = querynode_id_pod_pair.get(node_id) + + table = Table(title=f"{collection_name} Channel Distribution Info") + table.width = 200 + table.add_column("Node ID", style="cyan") + table.add_column("Node Name", style="cyan") + table.add_column("Channel", style="cyan") + table.add_column("Segment ID", style="cyan") + for node_id, v in m.items(): + table.add_row( + str(node_id), + str(v["node_name"]), + "\n".join([str(x) for x in set(v["channel"])]), + "\n".join([str(x) for x in v["segment_id"]]) + ) + console = Console() + console.width = 300 + console.print(table) + return m + + +def _install_milvus(image_tag="master-latest"): + release_name = f"rg-test-{cf.gen_digits_by_length(6)}" + cus_configs = {'spec.mode': 'cluster', + 'spec.dependencies.msgStreamType': 'kafka', + 'spec.components.image': f'harbor.milvus.io/milvus/milvus:{image_tag}', + 'metadata.namespace': namespace, + 'metadata.name': release_name, + 'spec.components.proxy.serviceType': 'LoadBalancer', + 'spec.config.queryCoord.balancer': 'ChannelLevelScoreBalancer', + 'spec.config.queryCoord.channelExclusiveNodeFactor': 2 + 
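                   # NOTE: a reading of these two knobs inferred from the assertions further down,
                   # not from documentation -- 'ChannelLevelScoreBalancer' makes queryCoord balance
                   # per DML channel, and 'channelExclusiveNodeFactor' appears to be the threshold
                   # for exclusive mode: once a replica has at least
                   # shards_num * channelExclusiveNodeFactor query nodes (2 * 2 = 4 here), each
                   # query node is expected to end up serving exactly one channel; below that,
                   # nodes may still share channels.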
} + milvus_op = MilvusOperator() + log.info(f"install milvus with configs: {cus_configs}") + milvus_op.install(cus_configs) + healthy = milvus_op.wait_for_healthy(release_name, namespace, timeout=1200) + log.info(f"milvus healthy: {healthy}") + if healthy: + endpoint = milvus_op.endpoint(release_name, namespace).split(':') + log.info(f"milvus endpoint: {endpoint}") + host = endpoint[0] + port = endpoint[1] + return release_name, host, port + else: + return release_name, None, None + + +class TestChannelExclusiveBalance(TestcaseBase): + + def teardown_method(self, method): + log.info(("*" * 35) + " teardown " + ("*" * 35)) + log.info("[teardown_method] Start teardown test case %s..." % method.__name__) + milvus_op = MilvusOperator() + milvus_op.uninstall(self.release_name, namespace) + connections.disconnect("default") + connections.remove_connection("default") + + def init_health_checkers(self, collection_name=None, shards_num=2): + c_name = collection_name + checkers = { + Op.insert: InsertChecker(collection_name=c_name, shards_num=shards_num), + Op.flush: FlushChecker(collection_name=c_name, shards_num=shards_num), + Op.upsert: UpsertChecker(collection_name=c_name, shards_num=shards_num), + Op.delete: DeleteChecker(collection_name=c_name, shards_num=shards_num), + } + self.health_checkers = checkers + + @pytest.mark.tags(CaseLabel.L3) + def test_channel_exclusive_balance_during_qn_scale_up(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + qn_num = 1 + milvus_op.scale(release_name, 'queryNode', qn_num, namespace) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + etcd_endpoint = milvus_op.etcd_endpoints(release_name, namespace) + bw = BirdWatcher(etcd_endpoints=etcd_endpoint, root_path=release_name) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + c_name = cf.gen_unique_str("Checker_") + self.init_health_checkers(collection_name=c_name) + c = Collection(name=c_name) + res = c.describe() + collection_id = res["collection_id"] + cc.start_monitor_threads(self.health_checkers) + seg_res = bw.show_segment_info(collection_id) + display_segment_distribution_info(c_name, release_name, segment_info=seg_res) + display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + log.info("*********************Load Start**********************") + request_duration = 360 + for i in range(10): + time.sleep(request_duration // 10) + for k, v in self.health_checkers.items(): + v.check_result() + qn_num += min(qn_num + 1, 8) + seg_res = bw.show_segment_info(collection_id) + display_segment_distribution_info(c_name, release_name, segment_info=seg_res) + display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + milvus_op.scale(release_name, 'queryNode', 8, namespace) + seg_res = bw.show_segment_info(collection_id) + display_segment_distribution_info(c_name, release_name, segment_info=seg_res) + res = display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + time.sleep(60) + ra = ResultAnalyzer() + ra.get_stage_success_rate() + assert_statistic(self.health_checkers) + for k, v in self.health_checkers.items(): + v.terminate() + time.sleep(60) + # in final state, channel exclusive balance is on, so all qn should have only one channel + for k, v in res.items(): + assert len(set(v["channel"])) == 1 + + + @pytest.mark.tags(CaseLabel.L3) + def 
test_channel_exclusive_balance_during_qn_scale_down(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + qn_num = 8 + milvus_op.scale(release_name, 'queryNode', qn_num, namespace) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + etcd_endpoint = milvus_op.etcd_endpoints(release_name, namespace) + bw = BirdWatcher(etcd_endpoints=etcd_endpoint, root_path=release_name) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + c_name = cf.gen_unique_str("Checker_") + self.init_health_checkers(collection_name=c_name) + c = Collection(name=c_name) + res = c.describe() + collection_id = res["collection_id"] + cc.start_monitor_threads(self.health_checkers) + seg_res = bw.show_segment_info(collection_id) + display_segment_distribution_info(c_name, release_name, segment_info=seg_res) + display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + log.info("*********************Load Start**********************") + request_duration = 360 + for i in range(10): + time.sleep(request_duration // 10) + for k, v in self.health_checkers.items(): + v.check_result() + qn_num = max(qn_num - 1, 3) + milvus_op.scale(release_name, 'queryNode', qn_num, namespace) + seg_res = bw.show_segment_info(collection_id) + display_segment_distribution_info(c_name, release_name, segment_info=seg_res) + display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + milvus_op.scale(release_name, 'queryNode', 1, namespace) + seg_res = bw.show_segment_info(collection_id) + display_segment_distribution_info(c_name, release_name, segment_info=seg_res) + res = display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + time.sleep(60) + ra = ResultAnalyzer() + ra.get_stage_success_rate() + assert_statistic(self.health_checkers) + for k, v in self.health_checkers.items(): + v.terminate() + time.sleep(60) + # shard num = 2, k = 2, qn_num = 3 + # in final state, channel exclusive balance is off, so all qn should have more than one channel + for k, v in res.items(): + assert len(set(v["channel"])) > 1 + + @pytest.mark.tags(CaseLabel.L3) + def test_channel_exclusive_balance_with_channel_num_is_1(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + qn_num = 1 + milvus_op.scale(release_name, 'queryNode', qn_num, namespace) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + etcd_endpoint = milvus_op.etcd_endpoints(release_name, namespace) + bw = BirdWatcher(etcd_endpoints=etcd_endpoint, root_path=release_name) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + c_name = cf.gen_unique_str("Checker_") + self.init_health_checkers(collection_name=c_name, shards_num=1) + c = Collection(name=c_name) + res = c.describe() + collection_id = res["collection_id"] + cc.start_monitor_threads(self.health_checkers) + seg_res = bw.show_segment_info(collection_id) + display_segment_distribution_info(c_name, release_name, segment_info=seg_res) + display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + log.info("*********************Load Start**********************") + request_duration = 360 + for i in range(10): + time.sleep(request_duration // 10) + for k, v in 
self.health_checkers.items(): + v.check_result() + qn_num = qn_num + 1 + qn_num = min(qn_num, 8) + milvus_op.scale(release_name, 'queryNode', qn_num, namespace) + seg_res = bw.show_segment_info(collection_id) + display_segment_distribution_info(c_name, release_name, segment_info=seg_res) + res = display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + for r in res: + assert len(set(r["channel"])) == 1 + milvus_op.scale(release_name, 'queryNode', 8, namespace) + seg_res = bw.show_segment_info(collection_id) + display_segment_distribution_info(c_name, release_name, segment_info=seg_res) + res = display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + time.sleep(60) + ra = ResultAnalyzer() + ra.get_stage_success_rate() + assert_statistic(self.health_checkers) + for k, v in self.health_checkers.items(): + v.terminate() + time.sleep(60) + + # since shard num is 1, so all qn should have only one channel, no matter what k is + for k, v in res.items(): + assert len(set(v["channel"])) == 1 + + @pytest.mark.tags(CaseLabel.L3) + def test_channel_exclusive_balance_after_k_increase(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + qn_num = 1 + milvus_op.scale(release_name, 'queryNode', qn_num, namespace) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + etcd_endpoint = milvus_op.etcd_endpoints(release_name, namespace) + bw = BirdWatcher(etcd_endpoints=etcd_endpoint, root_path=release_name) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + c_name = cf.gen_unique_str("Checker_") + self.init_health_checkers(collection_name=c_name) + c = Collection(name=c_name) + res = c.describe() + collection_id = res["collection_id"] + cc.start_monitor_threads(self.health_checkers) + seg_res = bw.show_segment_info(collection_id) + display_segment_distribution_info(c_name, release_name, segment_info=seg_res) + display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + log.info("*********************Load Start**********************") + request_duration = 360 + for i in range(10): + time.sleep(request_duration // 10) + for k, v in self.health_checkers.items(): + v.check_result() + qn_num = qn_num + 1 + qn_num = min(qn_num, 8) + if qn_num == 5: + config = { + "spec.config.queryCoord.channelExclusiveNodeFactor": 3 + } + milvus_op.upgrade(release_name, config, namespace) + milvus_op.scale(release_name, 'queryNode', qn_num, namespace) + seg_res = bw.show_segment_info(collection_id) + display_segment_distribution_info(c_name, release_name, segment_info=seg_res) + res = display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + if qn_num == 4: + # channel exclusive balance is on, so all qn should have only one channel + for r in res.values(): + assert len(set(r["channel"])) == 1 + if qn_num == 5: + # k is changed to 3 when qn_num is 5, + # channel exclusive balance is off, so all qn should have more than one channel + # wait for a while to make sure all qn have more than one channel + ready = False + t0 = time.time() + while not ready and time.time() - t0 < 180: + ready = True + for r in res.values(): + if len(set(r["channel"])) == 1: + ready = False + time.sleep(10) + res = display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + if qn_num == 6: + # channel exclusive balance is on, so all qn 
should have only one channel + ready = False + t0 = time.time() + while not ready and time.time() - t0 < 180: + ready = True + for r in res.values(): + if len(set(r["channel"])) != 1: + ready = False + time.sleep(10) + res = display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + milvus_op.scale(release_name, 'queryNode', 8, namespace) + seg_res = bw.show_segment_info(collection_id) + display_segment_distribution_info(c_name, release_name, segment_info=seg_res) + display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + time.sleep(60) + ra = ResultAnalyzer() + ra.get_stage_success_rate() + assert_statistic(self.health_checkers) + for k, v in self.health_checkers.items(): + v.terminate() + time.sleep(60) + + @pytest.mark.tags(CaseLabel.L3) + def test_channel_exclusive_balance_for_search_performance(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + qn_num = 1 + milvus_op.scale(release_name, 'queryNode', qn_num, namespace) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + etcd_endpoint = milvus_op.etcd_endpoints(release_name, namespace) + bw = BirdWatcher(etcd_endpoints=etcd_endpoint, root_path=release_name) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + c_name = cf.gen_unique_str("Checker_") + self.init_health_checkers(collection_name=c_name) + c = Collection(name=c_name) + res = c.describe() + collection_id = res["collection_id"] + cc.start_monitor_threads(self.health_checkers) + seg_res = bw.show_segment_info(collection_id) + display_segment_distribution_info(c_name, release_name, segment_info=seg_res) + display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + log.info("*********************Load Start**********************") + request_duration = 360 + for i in range(10): + time.sleep(request_duration // 10) + for k, v in self.health_checkers.items(): + v.check_result() + qn_num = qn_num + 1 + qn_num = min(qn_num, 8) + milvus_op.scale(release_name, 'queryNode', qn_num, namespace) + seg_res = bw.show_segment_info(collection_id) + display_segment_distribution_info(c_name, release_name, segment_info=seg_res) + display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + milvus_op.scale(release_name, 'queryNode', 8, namespace) + seg_res = bw.show_segment_info(collection_id) + display_segment_distribution_info(c_name, release_name, segment_info=seg_res) + display_channel_on_qn_distribution_info(c_name, release_name, segment_info=seg_res) + time.sleep(60) + ra = ResultAnalyzer() + ra.get_stage_success_rate() + assert_statistic(self.health_checkers) + for k, v in self.health_checkers.items(): + v.terminate() + time.sleep(60) diff --git a/tests/python_client/resource_group/test_resource_group.py b/tests/python_client/resource_group/test_resource_group.py new file mode 100644 index 0000000000..0e4e448bd2 --- /dev/null +++ b/tests/python_client/resource_group/test_resource_group.py @@ -0,0 +1,944 @@ +import pytest +import time +from typing import Union, List +from pymilvus import connections, utility, Collection +from pymilvus.client.constants import DEFAULT_RESOURCE_GROUP +from pymilvus.client.types import ResourceGroupConfig, ResourceGroupInfo +from utils.util_log import test_log as log +from base.client_base import TestcaseBase +from chaos.checker import (InsertChecker, + UpsertChecker, + 
SearchChecker, + HybridSearchChecker, + QueryChecker, + DeleteChecker, + Op, + ResultAnalyzer + ) +from chaos import chaos_commons as cc +from common import common_func as cf +from utils.util_k8s import get_querynode_id_pod_pairs +from common import common_type as ct +from customize.milvus_operator import MilvusOperator +from common.milvus_sys import MilvusSys +from common.common_type import CaseLabel +from chaos.chaos_commons import assert_statistic +from delayed_assert import assert_expectations + +namespace = 'chaos-testing' +prefix = "test_rg" + +from rich.table import Table +from rich.console import Console + + +def display_resource_group_info(info: Union[ResourceGroupInfo, List[ResourceGroupInfo]]): + table = Table(title="Resource Group Info") + table.width = 200 + table.add_column("Name", style="cyan") + table.add_column("Capacity", style="cyan") + table.add_column("Available Node", style="cyan") + table.add_column("Loaded Replica", style="cyan") + table.add_column("Outgoing Node", style="cyan") + table.add_column("Incoming Node", style="cyan") + table.add_column("Request", style="cyan") + table.add_column("Limit", style="cyan") + table.add_column("Nodes", style="cyan") + if isinstance(info, list): + for i in info: + table.add_row( + i.name, + str(i.capacity), + str(i.num_available_node), + str(i.num_loaded_replica), + str(i.num_outgoing_node), + str(i.num_incoming_node), + str(i.config.requests.node_num), + str(i.config.limits.node_num), + "\n".join([str(node.hostname) for node in i.nodes]) + ) + else: + table.add_row( + info.name, + str(info.capacity), + str(info.num_available_node), + str(info.num_loaded_replica), + str(info.num_outgoing_node), + str(info.num_incoming_node), + str(info.config.requests.node_num), + str(info.config.limits.node_num), + "\n".join([str(node.hostname) for node in info.nodes]) + ) + + console = Console() + console.width = 300 + console.print(table) + + +def display_segment_distribution_info(collection_name, release_name): + table = Table(title=f"{collection_name} Segment Distribution Info") + table.width = 200 + table.add_column("Segment ID", style="cyan") + table.add_column("Collection ID", style="cyan") + table.add_column("Partition ID", style="cyan") + table.add_column("Num Rows", style="cyan") + table.add_column("State", style="cyan") + table.add_column("Node ID", style="cyan") + table.add_column("Node Name", style="cyan") + res = utility.get_query_segment_info(collection_name) + label = f"app.kubernetes.io/instance={release_name}, app.kubernetes.io/component=querynode" + querynode_id_pod_pair = get_querynode_id_pod_pairs("chaos-testing", label) + + for r in res: + table.add_row( + str(r.segmentID), + str(r.collectionID), + str(r.partitionID), + str(r.num_rows), + str(r.state), + str(r.nodeIds), + str([querynode_id_pod_pair.get(node_id) for node_id in r.nodeIds]) + ) + console = Console() + console.width = 300 + console.print(table) + + +def list_all_resource_groups(): + rg_names = utility.list_resource_groups() + resource_groups = [] + for rg_name in rg_names: + resource_group = utility.describe_resource_group(rg_name) + resource_groups.append(resource_group) + display_resource_group_info(resource_groups) + + +def _install_milvus(image_tag="master-latest"): + release_name = f"rg-test-{cf.gen_digits_by_length(6)}" + cus_configs = {'spec.mode': 'cluster', + 'spec.dependencies.msgStreamType': 'kafka', + 'spec.components.image': f'harbor.milvus.io/milvus/milvus:{image_tag}', + 'metadata.namespace': namespace, + 'metadata.name': release_name, + 
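                   # unlike the _install_milvus helper in test_channel_exclusive_balance.py, this
                   # deployment keeps the default queryCoord balancer; only resource-group
                   # behaviour is exercised by the tests in this file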
'spec.components.proxy.serviceType': 'LoadBalancer', + } + milvus_op = MilvusOperator() + log.info(f"install milvus with configs: {cus_configs}") + milvus_op.install(cus_configs) + healthy = milvus_op.wait_for_healthy(release_name, namespace, timeout=1200) + log.info(f"milvus healthy: {healthy}") + if healthy: + endpoint = milvus_op.endpoint(release_name, namespace).split(':') + log.info(f"milvus endpoint: {endpoint}") + host = endpoint[0] + port = endpoint[1] + return release_name, host, port + else: + return release_name, None, None + + +class TestResourceGroup(TestcaseBase): + + def teardown_method(self, method): + log.info(("*" * 35) + " teardown " + ("*" * 35)) + log.info("[teardown_method] Start teardown test case %s..." % method.__name__) + milvus_op = MilvusOperator() + milvus_op.uninstall(self.release_name, namespace) + connections.disconnect("default") + connections.remove_connection("default") + + @pytest.mark.tags(CaseLabel.L3) + def test_resource_group_scale_up(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + # create rg1 with request node_num=4, limit node_num=6 + name = cf.gen_unique_str("rg") + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 4}, + limits={"node_num": 6}, + )) + # scale up rg1 to 8 nodes one by one + for replicas in range(1, 8): + milvus_op.scale(release_name, 'queryNode', replicas, namespace) + time.sleep(10) + # get querynode info + qn = mil.query_nodes + log.info(f"query node info: {len(qn)}") + resource_group = self.utility.describe_resource_group(name) + log.info(f"Resource group {name} info:\n {display_resource_group_info(resource_group)}") + list_all_resource_groups() + # assert the node in rg >= 4 + resource_group = self.utility.describe_resource_group(name) + assert resource_group.num_available_node >= 4 + + @pytest.mark.tags(CaseLabel.L3) + def test_resource_group_scale_down(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + milvus_op.scale(release_name, 'queryNode', 8, namespace) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + # create rg1 with request node_num=4, limit node_num=6 + name = cf.gen_unique_str("rg") + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 4}, + limits={"node_num": 6}, + )) + # scale down rg1 from 8 to 1 node one by one + for replicas in range(8, 1, -1): + milvus_op.scale(release_name, 'queryNode', replicas, namespace) + time.sleep(10) + resource_group = self.utility.describe_resource_group(name) + log.info(f"Resource group {name} info:\n {display_resource_group_info(resource_group)}") + list_all_resource_groups() + # assert the node in rg <= 1 + resource_group = self.utility.describe_resource_group(name) + assert resource_group.num_available_node <= 1 + + @pytest.mark.tags(CaseLabel.L3) + def test_resource_group_all_querynode_add_into_two_different_config_rg(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port 
= _install_milvus(image_tag=image_tag) + milvus_op.scale(release_name, 'queryNode', 8, namespace) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + rg_list = [] + # create rg1 with request node_num=4, limit node_num=6 + + name = cf.gen_unique_str("rg") + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 4}, + limits={"node_num": 6}, + )) + rg_list.append(name) + name = cf.gen_unique_str("rg") + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 3}, + limits={"node_num": 6}, + )) + rg_list.append(name) + # assert two rg satisfy the request node_num + list_all_resource_groups() + for rg in rg_list: + resource_group = self.utility.describe_resource_group(rg) + assert resource_group.num_available_node >= resource_group.config.requests.node_num + + # scale down rg1 from 8 to 1 node one by one + for replicas in range(8, 1, -1): + milvus_op.scale(release_name, 'queryNode', replicas, namespace) + time.sleep(10) + for name in rg_list: + resource_group = self.utility.describe_resource_group(name) + log.info(f"Resource group {name} info:\n {display_resource_group_info(resource_group)}") + list_all_resource_groups() + + @pytest.mark.tags(CaseLabel.L3) + def test_resource_group_querynode_add_into_two_different_config_rg_one_by_one(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + rg_list = [] + # create rg1 with request node_num=4, limit node_num=6 + name = cf.gen_unique_str("rg") + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 4}, + limits={"node_num": 6}, + )) + rg_list.append(name) + + name = cf.gen_unique_str("rg") + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 3}, + limits={"node_num": 6}, + )) + rg_list.append(name) + for replicas in range(1, 8): + milvus_op.scale(release_name, 'queryNode', replicas, namespace) + time.sleep(10) + list_all_resource_groups() + + for rg in rg_list: + resource_group = self.utility.describe_resource_group(rg) + assert resource_group.num_available_node >= resource_group.config.requests.node_num + # scale down rg1 from 8 to 1 node one by one + for replicas in range(8, 1, -1): + milvus_op.scale(release_name, 'queryNode', replicas, namespace) + time.sleep(10) + list_all_resource_groups() + for rg in rg_list: + resource_group = self.utility.describe_resource_group(rg) + assert resource_group.num_available_node >= 1 + + + @pytest.mark.tags(CaseLabel.L3) + def test_resource_group_querynode_add_into_new_rg(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + + self.release_name = release_name + milvus_op.scale(release_name, 'queryNode', 10, namespace) + assert host is not None + connections.connect("default", host=host, port=port) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + rg_list = [] + # create rg1 with request 
node_num=4, limit node_num=6 + name = cf.gen_unique_str("rg") + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 4}, + limits={"node_num": 6}, + )) + rg_list.append(name) + for rg in rg_list: + resource_group = self.utility.describe_resource_group(rg) + assert resource_group.num_available_node >= resource_group.config.requests.node_num + + # create a new rg with request node_num=3, limit node_num=6 + # the querynode will be added into the new rg from default rg + name = cf.gen_unique_str("rg") + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 3}, + limits={"node_num": 6}, + )) + rg_list.append(name) + list_all_resource_groups() + for rg in rg_list: + resource_group = self.utility.describe_resource_group(rg) + assert resource_group.num_available_node >= resource_group.config.requests.node_num + + @pytest.mark.tags(CaseLabel.L3) + def test_resource_group_with_two_rg_link_to_each_other_when_all_not_reached_to_request(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + milvus_op.scale(release_name, 'queryNode', 8, namespace) + utility.update_resource_groups( + {DEFAULT_RESOURCE_GROUP: ResourceGroupConfig(requests={"node_num": 0}, limits={"node_num": 1})}) + # create rg1 with request node_num=4, limit node_num=6 + name = cf.gen_unique_str("rg") + rg1_name = name + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 4}, + limits={"node_num": 6}, + )) + name = cf.gen_unique_str("rg") + rg2_name = name + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 4}, + limits={"node_num": 6}, + )) + list_all_resource_groups() + log.info("update resource group") + utility.update_resource_groups( + {rg1_name: ResourceGroupConfig(requests={"node_num": 6}, + limits={"node_num": 8}, + transfer_from=[{"resource_group": rg2_name}], + transfer_to=[{"resource_group": rg2_name}], )}) + time.sleep(10) + list_all_resource_groups() + utility.update_resource_groups( + {rg2_name: ResourceGroupConfig(requests={"node_num": 6}, + limits={"node_num": 8}, + transfer_from=[{"resource_group": rg1_name}], + transfer_to=[{"resource_group": rg1_name}], )}) + time.sleep(10) + list_all_resource_groups() + # no querynode was transferred between rg1 and rg2 + resource_group = self.utility.describe_resource_group(rg1_name) + assert resource_group.num_available_node == 4 + resource_group = self.utility.describe_resource_group(rg2_name) + assert resource_group.num_available_node == 4 + + @pytest.mark.tags(CaseLabel.L3) + def test_resource_group_with_rg_transfer_from_non_default_rg(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + milvus_op.scale(release_name, 'queryNode', 15, namespace) + utility.update_resource_groups( + {DEFAULT_RESOURCE_GROUP: ResourceGroupConfig(requests={"node_num": 0}, 
limits={"node_num": 3})}) + # create rg1 with request node_num=4, limit node_num=6 + name = cf.gen_unique_str("rg") + rg1_name = name + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 2}, + limits={"node_num": 2}, + )) + name = cf.gen_unique_str("rg") + rg2_name = name + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 6}, + limits={"node_num": 10}, + )) + list_all_resource_groups() + rg2_available_node_before = self.utility.describe_resource_group(rg2_name).num_available_node + log.info("update resource group") + utility.update_resource_groups( + {rg1_name: ResourceGroupConfig(requests={"node_num": 4}, + limits={"node_num": 6}, + transfer_from=[{"resource_group": rg2_name}], + transfer_to=[{"resource_group": rg2_name}], )}) + time.sleep(10) + list_all_resource_groups() + # expect qn in rg 1 transfer from rg2 not the default rg + rg2_available_node_after = self.utility.describe_resource_group(rg2_name).num_available_node + assert rg2_available_node_before > rg2_available_node_after + + @pytest.mark.tags(CaseLabel.L3) + def test_resource_group_with_rg_transfer_to_non_default_rg(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + milvus_op.scale(release_name, 'queryNode', 10, namespace) + utility.update_resource_groups( + {DEFAULT_RESOURCE_GROUP: ResourceGroupConfig(requests={"node_num": 0}, limits={"node_num": 10})}) + # create rg1 with request node_num=4, limit node_num=6 + name = cf.gen_unique_str("rg") + rg1_name = name + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 2}, + limits={"node_num": 10}, + )) + name = cf.gen_unique_str("rg") + rg2_name = name + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 4}, + limits={"node_num": 4}, + )) + list_all_resource_groups() + rg1_node_available_before = self.utility.describe_resource_group(rg1_name).num_available_node + log.info("update resource group") + utility.update_resource_groups( + {rg2_name: ResourceGroupConfig(requests={"node_num": 2}, + limits={"node_num": 2}, + transfer_from=[{"resource_group": rg1_name}], + transfer_to=[{"resource_group": rg1_name}], )}) + time.sleep(10) + list_all_resource_groups() + # expect qn in rg 2 transfer to rg1 not the default rg + rg1_node_available_after = self.utility.describe_resource_group(rg1_name).num_available_node + assert rg1_node_available_after > rg1_node_available_before + + + @pytest.mark.tags(CaseLabel.L3) + def test_resource_group_with_rg_transfer_with_rg_list(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + milvus_op.scale(release_name, 'queryNode', 12, namespace) + utility.update_resource_groups( + {DEFAULT_RESOURCE_GROUP: ResourceGroupConfig(requests={"node_num": 0}, limits={"node_num": 1})}) + # create rg1 with request node_num=4, limit 
node_num=6 + name = cf.gen_unique_str("rg") + source_rg = name + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 1}, + limits={"node_num": 1}, + )) + name = cf.gen_unique_str("rg") + small_rg = name + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 2}, + limits={"node_num": 4}, + )) + name = cf.gen_unique_str("rg") + big_rg = name + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 3}, + limits={"node_num": 6}, + )) + list_all_resource_groups() + small_rg_node_available_before = self.utility.describe_resource_group(small_rg).num_available_node + big_rg_node_available_before = self.utility.describe_resource_group(big_rg).num_available_node + log.info("update resource group") + utility.update_resource_groups( + {source_rg: ResourceGroupConfig(requests={"node_num": 6}, + limits={"node_num": 6}, + transfer_from=[{"resource_group": small_rg}, {"resource_group": big_rg}], + )}) + time.sleep(10) + list_all_resource_groups() + # expect source rg transfer from small rg and big rg + small_rg_node_available_after = self.utility.describe_resource_group(small_rg).num_available_node + big_rg_node_available_after = self.utility.describe_resource_group(big_rg).num_available_node + assert (small_rg_node_available_before + big_rg_node_available_before > small_rg_node_available_after + + big_rg_node_available_after) + + +class TestReplicasManagement(TestcaseBase): + + def teardown_method(self, method): + log.info(("*" * 35) + " teardown " + ("*" * 35)) + log.info("[teardown_method] Start teardown test case %s..." % method.__name__) + milvus_op = MilvusOperator() + milvus_op.uninstall(self.release_name, namespace) + connections.disconnect("default") + connections.remove_connection("default") + + @pytest.mark.tags(CaseLabel.L3) + def test_load_replicas_one_collection_multi_replicas_to_multi_rg(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + milvus_op.scale(release_name, 'queryNode', 12, namespace) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + resource_groups = [] + for i in range(4): + name = cf.gen_unique_str("rg") + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 2}, + limits={"node_num": 6}, + )) + resource_groups.append(name) + list_all_resource_groups() + + # create collection and load with 2 replicase + self.skip_connection = True + collection_w, vectors = self.init_collection_general(prefix, insert_data=True, + enable_dynamic_field=True)[0:2] + collection_w.release() + log.info(f"resource groups: {resource_groups}") + collection_w.load(replica_number=len(resource_groups), _resource_groups=resource_groups) + list_all_resource_groups() + + # list replicas + replicas = collection_w.get_replicas() + log.info(f"replicas: {replicas}") + rg_to_scale_down = resource_groups[0] + # scale down a rg to 1 node + self.utility.update_resource_groups( + {rg_to_scale_down: ResourceGroupConfig(requests={"node_num": 1}, + limits={"node_num": 1}, )} + ) + + list_all_resource_groups() + replicas = collection_w.get_replicas() + log.info(f"replicas: {replicas}") + # scale down a rg t0 0 node + 
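        # Expected behaviour (my reading; the code below only logs, it does not assert):
        # with requests and limits forced to 0 the resource group should give all of its
        # query nodes back, presumably to the default resource group since no transfer_to
        # target is configured, leaving the replica placed in this rg without nodes;
        # get_replicas() is called again to observe how placement reacts.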
self.utility.update_resource_groups( + {rg_to_scale_down: ResourceGroupConfig(requests={"node_num": 0}, + limits={"node_num": 0}, )} + ) + list_all_resource_groups() + replicas = collection_w.get_replicas() + log.info(f"replicas: {replicas}") + + @pytest.mark.tags(CaseLabel.L3) + def test_load_multi_collection_multi_replicas_to_multi_rg(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + milvus_op.scale(release_name, 'queryNode', 12, namespace) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + # create two rg with request node_num=4, limit node_num=6 + resource_groups = [] + for i in range(3): + name = cf.gen_unique_str("rg") + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 3}, + limits={"node_num": 6}, + )) + resource_groups.append(name) + log.info(f"resource groups: {resource_groups}") + list_all_resource_groups() + col_list = [] + # create collection and load with multi replicase + self.skip_connection = True + for i in range(3): + prefix = cf.gen_unique_str("test_rg") + collection_w, vectors = self.init_collection_general(prefix, insert_data=True, + enable_dynamic_field=True)[0:2] + collection_w.release() + col_list.append(collection_w) + collection_w.load(replica_number=len(resource_groups), _resource_groups=resource_groups) + list_all_resource_groups() + + # list replicas + for col in col_list: + replicas = col.get_replicas() + log.info(f"replicas: {replicas}") + + @pytest.mark.tags(CaseLabel.L3) + def test_load_multi_collection_one_replicas_to_multi_rg(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + milvus_op.scale(release_name, 'queryNode', 12, namespace) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + mil = MilvusSys(alias="default") + log.info(f"milvus build version: {mil.build_version}") + # create two rg with request node_num=4, limit node_num=6 + resource_groups = [] + for i in range(3): + name = cf.gen_unique_str("rg") + self.utility = utility + self.utility.create_resource_group(name, config=ResourceGroupConfig( + requests={"node_num": 3}, + limits={"node_num": 6}, + )) + resource_groups.append(name) + log.info(f"resource groups: {resource_groups}") + list_all_resource_groups() + col_list = [] + # create collection and load with multi replicase + self.skip_connection = True + for i in range(3): + prefix = cf.gen_unique_str("test_rg") + collection_w, vectors = self.init_collection_general(prefix, insert_data=True, + enable_dynamic_field=True)[0:2] + collection_w.release() + col_list.append(collection_w) + collection_w.load(replica_number=1, _resource_groups=resource_groups) + list_all_resource_groups() + + # list replicas + for col in col_list: + replicas = col.get_replicas() + log.info(f"replicas: {replicas}") + + @pytest.mark.tags(CaseLabel.L3) + def test_transfer_replicas_to_other_rg(self, image_tag): + """ + steps + """ + milvus_op = MilvusOperator() + release_name, host, port = _install_milvus(image_tag=image_tag) + milvus_op.scale(release_name, 'queryNode', 12, namespace) + self.release_name = release_name + assert host is not None + connections.connect("default", host=host, port=port) + mil = 
+        log.info(f"milvus build version: {mil.build_version}")
+        # create 3 rgs with request node_num=3, limit node_num=6
+        resource_groups = []
+        for i in range(3):
+            name = cf.gen_unique_str("rg")
+            self.utility = utility
+            self.utility.create_resource_group(name, config=ResourceGroupConfig(
+                requests={"node_num": 3},
+                limits={"node_num": 6},
+            ))
+            resource_groups.append(name)
+        log.info(f"resource groups: {resource_groups}")
+        list_all_resource_groups()
+        col_list = []
+        # create collections and load each with one replica into its own rg
+        self.skip_connection = True
+        for i in range(3):
+            prefix = cf.gen_unique_str("test_rg")
+            collection_w, vectors = self.init_collection_general(prefix, insert_data=True,
+                                                                 enable_dynamic_field=True)[0:2]
+            collection_w.release()
+            col_list.append(collection_w)
+            collection_w.load(replica_number=1, _resource_groups=[resource_groups[i]])
+        list_all_resource_groups()
+        # list replicas
+        for col in col_list:
+            replicas = col.get_replicas()
+            log.info(f"replicas: {replicas}")
+
+        # transfer one replica to the default rg
+        self.utility.transfer_replica(source_group=resource_groups[0], target_group=DEFAULT_RESOURCE_GROUP,
+                                      collection_name=col_list[0].name, num_replicas=1)
+
+        list_all_resource_groups()
+        # list replicas
+        for col in col_list:
+            replicas = col.get_replicas()
+            log.info(f"replicas: {replicas}")
+
+
+class TestServiceAvailableDuringScale(TestcaseBase):
+
+    def init_health_checkers(self, collection_name=None):
+        c_name = collection_name
+        shards_num = 5
+        checkers = {
+            Op.insert: InsertChecker(collection_name=c_name, shards_num=shards_num),
+            Op.upsert: UpsertChecker(collection_name=c_name, shards_num=shards_num),
+            Op.search: SearchChecker(collection_name=c_name, shards_num=shards_num),
+            Op.hybrid_search: HybridSearchChecker(collection_name=c_name, shards_num=shards_num),
+            Op.query: QueryChecker(collection_name=c_name, shards_num=shards_num),
+            Op.delete: DeleteChecker(collection_name=c_name, shards_num=shards_num),
+        }
+        self.health_checkers = checkers
+
+    def teardown_method(self, method):
+        log.info(("*" * 35) + " teardown " + ("*" * 35))
+        log.info("[teardown_method] Start teardown test case %s..." % method.__name__)
+        milvus_op = MilvusOperator()
+        milvus_op.uninstall(self.release_name, namespace)
+        connections.disconnect("default")
+        connections.remove_connection("default")
+
+    def test_service_available_during_scale_up(self, image_tag):
+        """
+        steps:
+        1. deploy milvus with 3 query nodes and load the checker collection into a 1-node rg
+        2. run insert/upsert/search/hybrid_search/query/delete checkers continuously
+        3. scale the rg up from 1 to 2 nodes while the requests keep running
+        4. assert the success rate of every operation
+        """
+        milvus_op = MilvusOperator()
+        release_name, host, port = _install_milvus(image_tag=image_tag)
+        milvus_op.scale(release_name, 'queryNode', 3, namespace)
+        self.release_name = release_name
+        assert host is not None
+        connections.connect("default", host=host, port=port)
+        mil = MilvusSys(alias="default")
+        log.info(f"milvus build version: {mil.build_version}")
+        utility.update_resource_groups(
+            {DEFAULT_RESOURCE_GROUP: ResourceGroupConfig(requests={"node_num": 0}, limits={"node_num": 10})})
+        # create rg
+        resource_groups = []
+        name = cf.gen_unique_str("rg")
+        self.utility = utility
+        self.utility.create_resource_group(name, config=ResourceGroupConfig(
+            requests={"node_num": 1},
+            limits={"node_num": 1},
+        ))
+        resource_groups.append(name)
+        list_all_resource_groups()
+        c_name = cf.gen_unique_str("Checker_")
+        self.init_health_checkers(collection_name=c_name)
+        # load the collection to the non-default rg
+        self.health_checkers[Op.search].c_wrap.release()
+        self.health_checkers[Op.search].c_wrap.load(_resource_groups=resource_groups)
+        cc.start_monitor_threads(self.health_checkers)
+        log.info("*********************Load Start**********************")
+        request_duration = 360
+        for i in range(10):
+            time.sleep(request_duration//10)
+            for k, v in self.health_checkers.items():
+                v.check_result()
+            # scale up querynode when progress is 3/10
+            if i == 3:
+                utility.update_resource_groups(
+                    {name: ResourceGroupConfig(requests={"node_num": 2}, limits={"node_num": 2})})
+                log.info(f"scale up querynode in rg {name} from 1 to 2")
+                list_all_resource_groups()
+                display_segment_distribution_info(c_name, release_name)
+        time.sleep(60)
+        ra = ResultAnalyzer()
+        ra.get_stage_success_rate()
+        assert_statistic(self.health_checkers)
+        for k, v in self.health_checkers.items():
+            v.terminate()
+
+    def test_service_available_during_scale_down(self, image_tag):
+        """
+        steps:
+        1. deploy milvus with 3 query nodes and load the checker collection into a 2-node rg
+        2. run insert/upsert/search/hybrid_search/query/delete checkers continuously
+        3. scale the rg down from 2 to 1 node while the requests keep running
+        4. assert the success rate of every operation
+        """
+        milvus_op = MilvusOperator()
+        release_name, host, port = _install_milvus(image_tag=image_tag)
+        milvus_op.scale(release_name, 'queryNode', 3, namespace)
+        self.release_name = release_name
+        assert host is not None
+        connections.connect("default", host=host, port=port)
+        mil = MilvusSys(alias="default")
+        log.info(f"milvus build version: {mil.build_version}")
+        utility.update_resource_groups(
+            {DEFAULT_RESOURCE_GROUP: ResourceGroupConfig(requests={"node_num": 0}, limits={"node_num": 5})})
+        # create rg
+        resource_groups = []
+        name = cf.gen_unique_str("rg")
+        self.utility = utility
+        self.utility.create_resource_group(name, config=ResourceGroupConfig(
+            requests={"node_num": 2},
+            limits={"node_num": 2},
+        ))
+        resource_groups.append(name)
+        list_all_resource_groups()
+        c_name = cf.gen_unique_str("Checker_")
+        self.init_health_checkers(collection_name=c_name)
+        # load the collection to the non-default rg
+        self.health_checkers[Op.search].c_wrap.release()
+        self.health_checkers[Op.search].c_wrap.load(_resource_groups=resource_groups)
+        cc.start_monitor_threads(self.health_checkers)
+        list_all_resource_groups()
+        log.info("*********************Load Start**********************")
+        request_duration = 360
+        for i in range(10):
+            time.sleep(request_duration//10)
+            for k, v in self.health_checkers.items():
+                v.check_result()
+            # scale down querynode in rg when progress is 3/10
+            if i == 3:
+                list_all_resource_groups()
+                utility.update_resource_groups(
+                    {name: ResourceGroupConfig(requests={"node_num": 1}, limits={"node_num": 1})})
+                log.info(f"scale down querynode in rg {name} from 2 to 1")
+                list_all_resource_groups()
+        time.sleep(60)
+        ra = ResultAnalyzer()
+        ra.get_stage_success_rate()
+        assert_statistic(self.health_checkers)
+        for k, v in self.health_checkers.items():
+            v.terminate()
+
+
+class TestServiceAvailableDuringTransferReplicas(TestcaseBase):
+
+    def init_health_checkers(self, collection_name=None):
+        c_name = collection_name
+        shards_num = 5
+        checkers = {
+            Op.insert: InsertChecker(collection_name=c_name, shards_num=shards_num),
+            Op.upsert: UpsertChecker(collection_name=c_name, shards_num=shards_num),
+            Op.search: SearchChecker(collection_name=c_name, shards_num=shards_num),
+            Op.hybrid_search: HybridSearchChecker(collection_name=c_name, shards_num=shards_num),
+            Op.query: QueryChecker(collection_name=c_name, shards_num=shards_num),
+            Op.delete: DeleteChecker(collection_name=c_name, shards_num=shards_num),
+        }
+        self.health_checkers = checkers
+
+    def teardown_method(self, method):
+        log.info(("*" * 35) + " teardown " + ("*" * 35))
+        log.info("[teardown_method] Start teardown test case %s..." % method.__name__)
+        milvus_op = MilvusOperator()
+        milvus_op.uninstall(self.release_name, namespace)
+        connections.disconnect("default")
+        connections.remove_connection("default")
+
+    def test_service_available_during_transfer_replicas(self, image_tag):
+        """
+        steps:
+        1. deploy milvus with 5 query nodes and create two 1-node resource groups
+        2. load the checker collection into the first rg and start all checkers
+        3. transfer the replica from the first rg to the second while the requests keep running
+        4. assert the success rate of every operation
+        """
+        milvus_op = MilvusOperator()
+        release_name, host, port = _install_milvus(image_tag=image_tag)
+        milvus_op.scale(release_name, 'queryNode', 5, namespace)
+        self.release_name = release_name
+        assert host is not None
+        connections.connect("default", host=host, port=port)
+        mil = MilvusSys(alias="default")
+        log.info(f"milvus build version: {mil.build_version}")
+        utility.update_resource_groups(
+            {DEFAULT_RESOURCE_GROUP: ResourceGroupConfig(requests={"node_num": 0}, limits={"node_num": 10})})
+        # create rg
+        resource_groups = []
+        for i in range(2):
+            name = cf.gen_unique_str("rg")
+            self.utility = utility
+            self.utility.create_resource_group(name, config=ResourceGroupConfig(
+                requests={"node_num": 1},
+                limits={"node_num": 1},
+            ))
+            resource_groups.append(name)
+        list_all_resource_groups()
+        c_name = cf.gen_unique_str("Checker_")
+        self.init_health_checkers(collection_name=c_name)
+        self.health_checkers[Op.search].c_wrap.release()
+        self.health_checkers[Op.search].c_wrap.load(_resource_groups=resource_groups[0:1])
+        cc.start_monitor_threads(self.health_checkers)
+        list_all_resource_groups()
+        display_segment_distribution_info(c_name, release_name)
+        log.info("*********************Load Start**********************")
+        request_duration = 360
+        for i in range(10):
+            time.sleep(request_duration//10)
+            for k, v in self.health_checkers.items():
+                v.check_result()
+            # transfer the replica to the other rg when progress is 3/10
+            if i == 3:
+                # transfer the replica from the first rg to the second rg
+                list_all_resource_groups()
+                display_segment_distribution_info(c_name, release_name)
+                self.utility.transfer_replica(source_group=resource_groups[0], target_group=resource_groups[1],
+                                              collection_name=c_name, num_replicas=1)
+                list_all_resource_groups()
+                display_segment_distribution_info(c_name, release_name)
+        time.sleep(60)
+        ra = ResultAnalyzer()
+        ra.get_stage_success_rate()
+        assert_statistic(self.health_checkers)
+        for k, v in self.health_checkers.items():
+            v.terminate()
diff --git a/tests/python_client/utils/util_birdwatcher.py b/tests/python_client/utils/util_birdwatcher.py
new file mode 100644
index 0000000000..b7c4abe405
--- /dev/null
+++ b/tests/python_client/utils/util_birdwatcher.py
@@ -0,0 +1,79 @@
+import os
+import re
+from utils.util_log import test_log as log
+
+
+def extraction_all_data(text):
+    # Patterns to handle the specifics of each key-value line
+    patterns = {
+        'Segment ID': r"Segment ID:\s*(\d+)",
+        'Segment State': r"Segment State:\s*(\w+)",
+        'Collection ID': r"Collection ID:\s*(\d+)",
+        'PartitionID': r"PartitionID:\s*(\d+)",
+        'Insert Channel': r"Insert Channel:(.+)",
+        'Num of Rows': r"Num of Rows:\s*(\d+)",
+        'Max Row Num': r"Max Row Num:\s*(\d+)",
+        'Last Expire Time': r"Last Expire Time:\s*(.+)",
+        'Compact from': r"Compact from:\s*(\[\])",
+        'Start Position ID': r"Start Position ID:\s*(\[[\d\s]+\])",
+        'Start Position Time': r"Start Position ID:.*time:\s*(.+),",
+        'Start Channel Name': r"channel name:\s*([^,\n]+)",
+        'Dml Position ID': r"Dml Position ID:\s*(\[[\d\s]+\])",
+        'Dml Position Time': r"Dml Position ID:.*time:\s*(.+),",
+        'Dml Channel Name': r"channel name:\s*(.+)",
+        'Binlog Nums': r"Binlog Nums:\s*(\d+)",
+        'StatsLog Nums': r"StatsLog Nums:\s*(\d+)",
+        'DeltaLog Nums': r"DeltaLog Nums:\s*(\d+)"
+    }
+
+    refined_data = {}
+    for key, pattern in patterns.items():
+        match = re.search(pattern, text)
+        if match:
+            refined_data[key] = match.group(1).strip()
+
+    return refined_data
+
+
+class BirdWatcher:
+    """
+
+    Birdwatcher is a CLI tool to inspect Milvus metadata in etcd,
+    e.g. the command:
+    show segment info
+    """
+
+    def __init__(self, etcd_endpoints, root_path):
+        self.prefix = f"birdwatcher --olc=\"#connect --etcd {etcd_endpoints} --rootPath={root_path},"
+
+    def parse_segment_info(self, output):
+        splitter = output.strip().split('\n')[0]
+        segments = output.strip().split(splitter)
+        segments = [segment for segment in segments if segment.strip()]
+
+        # Parse all segments
+        parsed_segments = [extraction_all_data(segment) for segment in segments]
+        parsed_segments = [segment for segment in parsed_segments if segment]
+        return parsed_segments
+
+    def show_segment_info(self, collection_id=None):
+        cmd = f"{self.prefix} show segment info --format table\""
+        if collection_id:
+            cmd = f"{self.prefix} show segment info --collection {collection_id} --format table\""
+        log.info(f"cmd: {cmd}")
+        output = os.popen(cmd).read()
+        # log.info(f"{cmd} output: {output}")
+        output = self.parse_segment_info(output)
+        for segment in output:
+            log.info(segment)
+        seg_res = {}
+        for segment in output:
+            seg_res[segment['Segment ID']] = segment
+        return seg_res
+
+
+if __name__ == "__main__":
+    birdwatcher = BirdWatcher("10.104.18.24:2379", "rg-test-613938")
+    res = birdwatcher.show_segment_info()
+    print(res)
+
diff --git a/tests/python_client/utils/util_k8s.py b/tests/python_client/utils/util_k8s.py
index ffaba8bcc1..b514e3444c 100644
--- a/tests/python_client/utils/util_k8s.py
+++ b/tests/python_client/utils/util_k8s.py
@@ -452,6 +452,8 @@ def record_time_when_standby_activated(namespace, release_name, coord_type, time
         log.info(f"Standby {coord_type} pod does not switch standby mode")
 
 
+
+
 if __name__ == '__main__':
     label = "app.kubernetes.io/name=milvus, component=querynode"
     instance_name = get_milvus_instance_name("chaos-testing", "10.96.250.111")
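
For reviewers who want to exercise the declarative resource-group flow outside the test harness, here is a minimal standalone sketch of the calls these tests rely on (create_resource_group, update_resource_groups with transfer_from, and transfer_replica). It is not part of the patch: the endpoint, collection name, and node counts are placeholders, and the ResourceGroupConfig import path may differ slightly between pymilvus 2.4.x releases.

    from pymilvus import Collection, connections, utility
    from pymilvus.client.types import ResourceGroupConfig

    connections.connect("default", host="127.0.0.1", port="19530")  # placeholder endpoint

    # declare two resource groups with requested/limited query-node counts
    rg_a, rg_b = "rg_a", "rg_b"
    for rg in (rg_a, rg_b):
        utility.create_resource_group(rg, config=ResourceGroupConfig(
            requests={"node_num": 1},
            limits={"node_num": 2},
        ))

    # grow rg_a and let it pull spare nodes from rg_b
    utility.update_resource_groups({
        rg_a: ResourceGroupConfig(
            requests={"node_num": 2},
            limits={"node_num": 2},
            transfer_from=[{"resource_group": rg_b}],
        ),
    })

    # load an existing, indexed collection with one replica per rg, then move one replica
    collection = Collection("demo_collection")  # placeholder collection
    collection.release()
    collection.load(replica_number=2, _resource_groups=[rg_a, rg_b])
    utility.transfer_replica(source_group=rg_b, target_group=rg_a,
                             collection_name=collection.name, num_replicas=1)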
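The segment parser added in util_birdwatcher.py only keeps the keys whose regex matches, so a quick way to see what it returns is to feed it a small hand-written snippet shaped like birdwatcher's `show segment info` output. The snippet below is hypothetical, not captured from a real run:

    from utils.util_birdwatcher import extraction_all_data

    # a made-up fragment of birdwatcher output, just enough to hit a few patterns
    sample = (
        "Segment ID: 4470\n"
        "Segment State: Flushed\n"
        "Collection ID: 4452\n"
        "Num of Rows: 3000\n"
    )
    print(extraction_all_data(sample))
    # {'Segment ID': '4470', 'Segment State': 'Flushed', 'Collection ID': '4452', 'Num of Rows': '3000'}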