update multi proxy

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
zhuwenxing 2024-08-01 20:25:59 +08:00
parent 5a8863fa67
commit 3cd924762d

View File

@ -4,13 +4,12 @@ import threading
from concurrent.futures import ThreadPoolExecutor
from pymilvus import connections, Collection, DataType, FieldSchema, CollectionSchema
from loguru import logger
import socket
import dns.resolver
from kubernetes import client, config
from urllib.parse import urlparse
import itertools
class MilvusCDCPerformanceTest:
def __init__(self, source_uri, source_token, target_uri, target_token):
def __init__(self, source_uri, source_token, target_uri, target_token, source_release_name):
self.source_uri = source_uri
self.source_token = source_token
self.target_uri = target_uri
@ -23,11 +22,15 @@ class MilvusCDCPerformanceTest:
self.stop_query = False
self.latencies = []
self.latest_insert_status = {"latest_ts": 0, "latest_count": 0}
self.init_collection()
self.source_pod_ips = self.get_pod_ips(source_uri)
self.target_pod_ips = self.get_pod_ips(target_uri)
config.load_kube_config()
self.k8s_api = client.CoreV1Api()
self.source_release_name = source_release_name
self.source_pod_ips = self.get_pod_ips(self.source_release_name)
self.source_ip_cycle = itertools.cycle(self.source_pod_ips)
self.target_ip_cycle = itertools.cycle(self.target_pod_ips)
self.init_collection()
def init_collection(self):
connections.connect(alias="default", uri=self.source_uri, token=self.source_token)
fields = [
@ -46,21 +49,24 @@ class MilvusCDCPerformanceTest:
collection.load()
connections.disconnect(alias="default")
return collection
def get_pod_ips(self, uri):
parsed_uri = urlparse(uri)
hostname = parsed_uri.hostname
try:
answers = dns.resolver.resolve(hostname, 'A')
return [str(rdata) for rdata in answers]
except dns.resolver.NXDOMAIN:
logger.warning(f"Could not resolve {hostname}. Using the original IP.")
return [socket.gethostbyname(hostname)]
def get_pod_ips(self, instance, namespace='chaos-testing'):
try:
label_selector = f"app.kubernetes.io/instance={instance},app.kubernetes.io/component=proxy"
pod_list = self.k8s_api.list_namespaced_pod(namespace=namespace, label_selector=label_selector)
pod_ips = []
for pod in pod_list.items:
if pod.status.phase == 'Running' and pod.status.pod_ip:
pod_ips.append(pod.status.pod_ip)
logger.info(f"Found {len(pod_ips)} pod IPs for instance {instance}: {pod_ips}")
return pod_ips
except Exception as e:
logger.error(f"Error getting pod IPs: {str(e)}")
return []
def get_next_source_ip(self):
return next(self.source_ip_cycle)
def get_next_target_ip(self):
return next(self.target_ip_cycle)
def setup_collection(self, alias):
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
@ -81,8 +87,7 @@ class MilvusCDCPerformanceTest:
def continuous_insert(self, duration, batch_size, thread_id):
alias = f"source_{thread_id}"
pod_ip = self.get_next_source_ip()
parsed_uri = urlparse(self.source_uri)
uri = parsed_uri._replace(netloc=f"{pod_ip}:{parsed_uri.port}").geturl()
uri = f"http://{pod_ip}:19530"
connections.connect(alias, uri=uri, token=self.source_token)
collection = self.setup_collection(alias)
end_time = time.time() + duration
@ -197,12 +202,11 @@ if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='cdc perf test')
parser.add_argument('--source_uri', type=str, default='http://127.0.0.1:19530', help='source uri')
parser.add_argument('--source_uri', type=str, default='http://10.104.20.175:19530', help='source uri')
parser.add_argument('--source_token', type=str, default='root:Milvus', help='source token')
parser.add_argument('--target_uri', type=str, default='http://127.0.0.1:19530', help='target uri')
parser.add_argument('--target_uri', type=str, default='http://10.104.14.148:19530', help='target uri')
parser.add_argument('--target_token', type=str, default='root:Milvus', help='target token')
parser.add_argument('--source_release_name', type=str, default='cdc-test-upstream-12', help='source release name')
args = parser.parse_args()
cdc_test = MilvusCDCPerformanceTest(args.source_uri, args.source_token, args.target_uri, args.target_token)
cdc_test = MilvusCDCPerformanceTest(args.source_uri, args.source_token, args.target_uri, args.target_token, args.source_release_name)
cdc_test.run_all_tests(duration=300, batch_size=1000, max_concurrency=100)