From 8a432bc472d903e7d783d71f84e2d61768813518 Mon Sep 17 00:00:00 2001 From: "peng.xu" Date: Mon, 14 Oct 2019 15:56:47 +0800 Subject: [PATCH] update k8s provider for sd --- sd/kubernetes_provider.py | 108 ++++++++++++++++++++++---------------- 1 file changed, 63 insertions(+), 45 deletions(-) diff --git a/sd/kubernetes_provider.py b/sd/kubernetes_provider.py index 8ee1588ec4..9a15b2fa78 100644 --- a/sd/kubernetes_provider.py +++ b/sd/kubernetes_provider.py @@ -1,7 +1,8 @@ import os import sys if __name__ == '__main__': - sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + sys.path.append(os.path.dirname(os.path.dirname( + os.path.abspath(__file__)))) import re import logging @@ -9,6 +10,7 @@ import time import copy import threading import queue +import enum from kubernetes import client, config, watch from utils import singleton @@ -19,6 +21,11 @@ logger = logging.getLogger(__name__) INCLUSTER_NAMESPACE_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/namespace' +class EventType(enum.Enum): + PodHeartBeat = 1 + Watch = 2 + + class K8SMixin: def __init__(self, namespace, in_cluster=False, **kwargs): self.namespace = namespace @@ -29,13 +36,22 @@ class K8SMixin: self.namespace = open(INCLUSTER_NAMESPACE_PATH).read() if not self.v1: - config.load_incluster_config() if self.in_cluster else config.load_kube_config() + config.load_incluster_config( + ) if self.in_cluster else config.load_kube_config() self.v1 = client.CoreV1Api() class K8SHeartbeatHandler(threading.Thread, K8SMixin): - def __init__(self, message_queue, namespace, label_selector, in_cluster=False, **kwargs): - K8SMixin.__init__(self, namespace=namespace, in_cluster=in_cluster, **kwargs) + def __init__(self, + message_queue, + namespace, + label_selector, + in_cluster=False, + **kwargs): + K8SMixin.__init__(self, + namespace=namespace, + in_cluster=in_cluster, + **kwargs) threading.Thread.__init__(self) self.queue = message_queue self.terminate = False @@ -45,13 +61,13 @@ class K8SHeartbeatHandler(threading.Thread, K8SMixin): def run(self): while not self.terminate: try: - pods = self.v1.list_namespaced_pod(namespace=self.namespace, label_selector=self.label_selector) - event_message = { - 'eType': 'PodHeartBeat', - 'events': [] - } + pods = self.v1.list_namespaced_pod( + namespace=self.namespace, + label_selector=self.label_selector) + event_message = {'eType': EventType.PodHeartBeat, 'events': []} for item in pods.items: - pod = self.v1.read_namespaced_pod(name=item.metadata.name, namespace=self.namespace) + pod = self.v1.read_namespaced_pod(name=item.metadata.name, + namespace=self.namespace) name = pod.metadata.name ip = pod.status.pod_ip phase = pod.status.phase @@ -59,13 +75,11 @@ class K8SHeartbeatHandler(threading.Thread, K8SMixin): message = pod.status.message ready = True if phase == 'Running' else False - pod_event = dict( - pod=name, - ip=ip, - ready=ready, - reason=reason, - message=message - ) + pod_event = dict(pod=name, + ip=ip, + ready=ready, + reason=reason, + message=message) event_message['events'].append(pod_event) @@ -82,7 +96,10 @@ class K8SHeartbeatHandler(threading.Thread, K8SMixin): class K8SEventListener(threading.Thread, K8SMixin): def __init__(self, message_queue, namespace, in_cluster=False, **kwargs): - K8SMixin.__init__(self, namespace=namespace, in_cluster=in_cluster, **kwargs) + K8SMixin.__init__(self, + namespace=namespace, + in_cluster=in_cluster, + **kwargs) threading.Thread.__init__(self) self.queue = message_queue self.terminate = False @@ -96,7 +113,8 @@ class K8SEventListener(threading.Thread, K8SMixin): def run(self): resource_version = '' w = watch.Watch() - for event in w.stream(self.v1.list_namespaced_event, namespace=self.namespace, + for event in w.stream(self.v1.list_namespaced_event, + namespace=self.namespace, field_selector='involvedObject.kind=Pod'): if self.terminate: break @@ -104,7 +122,7 @@ class K8SEventListener(threading.Thread, K8SMixin): resource_version = int(event['object'].metadata.resource_version) info = dict( - eType='WatchEvent', + eType=EventType.Watch, pod=event['object'].involved_object.name, reason=event['object'].reason, message=event['object'].message, @@ -137,7 +155,8 @@ class EventHandler(threading.Thread): while try_cnt > 0: try_cnt -= 1 try: - pod = self.mgr.v1.read_namespaced_pod(name=event['pod'], namespace=self.namespace) + pod = self.mgr.v1.read_namespaced_pod(name=event['pod'], + namespace=self.namespace) if not pod.status.pod_ip: time.sleep(0.5) continue @@ -147,13 +166,15 @@ class EventHandler(threading.Thread): if try_cnt <= 0 and not pod: if not event['start_up']: - logger.error('Pod {} is started but cannot read pod'.format(event['pod'])) + logger.error('Pod {} is started but cannot read pod'.format( + event['pod'])) return elif try_cnt <= 0 and not pod.status.pod_ip: logger.warn('NoPodIPFoundError') return - logger.info('Register POD {} with IP {}'.format(pod.metadata.name, pod.status.pod_ip)) + logger.info('Register POD {} with IP {}'.format( + pod.metadata.name, pod.status.pod_ip)) self.mgr.add_pod(name=pod.metadata.name, ip=pod.status.pod_ip) def on_pod_killing(self, event, **kwargs): @@ -178,7 +199,7 @@ class EventHandler(threading.Thread): logger.info(self.mgr.conn_mgr.conn_names) def handle_event(self, event): - if event['eType'] == 'PodHeartBeat': + if event['eType'] == EventType.PodHeartBeat: return self.on_pod_heartbeat(event) if not event or (event['reason'] not in ('Started', 'Killing')): @@ -204,7 +225,8 @@ class EventHandler(threading.Thread): class KubernetesProviderSettings: - def __init__(self, namespace, pod_patt, label_selector, in_cluster, poll_interval, **kwargs): + def __init__(self, namespace, pod_patt, label_selector, in_cluster, + poll_interval, **kwargs): self.namespace = namespace self.pod_patt = pod_patt self.label_selector = label_selector @@ -231,16 +253,15 @@ class KubernetesProvider(object): if not self.namespace: self.namespace = open(incluster_namespace_path).read() - config.load_incluster_config() if self.in_cluster else config.load_kube_config() + config.load_incluster_config( + ) if self.in_cluster else config.load_kube_config() self.v1 = client.CoreV1Api() - self.listener = K8SEventListener( - message_queue=self.queue, - namespace=self.namespace, - in_cluster=self.in_cluster, - v1=self.v1, - **kwargs - ) + self.listener = K8SEventListener(message_queue=self.queue, + namespace=self.namespace, + in_cluster=self.in_cluster, + v1=self.v1, + **kwargs) self.pod_heartbeater = K8SHeartbeatHandler( message_queue=self.queue, @@ -249,13 +270,13 @@ class KubernetesProvider(object): in_cluster=self.in_cluster, v1=self.v1, poll_interval=self.poll_interval, - **kwargs - ) + **kwargs) self.event_handler = EventHandler(mgr=self, message_queue=self.queue, namespace=self.namespace, - pod_patt=self.pod_patt, **kwargs) + pod_patt=self.pod_patt, + **kwargs) def add_pod(self, name, ip): self.conn_mgr.register(name, 'tcp://{}:19530'.format(ip)) @@ -292,17 +313,14 @@ if __name__ == '__main__': connect_mgr = Connect() - settings = KubernetesProviderSettings( - namespace='xp', - pod_patt=".*-ro-servers-.*", - label_selector='tier=ro-servers', - poll_interval=5, - in_cluster=False) + settings = KubernetesProviderSettings(namespace='xp', + pod_patt=".*-ro-servers-.*", + label_selector='tier=ro-servers', + poll_interval=5, + in_cluster=False) provider_class = ProviderManager.get_provider('Kubernetes') - t = provider_class(conn_mgr=connect_mgr, - settings=settings - ) + t = provider_class(conn_mgr=connect_mgr, settings=settings) t.start() cnt = 100 while cnt > 0: