diff --git a/tests/python_client/cdc/perf/milvus_cdc_perf_test.py b/tests/python_client/cdc/perf/milvus_cdc_perf_test.py index aad9d53dbe..56d66348b0 100644 --- a/tests/python_client/cdc/perf/milvus_cdc_perf_test.py +++ b/tests/python_client/cdc/perf/milvus_cdc_perf_test.py @@ -4,13 +4,12 @@ import threading from concurrent.futures import ThreadPoolExecutor from pymilvus import connections, Collection, DataType, FieldSchema, CollectionSchema from loguru import logger -import socket -import dns.resolver +from kubernetes import client, config from urllib.parse import urlparse import itertools class MilvusCDCPerformanceTest: - def __init__(self, source_uri, source_token, target_uri, target_token): + def __init__(self, source_uri, source_token, target_uri, target_token, source_release_name): self.source_uri = source_uri self.source_token = source_token self.target_uri = target_uri @@ -23,11 +22,15 @@ class MilvusCDCPerformanceTest: self.stop_query = False self.latencies = [] self.latest_insert_status = {"latest_ts": 0, "latest_count": 0} - self.init_collection() - self.source_pod_ips = self.get_pod_ips(source_uri) - self.target_pod_ips = self.get_pod_ips(target_uri) + config.load_kube_config() + + self.k8s_api = client.CoreV1Api() + self.source_release_name = source_release_name + self.source_pod_ips = self.get_pod_ips(self.source_release_name) self.source_ip_cycle = itertools.cycle(self.source_pod_ips) - self.target_ip_cycle = itertools.cycle(self.target_pod_ips) + self.init_collection() + + def init_collection(self): connections.connect(alias="default", uri=self.source_uri, token=self.source_token) fields = [ @@ -46,21 +49,24 @@ class MilvusCDCPerformanceTest: collection.load() connections.disconnect(alias="default") return collection - def get_pod_ips(self, uri): - parsed_uri = urlparse(uri) - hostname = parsed_uri.hostname - try: - answers = dns.resolver.resolve(hostname, 'A') - return [str(rdata) for rdata in answers] - except dns.resolver.NXDOMAIN: - logger.warning(f"Could not resolve {hostname}. Using the original IP.") - return [socket.gethostbyname(hostname)] + def get_pod_ips(self, instance, namespace='chaos-testing'): + try: + label_selector = f"app.kubernetes.io/instance={instance},app.kubernetes.io/component=proxy" + pod_list = self.k8s_api.list_namespaced_pod(namespace=namespace, label_selector=label_selector) + + pod_ips = [] + for pod in pod_list.items: + if pod.status.phase == 'Running' and pod.status.pod_ip: + pod_ips.append(pod.status.pod_ip) + logger.info(f"Found {len(pod_ips)} pod IPs for instance {instance}: {pod_ips}") + return pod_ips + except Exception as e: + logger.error(f"Error getting pod IPs: {str(e)}") + return [] def get_next_source_ip(self): return next(self.source_ip_cycle) - def get_next_target_ip(self): - return next(self.target_ip_cycle) def setup_collection(self, alias): fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), @@ -81,8 +87,7 @@ class MilvusCDCPerformanceTest: def continuous_insert(self, duration, batch_size, thread_id): alias = f"source_{thread_id}" pod_ip = self.get_next_source_ip() - parsed_uri = urlparse(self.source_uri) - uri = parsed_uri._replace(netloc=f"{pod_ip}:{parsed_uri.port}").geturl() + uri = f"http://{pod_ip}:19530" connections.connect(alias, uri=uri, token=self.source_token) collection = self.setup_collection(alias) end_time = time.time() + duration @@ -197,12 +202,11 @@ if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='cdc perf test') - parser.add_argument('--source_uri', type=str, default='http://127.0.0.1:19530', help='source uri') + parser.add_argument('--source_uri', type=str, default='http://10.104.20.175:19530', help='source uri') parser.add_argument('--source_token', type=str, default='root:Milvus', help='source token') - parser.add_argument('--target_uri', type=str, default='http://127.0.0.1:19530', help='target uri') + parser.add_argument('--target_uri', type=str, default='http://10.104.14.148:19530', help='target uri') parser.add_argument('--target_token', type=str, default='root:Milvus', help='target token') - + parser.add_argument('--source_release_name', type=str, default='cdc-test-upstream-12', help='source release name') args = parser.parse_args() - - cdc_test = MilvusCDCPerformanceTest(args.source_uri, args.source_token, args.target_uri, args.target_token) + cdc_test = MilvusCDCPerformanceTest(args.source_uri, args.source_token, args.target_uri, args.target_token, args.source_release_name) cdc_test.run_all_tests(duration=300, batch_size=1000, max_concurrency=100)