mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Remove whole class ConnectionMgr (#2048)
* Update __init__.py: Remove unused ConnectionMgr Signed-off-by: Cupchen <class_cyl@163.com> * Remove unused ConnectionMgr Signed-off-by: Cupchen <class_cyl@163.com> * Remove unused ConnectionMgr Signed-off-by: Cupchen <class_cyl@163.com> * Remove whole class ConnectionMgr Signed-off-by: Cupchen <class_cyl@163.com>
This commit is contained in:
parent
b23beebbfd
commit
889681f63a
@ -279,96 +279,3 @@ class ConnectionTopology(topology.Topology):
|
||||
if status == topology.StatusType.DUPLICATED:
|
||||
group = None
|
||||
return status, group
|
||||
|
||||
|
||||
@singleton
|
||||
class ConnectionMgr:
|
||||
def __init__(self):
|
||||
self.metas = {}
|
||||
self.conns = {}
|
||||
|
||||
@property
|
||||
def conn_names(self):
|
||||
return set(self.metas.keys()) - set(['WOSERVER'])
|
||||
|
||||
def conn(self, name, metadata, throw=False):
|
||||
c = self.conns.get(name, None)
|
||||
if not c:
|
||||
url = self.metas.get(name, None)
|
||||
if not url:
|
||||
if not throw:
|
||||
return None
|
||||
raise exceptions.ConnectionNotFoundError(message='Connection {} not found'.format(name),
|
||||
metadata=metadata)
|
||||
this_conn = Connection(name=name, uri=url, max_retry=settings.MAX_RETRY)
|
||||
threaded = {
|
||||
threading.get_ident(): this_conn
|
||||
}
|
||||
self.conns[name] = threaded
|
||||
return this_conn
|
||||
|
||||
tid = threading.get_ident()
|
||||
rconn = c.get(tid, None)
|
||||
if not rconn:
|
||||
url = self.metas.get(name, None)
|
||||
if not url:
|
||||
if not throw:
|
||||
return None
|
||||
raise exceptions.ConnectionNotFoundError('Connection {} not found'.format(name),
|
||||
metadata=metadata)
|
||||
this_conn = Connection(name=name, uri=url, max_retry=settings.MAX_RETRY)
|
||||
c[tid] = this_conn
|
||||
return this_conn
|
||||
|
||||
return rconn
|
||||
|
||||
def on_new_meta(self, name, url):
|
||||
logger.info('Register Connection: name={};url={}'.format(name, url))
|
||||
self.metas[name] = url
|
||||
conn = self.conn(name, metadata=None)
|
||||
conn.on_connect(metadata=None)
|
||||
status, _ = conn.conn.server_version()
|
||||
if not status.OK():
|
||||
logger.error('Cannot connect to newly added address: {}. Remove it now'.format(name))
|
||||
self.unregister(name)
|
||||
return False
|
||||
return True
|
||||
|
||||
def on_duplicate_meta(self, name, url):
|
||||
if self.metas[name] == url:
|
||||
return self.on_same_meta(name, url)
|
||||
|
||||
return self.on_diff_meta(name, url)
|
||||
|
||||
def on_same_meta(self, name, url):
|
||||
# logger.warning('Register same meta: {}:{}'.format(name, url))
|
||||
return True
|
||||
|
||||
def on_diff_meta(self, name, url):
|
||||
logger.warning('Received {} with diff url={}'.format(name, url))
|
||||
self.metas[name] = url
|
||||
self.conns[name] = {}
|
||||
return True
|
||||
|
||||
def on_unregister_meta(self, name, url):
|
||||
logger.info('Unregister name={};url={}'.format(name, url))
|
||||
self.conns.pop(name, None)
|
||||
return True
|
||||
|
||||
def on_nonexisted_meta(self, name):
|
||||
logger.warning('Non-existed meta: {}'.format(name))
|
||||
return False
|
||||
|
||||
def register(self, name, url):
|
||||
meta = self.metas.get(name)
|
||||
if not meta:
|
||||
return self.on_new_meta(name, url)
|
||||
else:
|
||||
return self.on_duplicate_meta(name, url)
|
||||
|
||||
def unregister(self, name):
|
||||
logger.info('Unregister Connection: name={}'.format(name))
|
||||
url = self.metas.pop(name, None)
|
||||
if url is None:
|
||||
return self.on_nonexisted_meta(name)
|
||||
return self.on_unregister_meta(name, url)
|
||||
|
||||
@ -5,7 +5,7 @@ import random
|
||||
import threading
|
||||
|
||||
from milvus import Milvus
|
||||
from mishards.connections import (ConnectionMgr, Connection,
|
||||
from mishards.connections import (Connection,
|
||||
ConnectionPool, ConnectionTopology, ConnectionGroup)
|
||||
from mishards.topology import StatusType
|
||||
from mishards import exceptions
|
||||
@ -15,31 +15,6 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
@pytest.mark.usefixtures('app')
|
||||
class TestConnection:
|
||||
@pytest.mark.skip
|
||||
def test_manager(self):
|
||||
mgr = ConnectionMgr()
|
||||
|
||||
mgr.register('pod1', '111')
|
||||
mgr.register('pod2', '222')
|
||||
mgr.register('pod2', '222')
|
||||
mgr.register('pod2', '2222')
|
||||
assert len(mgr.conn_names) == 2
|
||||
|
||||
mgr.unregister('pod1')
|
||||
assert len(mgr.conn_names) == 1
|
||||
|
||||
mgr.unregister('pod2')
|
||||
assert len(mgr.conn_names) == 0
|
||||
|
||||
mgr.register('WOSERVER', 'xxxx')
|
||||
assert len(mgr.conn_names) == 0
|
||||
|
||||
assert not mgr.conn('XXXX', None)
|
||||
with pytest.raises(exceptions.ConnectionNotFoundError):
|
||||
mgr.conn('XXXX', None, True)
|
||||
|
||||
mgr.conn('WOSERVER', None)
|
||||
|
||||
def test_connection(self):
|
||||
class Conn:
|
||||
def __init__(self, state):
|
||||
|
||||
@ -14,7 +14,7 @@ from mishards.service_handler import ServiceHandler
|
||||
from mishards.grpc_utils.grpc_args_parser import GrpcArgsParser as Parser
|
||||
from mishards.factories import TableFilesFactory, TablesFactory, TableFiles, Tables
|
||||
from mishards.router import RouterMixin
|
||||
from mishards.connections import (ConnectionMgr, Connection,
|
||||
from mishards.connections import (Connection,
|
||||
ConnectionPool, ConnectionTopology, ConnectionGroup)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user