From b4ed4b2e35c3119290b29f1539c2cf37aca7cebd Mon Sep 17 00:00:00 2001 From: "peng.xu" Date: Sat, 21 Sep 2019 12:17:13 +0800 Subject: [PATCH] refactor kubernetes service provider --- mishards/__init__.py | 11 ++-- mishards/settings.py | 16 +++-- sd/__init__.py | 27 ++++++++ ...vice_founder.py => kubernetes_provider.py} | 62 ++++++++++++++----- 4 files changed, 90 insertions(+), 26 deletions(-) rename sd/{service_founder.py => kubernetes_provider.py} (83%) diff --git a/mishards/__init__.py b/mishards/__init__.py index 3158afa5b3..55b24c082c 100644 --- a/mishards/__init__.py +++ b/mishards/__init__.py @@ -7,13 +7,10 @@ db.init_db(uri=settings.SQLALCHEMY_DATABASE_URI, echo=settings.SQL_ECHO) from mishards.connections import ConnectionMgr connect_mgr = ConnectionMgr() -from sd.service_founder import ServiceFounder -discover = ServiceFounder(namespace=settings.SD_NAMESPACE, - conn_mgr=connect_mgr, - pod_patt=settings.SD_ROSERVER_POD_PATT, - label_selector=settings.SD_LABEL_SELECTOR, - in_cluster=settings.SD_IN_CLUSTER, - poll_interval=settings.SD_POLL_INTERVAL) +from sd import ProviderManager + +sd_proiver_class = ProviderManager.get_provider(settings.SD_PROVIDER) +discover = sd_proiver_class(settings=settings.SD_PROVIDER_SETTINGS, conn_mgr=connect_mgr) from mishards.server import Server grpc_server = Server(conn_mgr=connect_mgr) diff --git a/mishards/settings.py b/mishards/settings.py index f99bd3b3c6..046508f92c 100644 --- a/mishards/settings.py +++ b/mishards/settings.py @@ -26,11 +26,17 @@ SEARCH_WORKER_SIZE = env.int('SEARCH_WORKER_SIZE', 10) SERVER_PORT = env.int('SERVER_PORT', 19530) WOSERVER = env.str('WOSERVER') -SD_NAMESPACE = env.str('SD_NAMESPACE', '') -SD_IN_CLUSTER = env.bool('SD_IN_CLUSTER', False) -SD_POLL_INTERVAL = env.int('SD_POLL_INTERVAL', 5) -SD_ROSERVER_POD_PATT = env.str('SD_ROSERVER_POD_PATT', '') -SD_LABEL_SELECTOR = env.str('SD_LABEL_SELECTOR', '') +SD_PROVIDER_SETTINGS = None +SD_PROVIDER = env.str('SD_PROVIDER', 'Kubernetes') +if SD_PROVIDER == 'Kubernetes': + from sd.kubernetes_provider import KubernetesProviderSettings + SD_PROVIDER_SETTINGS = KubernetesProviderSettings( + namespace=env.str('SD_NAMESPACE', ''), + in_cluster=env.bool('SD_IN_CLUSTER', False), + poll_interval=env.int('SD_POLL_INTERVAL', 5), + pod_patt=env.str('SD_ROSERVER_POD_PATT', ''), + label_selector=env.str('SD_LABEL_SELECTOR', '') + ) TESTING = env.bool('TESTING', False) TESTING_WOSERVER = env.str('TESTING_WOSERVER', 'tcp://127.0.0.1:19530') diff --git a/sd/__init__.py b/sd/__init__.py index e69de29bb2..5c37bc621b 100644 --- a/sd/__init__.py +++ b/sd/__init__.py @@ -0,0 +1,27 @@ +import logging +import inspect +# from utils import singleton + +logger = logging.getLogger(__name__) + + +class ProviderManager: + PROVIDERS = {} + + @classmethod + def register_service_provider(cls, target): + if inspect.isfunction(target): + cls.PROVIDERS[target.__name__] = target + elif inspect.isclass(target): + name = target.__dict__.get('NAME', None) + name = name if name else target.__class__.__name__ + cls.PROVIDERS[name] = target + else: + assert False, 'Cannot register_service_provider for: {}'.format(target) + return target + + @classmethod + def get_provider(cls, name): + return cls.PROVIDERS.get(name, None) + +from sd import kubernetes_provider diff --git a/sd/service_founder.py b/sd/kubernetes_provider.py similarity index 83% rename from sd/service_founder.py rename to sd/kubernetes_provider.py index 79292d452f..51665a0cb5 100644 --- a/sd/service_founder.py +++ b/sd/kubernetes_provider.py @@ -12,6 +12,7 @@ from functools import wraps from kubernetes import client, config, watch from utils import singleton +from sd import ProviderManager logger = logging.getLogger(__name__) @@ -32,7 +33,7 @@ class K8SMixin: self.v1 = client.CoreV1Api() -class K8SServiceDiscover(threading.Thread, K8SMixin): +class K8SHeartbeatHandler(threading.Thread, K8SMixin): def __init__(self, message_queue, namespace, label_selector, in_cluster=False, **kwargs): K8SMixin.__init__(self, namespace=namespace, in_cluster=in_cluster, **kwargs) threading.Thread.__init__(self) @@ -202,13 +203,26 @@ class EventHandler(threading.Thread): except queue.Empty: continue -@singleton -class ServiceFounder(object): - def __init__(self, conn_mgr, namespace, pod_patt, label_selector, in_cluster=False, **kwargs): +class KubernetesProviderSettings: + def __init__(self, namespace, pod_patt, label_selector, in_cluster, poll_interval, **kwargs): self.namespace = namespace + self.pod_patt = pod_patt + self.label_selector = label_selector + self.in_cluster = in_cluster + self.poll_interval = poll_interval + +@singleton +@ProviderManager.register_service_provider +class KubernetesProvider(object): + NAME = 'Kubernetes' + def __init__(self, settings, conn_mgr, **kwargs): + self.namespace = settings.namespace + self.pod_patt = settings.pod_patt + self.label_selector = settings.label_selector + self.in_cluster = settings.in_cluster + self.poll_interval = settings.poll_interval self.kwargs = kwargs self.queue = queue.Queue() - self.in_cluster = in_cluster self.conn_mgr = conn_mgr @@ -226,19 +240,20 @@ class ServiceFounder(object): **kwargs ) - self.pod_heartbeater = K8SServiceDiscover( + self.pod_heartbeater = K8SHeartbeatHandler( message_queue=self.queue, - namespace=namespace, - label_selector=label_selector, + namespace=self.namespace, + label_selector=self.label_selector, in_cluster=self.in_cluster, v1=self.v1, + poll_interval=self.poll_interval, **kwargs ) self.event_handler = EventHandler(mgr=self, message_queue=self.queue, namespace=self.namespace, - pod_patt=pod_patt, **kwargs) + pod_patt=self.pod_patt, **kwargs) def add_pod(self, name, ip): self.conn_mgr.register(name, 'tcp://{}:19530'.format(ip)) @@ -250,8 +265,6 @@ class ServiceFounder(object): self.listener.daemon = True self.listener.start() self.event_handler.start() - # while self.listener.at_start_up: - # time.sleep(1) self.pod_heartbeater.start() @@ -262,11 +275,32 @@ class ServiceFounder(object): if __name__ == '__main__': - from mishards import connect_mgr logging.basicConfig(level=logging.INFO) - t = ServiceFounder(namespace='xp', conn_mgr=connect_mgr, pod_patt=".*-ro-servers-.*", label_selector='tier=ro-servers', in_cluster=False) + class Connect: + def register(self, name, value): + logger.error('Register: {} - {}'.format(name, value)) + def unregister(self, name): + logger.error('Unregister: {}'.format(name)) + + @property + def conn_names(self): + return set() + + connect_mgr = Connect() + + settings = KubernetesProviderSettings( + namespace='xp', + pod_patt=".*-ro-servers-.*", + label_selector='tier=ro-servers', + poll_interval=5, + in_cluster=False) + + provider_class = ProviderManager.get_provider('Kubernetes') + t = provider_class(conn_mgr=connect_mgr, + settings=settings + ) t.start() - cnt = 2 + cnt = 100 while cnt > 0: time.sleep(2) cnt -= 1