mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-06 02:42:53 +08:00
implement service discovery plugins
This commit is contained in:
parent
83818546db
commit
1d39ec75b0
37
shards/discovery/__init__.py
Normal file
37
shards/discovery/__init__.py
Normal file
@ -0,0 +1,37 @@
|
||||
import os
|
||||
import os
|
||||
import sys
|
||||
if __name__ == '__main__':
|
||||
sys.path.append(os.path.dirname(os.path.dirname(
|
||||
os.path.abspath(__file__))))
|
||||
|
||||
import logging
|
||||
from utils import dotdict
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DiscoveryConfig(dotdict):
|
||||
CONFIG_PREFIX = 'DISCOVERY_'
|
||||
|
||||
def dump(self):
|
||||
logger.info('----------- DiscoveryConfig -----------------')
|
||||
for k, v in self.items():
|
||||
logger.info('{}: {}'.format(k, v))
|
||||
if len(self) <= 0:
|
||||
logger.error(' Empty DiscoveryConfig Found! ')
|
||||
logger.info('---------------------------------------------')
|
||||
|
||||
@classmethod
|
||||
def Create(cls, **kwargs):
|
||||
o = cls()
|
||||
|
||||
for k, v in os.environ.items():
|
||||
if not k.startswith(cls.CONFIG_PREFIX):
|
||||
continue
|
||||
o[k] = v
|
||||
for k, v in kwargs.items():
|
||||
o[k] = v
|
||||
|
||||
o.dump()
|
||||
return o
|
||||
53
shards/discovery/factory.py
Normal file
53
shards/discovery/factory.py
Normal file
@ -0,0 +1,53 @@
|
||||
import os
|
||||
import logging
|
||||
from functools import partial
|
||||
from utils.pluginextension import MiPluginBase as PluginBase
|
||||
from discovery import DiscoveryConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
here = os.path.abspath(os.path.dirname(__file__))
|
||||
get_path = partial(os.path.join, here)
|
||||
|
||||
PLUGIN_PACKAGE_NAME = 'discovery.plugins'
|
||||
plugin_base = PluginBase(package=PLUGIN_PACKAGE_NAME,
|
||||
searchpath=[get_path('./plugins')])
|
||||
|
||||
|
||||
class DiscoveryFactory(object):
|
||||
PLUGIN_TYPE = 'Discovery'
|
||||
|
||||
def __init__(self, searchpath=None):
|
||||
self.plugin_package_name = PLUGIN_PACKAGE_NAME
|
||||
self.class_map = {}
|
||||
searchpath = searchpath if searchpath else []
|
||||
searchpath = [searchpath] if isinstance(searchpath, str) else searchpath
|
||||
self.source = plugin_base.make_plugin_source(searchpath=searchpath,
|
||||
identifier=self.__class__.__name__)
|
||||
|
||||
for plugin_name in self.source.list_plugins():
|
||||
plugin = self.source.load_plugin(plugin_name)
|
||||
plugin.setup(self)
|
||||
|
||||
def on_plugin_setup(self, plugin_class):
|
||||
name = getattr(plugin_class, 'name', plugin_class.__name__)
|
||||
self.class_map[name.lower()] = plugin_class
|
||||
|
||||
def plugin(self, name):
|
||||
return self.class_map.get(name, None)
|
||||
|
||||
def create(self, class_name, **kwargs):
|
||||
conn_mgr = kwargs.pop('conn_mgr', None)
|
||||
if not conn_mgr:
|
||||
raise RuntimeError('Please pass conn_mgr to create discovery!')
|
||||
|
||||
if not class_name:
|
||||
raise RuntimeError('Please specify \'{}\' class_name first!'.format(self.PLUGIN_TYPE))
|
||||
|
||||
plugin_class = self.plugin(class_name.lower())
|
||||
if not plugin_class:
|
||||
raise RuntimeError('{} Plugin \'{}\' Not Installed!'.format(self.PLUGIN_TYPE, class_name))
|
||||
|
||||
plugin_config = DiscoveryConfig.Create()
|
||||
plugin = plugin_class.create(plugin_config=plugin_config, conn_mgr=conn_mgr, **kwargs)
|
||||
return plugin
|
||||
@ -13,9 +13,6 @@ import queue
|
||||
import enum
|
||||
from kubernetes import client, config, watch
|
||||
|
||||
from utils import singleton
|
||||
from sd import ProviderManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
INCLUSTER_NAMESPACE_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/namespace'
|
||||
@ -42,6 +39,8 @@ class K8SMixin:
|
||||
|
||||
|
||||
class K8SHeartbeatHandler(threading.Thread, K8SMixin):
|
||||
name = 'kubernetes'
|
||||
|
||||
def __init__(self,
|
||||
message_queue,
|
||||
namespace,
|
||||
@ -235,18 +234,19 @@ class KubernetesProviderSettings:
|
||||
self.port = int(port) if port else 19530
|
||||
|
||||
|
||||
@singleton
|
||||
@ProviderManager.register_service_provider
|
||||
class KubernetesProvider(object):
|
||||
NAME = 'Kubernetes'
|
||||
name = 'kubernetes'
|
||||
|
||||
def __init__(self, settings, conn_mgr, **kwargs):
|
||||
self.namespace = settings.namespace
|
||||
self.pod_patt = settings.pod_patt
|
||||
self.label_selector = settings.label_selector
|
||||
self.in_cluster = settings.in_cluster
|
||||
self.poll_interval = settings.poll_interval
|
||||
self.port = settings.port
|
||||
def __init__(self, plugin_config, conn_mgr, **kwargs):
|
||||
self.namespace = plugin_config.DISCOVERY_KUBERNETES_NAMESPACE
|
||||
self.pod_patt = plugin_config.DISCOVERY_KUBERNETES_POD_PATT
|
||||
self.label_selector = plugin_config.DISCOVERY_KUBERNETES_LABEL_SELECTOR
|
||||
self.in_cluster = plugin_config.DISCOVERY_KUBERNETES_IN_CLUSTER.lower()
|
||||
self.in_cluster = self.in_cluster == 'true'
|
||||
self.poll_interval = plugin_config.DISCOVERY_KUBERNETES_POLL_INTERVAL
|
||||
self.poll_interval = int(self.poll_interval) if self.poll_interval else 5
|
||||
self.port = plugin_config.DISCOVERY_KUBERNETES_PORT
|
||||
self.port = int(self.port) if self.port else 19530
|
||||
self.kwargs = kwargs
|
||||
self.queue = queue.Queue()
|
||||
|
||||
@ -298,9 +298,23 @@ class KubernetesProvider(object):
|
||||
self.pod_heartbeater.stop()
|
||||
self.event_handler.stop()
|
||||
|
||||
@classmethod
|
||||
def create(cls, conn_mgr, plugin_config, **kwargs):
|
||||
discovery = cls(plugin_config=plugin_config, conn_mgr=conn_mgr, **kwargs)
|
||||
return discovery
|
||||
|
||||
|
||||
def setup(app):
|
||||
logger.info('Plugin \'{}\' Installed In Package: {}'.format(__file__, app.plugin_package_name))
|
||||
app.on_plugin_setup(KubernetesProvider)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(
|
||||
os.path.abspath(__file__))))))
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(
|
||||
os.path.abspath(__file__)))))
|
||||
|
||||
class Connect:
|
||||
def register(self, name, value):
|
||||
@ -315,14 +329,15 @@ if __name__ == '__main__':
|
||||
|
||||
connect_mgr = Connect()
|
||||
|
||||
settings = KubernetesProviderSettings(namespace='xp',
|
||||
pod_patt=".*-ro-servers-.*",
|
||||
label_selector='tier=ro-servers',
|
||||
poll_interval=5,
|
||||
in_cluster=False)
|
||||
from discovery import DiscoveryConfig
|
||||
settings = DiscoveryConfig(DISCOVERY_KUBERNETES_NAMESPACE='xp',
|
||||
DISCOVERY_KUBERNETES_POD_PATT=".*-ro-servers-.*",
|
||||
DISCOVERY_KUBERNETES_LABEL_SELECTOR='tier=ro-servers',
|
||||
DISCOVERY_KUBERNETES_POLL_INTERVAL=5,
|
||||
DISCOVERY_KUBERNETES_IN_CLUSTER=False)
|
||||
|
||||
provider_class = ProviderManager.get_provider('Kubernetes')
|
||||
t = provider_class(conn_mgr=connect_mgr, settings=settings)
|
||||
provider_class = KubernetesProvider
|
||||
t = provider_class(conn_mgr=connect_mgr, plugin_config=settings)
|
||||
t.start()
|
||||
cnt = 100
|
||||
while cnt > 0:
|
||||
43
shards/discovery/plugins/static_provider.py
Normal file
43
shards/discovery/plugins/static_provider.py
Normal file
@ -0,0 +1,43 @@
|
||||
import os
|
||||
import sys
|
||||
if __name__ == '__main__':
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import logging
|
||||
import socket
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StaticDiscovery(object):
|
||||
name = 'static'
|
||||
|
||||
def __init__(self, config, conn_mgr, **kwargs):
|
||||
self.conn_mgr = conn_mgr
|
||||
hosts = [config.DISCOVERY_STATIC_HOSTS] if isinstance(config.DISCOVERY_STATIC_HOSTS, str) else hosts
|
||||
self.hosts = [socket.gethostbyname(host) for host in hosts]
|
||||
self.port = config.DISCOVERY_STATIC_PORT
|
||||
|
||||
def start(self):
|
||||
for host in self.hosts:
|
||||
self.add_pod(host, host)
|
||||
|
||||
def stop(self):
|
||||
for host in self.hosts:
|
||||
self.delete_pod(host)
|
||||
|
||||
def add_pod(self, name, ip):
|
||||
self.conn_mgr.register(name, 'tcp://{}:{}'.format(ip, self.port))
|
||||
|
||||
def delete_pod(self, name):
|
||||
self.conn_mgr.unregister(name)
|
||||
|
||||
@classmethod
|
||||
def create(cls, conn_mgr, plugin_config, **kwargs):
|
||||
discovery = cls(config=plugin_config, conn_mgr=conn_mgr, **kwargs)
|
||||
return discovery
|
||||
|
||||
|
||||
def setup(app):
|
||||
logger.info('Plugin \'{}\' Installed In Package: {}'.format(__file__, app.plugin_package_name))
|
||||
app.on_plugin_setup(StaticDiscovery)
|
||||
@ -16,11 +16,9 @@ def create_app(testing_config=None):
|
||||
from mishards.connections import ConnectionMgr
|
||||
connect_mgr = ConnectionMgr()
|
||||
|
||||
from sd import ProviderManager
|
||||
|
||||
sd_proiver_class = ProviderManager.get_provider(settings.SD_PROVIDER)
|
||||
discover = sd_proiver_class(settings=settings.SD_PROVIDER_SETTINGS,
|
||||
conn_mgr=connect_mgr)
|
||||
from discovery.factory import DiscoveryFactory
|
||||
discover = DiscoveryFactory(config.DISCOVERY_PLUGIN_PATH).create(config.DISCOVERY_CLASS_NAME,
|
||||
conn_mgr=connect_mgr)
|
||||
|
||||
from mishards.grpc_utils import GrpcSpanDecorator
|
||||
from tracer.factory import TracerFactory
|
||||
|
||||
@ -11,6 +11,7 @@ if FROM_EXAMPLE:
|
||||
else:
|
||||
env.read_env()
|
||||
|
||||
|
||||
DEBUG = env.bool('DEBUG', False)
|
||||
|
||||
LOG_LEVEL = env.str('LOG_LEVEL', 'DEBUG' if DEBUG else 'INFO')
|
||||
@ -28,22 +29,8 @@ SERVER_PORT = env.int('SERVER_PORT', 19530)
|
||||
SERVER_TEST_PORT = env.int('SERVER_TEST_PORT', 19530)
|
||||
WOSERVER = env.str('WOSERVER')
|
||||
|
||||
SD_PROVIDER_SETTINGS = None
|
||||
SD_PROVIDER = env.str('SD_PROVIDER', 'Kubernetes')
|
||||
if SD_PROVIDER == 'Kubernetes':
|
||||
from sd.kubernetes_provider import KubernetesProviderSettings
|
||||
SD_PROVIDER_SETTINGS = KubernetesProviderSettings(
|
||||
namespace=env.str('SD_NAMESPACE', ''),
|
||||
in_cluster=env.bool('SD_IN_CLUSTER', False),
|
||||
poll_interval=env.int('SD_POLL_INTERVAL', 5),
|
||||
pod_patt=env.str('SD_ROSERVER_POD_PATT', ''),
|
||||
label_selector=env.str('SD_LABEL_SELECTOR', ''),
|
||||
port=env.int('SD_PORT', 19530))
|
||||
elif SD_PROVIDER == 'Static':
|
||||
from sd.static_provider import StaticProviderSettings
|
||||
SD_PROVIDER_SETTINGS = StaticProviderSettings(
|
||||
hosts=env.list('SD_STATIC_HOSTS', []),
|
||||
port=env.int('SD_STATIC_PORT', 19530))
|
||||
DISCOVERY_STATIC_HOSTS = env.list('DISCOVERY_STATIC_HOSTS', [])
|
||||
DISCOVERY_STATIC_PORT = env.int('DISCOVERY_STATIC_PORT', 19530)
|
||||
|
||||
# TESTING_WOSERVER = env.str('TESTING_WOSERVER', 'tcp://127.0.0.1:19530')
|
||||
|
||||
@ -78,6 +65,8 @@ class DefaultConfig:
|
||||
TRACING_TYPE = env.str('TRACING_TYPE', '')
|
||||
ROUTER_PLUGIN_PATH = env.str('ROUTER_PLUGIN_PATH', '')
|
||||
ROUTER_CLASS_NAME = env.str('ROUTER_CLASS_NAME', 'FileBasedHashRingRouter')
|
||||
DISCOVERY_PLUGIN_PATH = env.str('DISCOVERY_PLUGIN_PATH', '')
|
||||
DISCOVERY_CLASS_NAME = env.str('DISCOVERY_CLASS_NAME', 'static')
|
||||
|
||||
|
||||
class TestingConfig(DefaultConfig):
|
||||
|
||||
@ -1,28 +0,0 @@
|
||||
import logging
|
||||
import inspect
|
||||
# from utils import singleton
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ProviderManager:
|
||||
PROVIDERS = {}
|
||||
|
||||
@classmethod
|
||||
def register_service_provider(cls, target):
|
||||
if inspect.isfunction(target):
|
||||
cls.PROVIDERS[target.__name__] = target
|
||||
elif inspect.isclass(target):
|
||||
name = target.__dict__.get('NAME', None)
|
||||
name = name if name else target.__class__.__name__
|
||||
cls.PROVIDERS[name] = target
|
||||
else:
|
||||
assert False, 'Cannot register_service_provider for: {}'.format(target)
|
||||
return target
|
||||
|
||||
@classmethod
|
||||
def get_provider(cls, name):
|
||||
return cls.PROVIDERS.get(name, None)
|
||||
|
||||
|
||||
from sd import kubernetes_provider, static_provider
|
||||
@ -1,39 +0,0 @@
|
||||
import os
|
||||
import sys
|
||||
if __name__ == '__main__':
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import socket
|
||||
from utils import singleton
|
||||
from sd import ProviderManager
|
||||
|
||||
|
||||
class StaticProviderSettings:
|
||||
def __init__(self, hosts, port=None):
|
||||
self.hosts = hosts
|
||||
self.port = int(port) if port else 19530
|
||||
|
||||
|
||||
@singleton
|
||||
@ProviderManager.register_service_provider
|
||||
class KubernetesProvider(object):
|
||||
NAME = 'Static'
|
||||
|
||||
def __init__(self, settings, conn_mgr, **kwargs):
|
||||
self.conn_mgr = conn_mgr
|
||||
self.hosts = [socket.gethostbyname(host) for host in settings.hosts]
|
||||
self.port = settings.port
|
||||
|
||||
def start(self):
|
||||
for host in self.hosts:
|
||||
self.add_pod(host, host)
|
||||
|
||||
def stop(self):
|
||||
for host in self.hosts:
|
||||
self.delete_pod(host)
|
||||
|
||||
def add_pod(self, name, ip):
|
||||
self.conn_mgr.register(name, 'tcp://{}:{}'.format(ip, self.port))
|
||||
|
||||
def delete_pod(self, name):
|
||||
self.conn_mgr.unregister(name)
|
||||
@ -9,3 +9,10 @@ def singleton(cls):
|
||||
instances[cls] = cls(*args, **kw)
|
||||
return instances[cls]
|
||||
return getinstance
|
||||
|
||||
|
||||
class dotdict(dict):
|
||||
"""dot.notation access to dictionary attributes"""
|
||||
__getattr__ = dict.get
|
||||
__setattr__ = dict.__setitem__
|
||||
__delattr__ = dict.__delitem__
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user