mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
refactor kubernetes service provider
This commit is contained in:
parent
8569309644
commit
b4ed4b2e35
@ -7,13 +7,10 @@ db.init_db(uri=settings.SQLALCHEMY_DATABASE_URI, echo=settings.SQL_ECHO)
|
||||
from mishards.connections import ConnectionMgr
|
||||
connect_mgr = ConnectionMgr()
|
||||
|
||||
from sd.service_founder import ServiceFounder
|
||||
discover = ServiceFounder(namespace=settings.SD_NAMESPACE,
|
||||
conn_mgr=connect_mgr,
|
||||
pod_patt=settings.SD_ROSERVER_POD_PATT,
|
||||
label_selector=settings.SD_LABEL_SELECTOR,
|
||||
in_cluster=settings.SD_IN_CLUSTER,
|
||||
poll_interval=settings.SD_POLL_INTERVAL)
|
||||
from sd import ProviderManager
|
||||
|
||||
sd_proiver_class = ProviderManager.get_provider(settings.SD_PROVIDER)
|
||||
discover = sd_proiver_class(settings=settings.SD_PROVIDER_SETTINGS, conn_mgr=connect_mgr)
|
||||
|
||||
from mishards.server import Server
|
||||
grpc_server = Server(conn_mgr=connect_mgr)
|
||||
|
||||
@ -26,11 +26,17 @@ SEARCH_WORKER_SIZE = env.int('SEARCH_WORKER_SIZE', 10)
|
||||
SERVER_PORT = env.int('SERVER_PORT', 19530)
|
||||
WOSERVER = env.str('WOSERVER')
|
||||
|
||||
SD_NAMESPACE = env.str('SD_NAMESPACE', '')
|
||||
SD_IN_CLUSTER = env.bool('SD_IN_CLUSTER', False)
|
||||
SD_POLL_INTERVAL = env.int('SD_POLL_INTERVAL', 5)
|
||||
SD_ROSERVER_POD_PATT = env.str('SD_ROSERVER_POD_PATT', '')
|
||||
SD_LABEL_SELECTOR = env.str('SD_LABEL_SELECTOR', '')
|
||||
SD_PROVIDER_SETTINGS = None
|
||||
SD_PROVIDER = env.str('SD_PROVIDER', 'Kubernetes')
|
||||
if SD_PROVIDER == 'Kubernetes':
|
||||
from sd.kubernetes_provider import KubernetesProviderSettings
|
||||
SD_PROVIDER_SETTINGS = KubernetesProviderSettings(
|
||||
namespace=env.str('SD_NAMESPACE', ''),
|
||||
in_cluster=env.bool('SD_IN_CLUSTER', False),
|
||||
poll_interval=env.int('SD_POLL_INTERVAL', 5),
|
||||
pod_patt=env.str('SD_ROSERVER_POD_PATT', ''),
|
||||
label_selector=env.str('SD_LABEL_SELECTOR', '')
|
||||
)
|
||||
|
||||
TESTING = env.bool('TESTING', False)
|
||||
TESTING_WOSERVER = env.str('TESTING_WOSERVER', 'tcp://127.0.0.1:19530')
|
||||
|
||||
@ -0,0 +1,27 @@
|
||||
import logging
|
||||
import inspect
|
||||
# from utils import singleton
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ProviderManager:
|
||||
PROVIDERS = {}
|
||||
|
||||
@classmethod
|
||||
def register_service_provider(cls, target):
|
||||
if inspect.isfunction(target):
|
||||
cls.PROVIDERS[target.__name__] = target
|
||||
elif inspect.isclass(target):
|
||||
name = target.__dict__.get('NAME', None)
|
||||
name = name if name else target.__class__.__name__
|
||||
cls.PROVIDERS[name] = target
|
||||
else:
|
||||
assert False, 'Cannot register_service_provider for: {}'.format(target)
|
||||
return target
|
||||
|
||||
@classmethod
|
||||
def get_provider(cls, name):
|
||||
return cls.PROVIDERS.get(name, None)
|
||||
|
||||
from sd import kubernetes_provider
|
||||
@ -12,6 +12,7 @@ from functools import wraps
|
||||
from kubernetes import client, config, watch
|
||||
|
||||
from utils import singleton
|
||||
from sd import ProviderManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -32,7 +33,7 @@ class K8SMixin:
|
||||
self.v1 = client.CoreV1Api()
|
||||
|
||||
|
||||
class K8SServiceDiscover(threading.Thread, K8SMixin):
|
||||
class K8SHeartbeatHandler(threading.Thread, K8SMixin):
|
||||
def __init__(self, message_queue, namespace, label_selector, in_cluster=False, **kwargs):
|
||||
K8SMixin.__init__(self, namespace=namespace, in_cluster=in_cluster, **kwargs)
|
||||
threading.Thread.__init__(self)
|
||||
@ -202,13 +203,26 @@ class EventHandler(threading.Thread):
|
||||
except queue.Empty:
|
||||
continue
|
||||
|
||||
@singleton
|
||||
class ServiceFounder(object):
|
||||
def __init__(self, conn_mgr, namespace, pod_patt, label_selector, in_cluster=False, **kwargs):
|
||||
class KubernetesProviderSettings:
|
||||
def __init__(self, namespace, pod_patt, label_selector, in_cluster, poll_interval, **kwargs):
|
||||
self.namespace = namespace
|
||||
self.pod_patt = pod_patt
|
||||
self.label_selector = label_selector
|
||||
self.in_cluster = in_cluster
|
||||
self.poll_interval = poll_interval
|
||||
|
||||
@singleton
|
||||
@ProviderManager.register_service_provider
|
||||
class KubernetesProvider(object):
|
||||
NAME = 'Kubernetes'
|
||||
def __init__(self, settings, conn_mgr, **kwargs):
|
||||
self.namespace = settings.namespace
|
||||
self.pod_patt = settings.pod_patt
|
||||
self.label_selector = settings.label_selector
|
||||
self.in_cluster = settings.in_cluster
|
||||
self.poll_interval = settings.poll_interval
|
||||
self.kwargs = kwargs
|
||||
self.queue = queue.Queue()
|
||||
self.in_cluster = in_cluster
|
||||
|
||||
self.conn_mgr = conn_mgr
|
||||
|
||||
@ -226,19 +240,20 @@ class ServiceFounder(object):
|
||||
**kwargs
|
||||
)
|
||||
|
||||
self.pod_heartbeater = K8SServiceDiscover(
|
||||
self.pod_heartbeater = K8SHeartbeatHandler(
|
||||
message_queue=self.queue,
|
||||
namespace=namespace,
|
||||
label_selector=label_selector,
|
||||
namespace=self.namespace,
|
||||
label_selector=self.label_selector,
|
||||
in_cluster=self.in_cluster,
|
||||
v1=self.v1,
|
||||
poll_interval=self.poll_interval,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
self.event_handler = EventHandler(mgr=self,
|
||||
message_queue=self.queue,
|
||||
namespace=self.namespace,
|
||||
pod_patt=pod_patt, **kwargs)
|
||||
pod_patt=self.pod_patt, **kwargs)
|
||||
|
||||
def add_pod(self, name, ip):
|
||||
self.conn_mgr.register(name, 'tcp://{}:19530'.format(ip))
|
||||
@ -250,8 +265,6 @@ class ServiceFounder(object):
|
||||
self.listener.daemon = True
|
||||
self.listener.start()
|
||||
self.event_handler.start()
|
||||
# while self.listener.at_start_up:
|
||||
# time.sleep(1)
|
||||
|
||||
self.pod_heartbeater.start()
|
||||
|
||||
@ -262,11 +275,32 @@ class ServiceFounder(object):
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
from mishards import connect_mgr
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
t = ServiceFounder(namespace='xp', conn_mgr=connect_mgr, pod_patt=".*-ro-servers-.*", label_selector='tier=ro-servers', in_cluster=False)
|
||||
class Connect:
|
||||
def register(self, name, value):
|
||||
logger.error('Register: {} - {}'.format(name, value))
|
||||
def unregister(self, name):
|
||||
logger.error('Unregister: {}'.format(name))
|
||||
|
||||
@property
|
||||
def conn_names(self):
|
||||
return set()
|
||||
|
||||
connect_mgr = Connect()
|
||||
|
||||
settings = KubernetesProviderSettings(
|
||||
namespace='xp',
|
||||
pod_patt=".*-ro-servers-.*",
|
||||
label_selector='tier=ro-servers',
|
||||
poll_interval=5,
|
||||
in_cluster=False)
|
||||
|
||||
provider_class = ProviderManager.get_provider('Kubernetes')
|
||||
t = provider_class(conn_mgr=connect_mgr,
|
||||
settings=settings
|
||||
)
|
||||
t.start()
|
||||
cnt = 2
|
||||
cnt = 100
|
||||
while cnt > 0:
|
||||
time.sleep(2)
|
||||
cnt -= 1
|
||||
Loading…
x
Reference in New Issue
Block a user