diff --git a/shards/discovery/__init__.py b/shards/discovery/__init__.py new file mode 100644 index 0000000000..a591d1cc1c --- /dev/null +++ b/shards/discovery/__init__.py @@ -0,0 +1,37 @@ +import os +import os +import sys +if __name__ == '__main__': + sys.path.append(os.path.dirname(os.path.dirname( + os.path.abspath(__file__)))) + +import logging +from utils import dotdict + +logger = logging.getLogger(__name__) + + +class DiscoveryConfig(dotdict): + CONFIG_PREFIX = 'DISCOVERY_' + + def dump(self): + logger.info('----------- DiscoveryConfig -----------------') + for k, v in self.items(): + logger.info('{}: {}'.format(k, v)) + if len(self) <= 0: + logger.error(' Empty DiscoveryConfig Found! ') + logger.info('---------------------------------------------') + + @classmethod + def Create(cls, **kwargs): + o = cls() + + for k, v in os.environ.items(): + if not k.startswith(cls.CONFIG_PREFIX): + continue + o[k] = v + for k, v in kwargs.items(): + o[k] = v + + o.dump() + return o diff --git a/shards/discovery/factory.py b/shards/discovery/factory.py new file mode 100644 index 0000000000..a5713dcf37 --- /dev/null +++ b/shards/discovery/factory.py @@ -0,0 +1,53 @@ +import os +import logging +from functools import partial +from utils.pluginextension import MiPluginBase as PluginBase +from discovery import DiscoveryConfig + +logger = logging.getLogger(__name__) + +here = os.path.abspath(os.path.dirname(__file__)) +get_path = partial(os.path.join, here) + +PLUGIN_PACKAGE_NAME = 'discovery.plugins' +plugin_base = PluginBase(package=PLUGIN_PACKAGE_NAME, + searchpath=[get_path('./plugins')]) + + +class DiscoveryFactory(object): + PLUGIN_TYPE = 'Discovery' + + def __init__(self, searchpath=None): + self.plugin_package_name = PLUGIN_PACKAGE_NAME + self.class_map = {} + searchpath = searchpath if searchpath else [] + searchpath = [searchpath] if isinstance(searchpath, str) else searchpath + self.source = plugin_base.make_plugin_source(searchpath=searchpath, + identifier=self.__class__.__name__) + + for plugin_name in self.source.list_plugins(): + plugin = self.source.load_plugin(plugin_name) + plugin.setup(self) + + def on_plugin_setup(self, plugin_class): + name = getattr(plugin_class, 'name', plugin_class.__name__) + self.class_map[name.lower()] = plugin_class + + def plugin(self, name): + return self.class_map.get(name, None) + + def create(self, class_name, **kwargs): + conn_mgr = kwargs.pop('conn_mgr', None) + if not conn_mgr: + raise RuntimeError('Please pass conn_mgr to create discovery!') + + if not class_name: + raise RuntimeError('Please specify \'{}\' class_name first!'.format(self.PLUGIN_TYPE)) + + plugin_class = self.plugin(class_name.lower()) + if not plugin_class: + raise RuntimeError('{} Plugin \'{}\' Not Installed!'.format(self.PLUGIN_TYPE, class_name)) + + plugin_config = DiscoveryConfig.Create() + plugin = plugin_class.create(plugin_config=plugin_config, conn_mgr=conn_mgr, **kwargs) + return plugin diff --git a/shards/sd/kubernetes_provider.py b/shards/discovery/plugins/kubernetes_provider.py similarity index 84% rename from shards/sd/kubernetes_provider.py rename to shards/discovery/plugins/kubernetes_provider.py index eb113db007..c9d9a3ad5a 100644 --- a/shards/sd/kubernetes_provider.py +++ b/shards/discovery/plugins/kubernetes_provider.py @@ -13,9 +13,6 @@ import queue import enum from kubernetes import client, config, watch -from utils import singleton -from sd import ProviderManager - logger = logging.getLogger(__name__) INCLUSTER_NAMESPACE_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/namespace' @@ -42,6 +39,8 @@ class K8SMixin: class K8SHeartbeatHandler(threading.Thread, K8SMixin): + name = 'kubernetes' + def __init__(self, message_queue, namespace, @@ -235,18 +234,19 @@ class KubernetesProviderSettings: self.port = int(port) if port else 19530 -@singleton -@ProviderManager.register_service_provider class KubernetesProvider(object): - NAME = 'Kubernetes' + name = 'kubernetes' - def __init__(self, settings, conn_mgr, **kwargs): - self.namespace = settings.namespace - self.pod_patt = settings.pod_patt - self.label_selector = settings.label_selector - self.in_cluster = settings.in_cluster - self.poll_interval = settings.poll_interval - self.port = settings.port + def __init__(self, plugin_config, conn_mgr, **kwargs): + self.namespace = plugin_config.DISCOVERY_KUBERNETES_NAMESPACE + self.pod_patt = plugin_config.DISCOVERY_KUBERNETES_POD_PATT + self.label_selector = plugin_config.DISCOVERY_KUBERNETES_LABEL_SELECTOR + self.in_cluster = plugin_config.DISCOVERY_KUBERNETES_IN_CLUSTER.lower() + self.in_cluster = self.in_cluster == 'true' + self.poll_interval = plugin_config.DISCOVERY_KUBERNETES_POLL_INTERVAL + self.poll_interval = int(self.poll_interval) if self.poll_interval else 5 + self.port = plugin_config.DISCOVERY_KUBERNETES_PORT + self.port = int(self.port) if self.port else 19530 self.kwargs = kwargs self.queue = queue.Queue() @@ -298,9 +298,23 @@ class KubernetesProvider(object): self.pod_heartbeater.stop() self.event_handler.stop() + @classmethod + def create(cls, conn_mgr, plugin_config, **kwargs): + discovery = cls(plugin_config=plugin_config, conn_mgr=conn_mgr, **kwargs) + return discovery + + +def setup(app): + logger.info('Plugin \'{}\' Installed In Package: {}'.format(__file__, app.plugin_package_name)) + app.on_plugin_setup(KubernetesProvider) + if __name__ == '__main__': logging.basicConfig(level=logging.INFO) + sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname( + os.path.abspath(__file__)))))) + sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname( + os.path.abspath(__file__))))) class Connect: def register(self, name, value): @@ -315,14 +329,15 @@ if __name__ == '__main__': connect_mgr = Connect() - settings = KubernetesProviderSettings(namespace='xp', - pod_patt=".*-ro-servers-.*", - label_selector='tier=ro-servers', - poll_interval=5, - in_cluster=False) + from discovery import DiscoveryConfig + settings = DiscoveryConfig(DISCOVERY_KUBERNETES_NAMESPACE='xp', + DISCOVERY_KUBERNETES_POD_PATT=".*-ro-servers-.*", + DISCOVERY_KUBERNETES_LABEL_SELECTOR='tier=ro-servers', + DISCOVERY_KUBERNETES_POLL_INTERVAL=5, + DISCOVERY_KUBERNETES_IN_CLUSTER=False) - provider_class = ProviderManager.get_provider('Kubernetes') - t = provider_class(conn_mgr=connect_mgr, settings=settings) + provider_class = KubernetesProvider + t = provider_class(conn_mgr=connect_mgr, plugin_config=settings) t.start() cnt = 100 while cnt > 0: diff --git a/shards/discovery/plugins/static_provider.py b/shards/discovery/plugins/static_provider.py new file mode 100644 index 0000000000..0f8bdb3d25 --- /dev/null +++ b/shards/discovery/plugins/static_provider.py @@ -0,0 +1,43 @@ +import os +import sys +if __name__ == '__main__': + sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import logging +import socket + +logger = logging.getLogger(__name__) + + +class StaticDiscovery(object): + name = 'static' + + def __init__(self, config, conn_mgr, **kwargs): + self.conn_mgr = conn_mgr + hosts = [config.DISCOVERY_STATIC_HOSTS] if isinstance(config.DISCOVERY_STATIC_HOSTS, str) else hosts + self.hosts = [socket.gethostbyname(host) for host in hosts] + self.port = config.DISCOVERY_STATIC_PORT + + def start(self): + for host in self.hosts: + self.add_pod(host, host) + + def stop(self): + for host in self.hosts: + self.delete_pod(host) + + def add_pod(self, name, ip): + self.conn_mgr.register(name, 'tcp://{}:{}'.format(ip, self.port)) + + def delete_pod(self, name): + self.conn_mgr.unregister(name) + + @classmethod + def create(cls, conn_mgr, plugin_config, **kwargs): + discovery = cls(config=plugin_config, conn_mgr=conn_mgr, **kwargs) + return discovery + + +def setup(app): + logger.info('Plugin \'{}\' Installed In Package: {}'.format(__file__, app.plugin_package_name)) + app.on_plugin_setup(StaticDiscovery) diff --git a/shards/mishards/__init__.py b/shards/mishards/__init__.py index 0c5ecd4d0e..e0792348a9 100644 --- a/shards/mishards/__init__.py +++ b/shards/mishards/__init__.py @@ -16,11 +16,9 @@ def create_app(testing_config=None): from mishards.connections import ConnectionMgr connect_mgr = ConnectionMgr() - from sd import ProviderManager - - sd_proiver_class = ProviderManager.get_provider(settings.SD_PROVIDER) - discover = sd_proiver_class(settings=settings.SD_PROVIDER_SETTINGS, - conn_mgr=connect_mgr) + from discovery.factory import DiscoveryFactory + discover = DiscoveryFactory(config.DISCOVERY_PLUGIN_PATH).create(config.DISCOVERY_CLASS_NAME, + conn_mgr=connect_mgr) from mishards.grpc_utils import GrpcSpanDecorator from tracer.factory import TracerFactory diff --git a/shards/mishards/settings.py b/shards/mishards/settings.py index c08e1d7a06..6935405091 100644 --- a/shards/mishards/settings.py +++ b/shards/mishards/settings.py @@ -11,6 +11,7 @@ if FROM_EXAMPLE: else: env.read_env() + DEBUG = env.bool('DEBUG', False) LOG_LEVEL = env.str('LOG_LEVEL', 'DEBUG' if DEBUG else 'INFO') @@ -28,22 +29,8 @@ SERVER_PORT = env.int('SERVER_PORT', 19530) SERVER_TEST_PORT = env.int('SERVER_TEST_PORT', 19530) WOSERVER = env.str('WOSERVER') -SD_PROVIDER_SETTINGS = None -SD_PROVIDER = env.str('SD_PROVIDER', 'Kubernetes') -if SD_PROVIDER == 'Kubernetes': - from sd.kubernetes_provider import KubernetesProviderSettings - SD_PROVIDER_SETTINGS = KubernetesProviderSettings( - namespace=env.str('SD_NAMESPACE', ''), - in_cluster=env.bool('SD_IN_CLUSTER', False), - poll_interval=env.int('SD_POLL_INTERVAL', 5), - pod_patt=env.str('SD_ROSERVER_POD_PATT', ''), - label_selector=env.str('SD_LABEL_SELECTOR', ''), - port=env.int('SD_PORT', 19530)) -elif SD_PROVIDER == 'Static': - from sd.static_provider import StaticProviderSettings - SD_PROVIDER_SETTINGS = StaticProviderSettings( - hosts=env.list('SD_STATIC_HOSTS', []), - port=env.int('SD_STATIC_PORT', 19530)) +DISCOVERY_STATIC_HOSTS = env.list('DISCOVERY_STATIC_HOSTS', []) +DISCOVERY_STATIC_PORT = env.int('DISCOVERY_STATIC_PORT', 19530) # TESTING_WOSERVER = env.str('TESTING_WOSERVER', 'tcp://127.0.0.1:19530') @@ -78,6 +65,8 @@ class DefaultConfig: TRACING_TYPE = env.str('TRACING_TYPE', '') ROUTER_PLUGIN_PATH = env.str('ROUTER_PLUGIN_PATH', '') ROUTER_CLASS_NAME = env.str('ROUTER_CLASS_NAME', 'FileBasedHashRingRouter') + DISCOVERY_PLUGIN_PATH = env.str('DISCOVERY_PLUGIN_PATH', '') + DISCOVERY_CLASS_NAME = env.str('DISCOVERY_CLASS_NAME', 'static') class TestingConfig(DefaultConfig): diff --git a/shards/sd/__init__.py b/shards/sd/__init__.py deleted file mode 100644 index 7943887d0f..0000000000 --- a/shards/sd/__init__.py +++ /dev/null @@ -1,28 +0,0 @@ -import logging -import inspect -# from utils import singleton - -logger = logging.getLogger(__name__) - - -class ProviderManager: - PROVIDERS = {} - - @classmethod - def register_service_provider(cls, target): - if inspect.isfunction(target): - cls.PROVIDERS[target.__name__] = target - elif inspect.isclass(target): - name = target.__dict__.get('NAME', None) - name = name if name else target.__class__.__name__ - cls.PROVIDERS[name] = target - else: - assert False, 'Cannot register_service_provider for: {}'.format(target) - return target - - @classmethod - def get_provider(cls, name): - return cls.PROVIDERS.get(name, None) - - -from sd import kubernetes_provider, static_provider diff --git a/shards/sd/static_provider.py b/shards/sd/static_provider.py deleted file mode 100644 index e88780740f..0000000000 --- a/shards/sd/static_provider.py +++ /dev/null @@ -1,39 +0,0 @@ -import os -import sys -if __name__ == '__main__': - sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -import socket -from utils import singleton -from sd import ProviderManager - - -class StaticProviderSettings: - def __init__(self, hosts, port=None): - self.hosts = hosts - self.port = int(port) if port else 19530 - - -@singleton -@ProviderManager.register_service_provider -class KubernetesProvider(object): - NAME = 'Static' - - def __init__(self, settings, conn_mgr, **kwargs): - self.conn_mgr = conn_mgr - self.hosts = [socket.gethostbyname(host) for host in settings.hosts] - self.port = settings.port - - def start(self): - for host in self.hosts: - self.add_pod(host, host) - - def stop(self): - for host in self.hosts: - self.delete_pod(host) - - def add_pod(self, name, ip): - self.conn_mgr.register(name, 'tcp://{}:{}'.format(ip, self.port)) - - def delete_pod(self, name): - self.conn_mgr.unregister(name) diff --git a/shards/utils/__init__.py b/shards/utils/__init__.py index c1d55e76c0..cf444c0680 100644 --- a/shards/utils/__init__.py +++ b/shards/utils/__init__.py @@ -9,3 +9,10 @@ def singleton(cls): instances[cls] = cls(*args, **kw) return instances[cls] return getinstance + + +class dotdict(dict): + """dot.notation access to dictionary attributes""" + __getattr__ = dict.get + __setattr__ = dict.__setitem__ + __delattr__ = dict.__delitem__