From 052d79a58da5fc91b1d36089947634c7d7528e2c Mon Sep 17 00:00:00 2001 From: "peng.xu" Date: Tue, 17 Sep 2019 14:28:34 +0800 Subject: [PATCH] (feat): update connections --- connections.py | 105 ++++++++++++++++++++++++++++++++++++++++++--- exception_codes.py | 1 + exceptions.py | 3 ++ service_handler.py | 11 +++++ settings.py | 1 + utils/__init__.py | 10 +++++ 6 files changed, 126 insertions(+), 5 deletions(-) create mode 100644 service_handler.py diff --git a/connections.py b/connections.py index ea446d5ad3..c52a1c5f85 100644 --- a/connections.py +++ b/connections.py @@ -1,9 +1,12 @@ import logging -from milvus import Milvus +import threading from functools import wraps from contextlib import contextmanager +from milvus import Milvus +import settings import exceptions +from utils import singleton logger = logging.getLogger(__name__) @@ -16,6 +19,7 @@ class Connection: self.conn = Milvus() self.error_handlers = [] if not error_handlers else error_handlers self.on_retry_func = kwargs.get('on_retry_func', None) + self._connect() def __str__(self): return 'Connection:name=\"{}\";uri=\"{}\"'.format(self.name, self.uri) @@ -67,6 +71,79 @@ class Connection: raise e return inner +@singleton +class ConnectionMgr: + def __init__(self): + self.metas = {} + self.conns = {} + + def conn(self, name, throw=False): + c = self.conns.get(name, None) + if not c: + url = self.metas.get(name, None) + if not url: + if not throw: + return None + raise exceptions.ConnectionNotFoundError('Connection {} not found'.format(name)) + this_conn = Connection(name=name, uri=url, max_retry=settings.MAX_RETRY) + threaded = { + threading.get_ident() : this_conn + } + c[name] = threaded + return this_conn + + tid = threading.get_ident() + rconn = c.get(tid, None) + if not rconn: + url = self.metas.get(name, None) + if not url: + if not throw: + return None + raise exceptions.ConnectionNotFoundError('Connection {} not found'.format(name)) + this_conn = Connection(name=name, uri=url, max_retry=settings.MAX_RETRY) + c[tid] = this_conn + return this_conn + + return rconn + + def on_new_meta(self, name, url): + self.metas[name] = url + + def on_duplicate_meta(self, name, url): + if self.metas[name] == url: + return self.on_same_meta(name, url) + + return self.on_diff_meta(name, url) + + def on_same_meta(self, name, url): + logger.warn('Register same meta: {}:{}'.format(name, url)) + + def on_diff_meta(self, name, url): + logger.warn('Received {} with diff url={}'.format(name, url)) + self.metas[name] = url + self.conns[name] = {} + + def on_unregister_meta(self, name, url): + logger.info('Unregister name={};url={}'.format(name, url)) + self.conns.pop(name, None) + + def on_nonexisted_meta(self, name): + logger.warn('Non-existed meta: {}'.format(name)) + + def register(self, name, url): + meta = self.metas.get(name) + if not meta: + return self.on_new_meta(name, url) + else: + return self.on_duplicate_meta(name, url) + + def unregister(self, name): + url = self.metas.pop(name, None) + if url is None: + return self.on_nonexisted_meta(name) + return self.on_unregister_meta(name, url) + + if __name__ == '__main__': class Conn: def __init__(self, state): @@ -91,15 +168,33 @@ if __name__ == '__main__': retry_obj = Retry() - c = Connection('client', uri='localhost', on_retry_func=retry_obj) - c.conn = fail_conn + c = Connection('client', uri='', on_retry_func=retry_obj) def f(): print('ffffffff') - m = c.connect(func=f) - m() + # c.conn = fail_conn + # m = c.connect(func=f) + # m() c.conn = success_conn m = c.connect(func=f) m() + + mgr = ConnectionMgr() + mgr.register('pod1', '111') + mgr.register('pod2', '222') + mgr.register('pod2', '222') + mgr.register('pod2', 'tcp://127.0.0.1:19530') + + pod3 = mgr.conn('pod3') + print(pod3) + + pod2 = mgr.conn('pod2') + print(pod2) + print(pod2.connected) + + mgr.unregister('pod1') + + logger.info(mgr.metas) + logger.info(mgr.conns) diff --git a/exception_codes.py b/exception_codes.py index 5369389e84..c8cfd81dab 100644 --- a/exception_codes.py +++ b/exception_codes.py @@ -1,3 +1,4 @@ INVALID_CODE = -1 CONNECT_ERROR_CODE = 10001 +CONNECTTION_NOT_FOUND_CODE = 10002 diff --git a/exceptions.py b/exceptions.py index 50db4474c4..a25fb2c4ae 100644 --- a/exceptions.py +++ b/exceptions.py @@ -8,3 +8,6 @@ class BaseException(Exception): class ConnectionConnectError(BaseException): code = codes.CONNECT_ERROR_CODE + +class ConnectionNotFoundError(BaseException): + code = codes.CONNECTTION_NOT_FOUND_CODE diff --git a/service_handler.py b/service_handler.py new file mode 100644 index 0000000000..d5018a54d8 --- /dev/null +++ b/service_handler.py @@ -0,0 +1,11 @@ +import logging + +import grpco +from milvus.grpc_gen import milvus_pb2, milvus_pb2_grpc, status_pb2 + +logger = logging.getLogger(__name__) + + +class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): + def __init__(self, connections, *args, **kwargs): + self.connections = self.connections diff --git a/settings.py b/settings.py index e1a45262c8..4ad00e66cb 100644 --- a/settings.py +++ b/settings.py @@ -20,6 +20,7 @@ from utils.logger_helper import config config(LOG_LEVEL, LOG_PATH, LOG_NAME, TIMEZONE) TIMEOUT = env.int('TIMEOUT', 60) +MAX_RETRY = env.int('MAX_RETRY', 3) if __name__ == '__main__': diff --git a/utils/__init__.py b/utils/__init__.py index e69de29bb2..ec7f32bcbc 100644 --- a/utils/__init__.py +++ b/utils/__init__.py @@ -0,0 +1,10 @@ +from functools import wraps + +def singleton(cls): + instances = {} + @wraps(cls) + def getinstance(*args, **kw): + if cls not in instances: + instances[cls] = cls(*args, **kw) + return instances[cls] + return getinstance