commit ce23e6c77c2276351224441d05eedc8a8fe8df00 Author: peng.xu Date: Tue Sep 17 12:46:29 2019 +0800 init commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000..624eb4fa58 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.env + +__pycache__/ diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000000..7db5c41bd0 --- /dev/null +++ b/__init__.py @@ -0,0 +1 @@ +import settings diff --git a/connections.py b/connections.py new file mode 100644 index 0000000000..727864ef98 --- /dev/null +++ b/connections.py @@ -0,0 +1,105 @@ +import logging +from milvus import Milvus +from functools import wraps +from contextlib import contextmanager + +import exceptions + +logger = logging.getLogger(__name__) + +class Connection: + def __init__(self, name, uri, max_retry=1, error_handlers=None, **kwargs): + self.name = name + self.uri = uri + self.max_retry = max_retry + self.retried = 0 + self.conn = Milvus() + self.error_handlers = [] if not error_handlers else error_handlers + self.on_retry_func = kwargs.get('on_retry_func', None) + + def __str__(self): + return 'Connection:name=\"{}\";uri=\"{}\"'.format(self.name, self.uri) + + def _connect(self): + try: + self.conn.connect(uri=self.uri) + except Exception as e: + if not self.error_handlers: + raise exceptions.ConnectionConnectError(message='') + for handler in self.error_handlers: + handler(e) + + @property + def can_retry(self): + return self.retried <= self.max_retry + + @property + def connected(self): + return self.conn.connected() + + def on_retry(self): + if self.on_retry_func: + self.on_retry_func(self) + else: + logger.warn('{} is retrying {}'.format(self, self.retried)) + + def on_connect(self): + while not self.connected and self.can_retry: + self.retried += 1 + self.on_retry() + self._connect() + + if not self.can_retry and not self.connected: + raise exceptions.ConnectionConnectError(message='Max retry {} reached!'.format(self.max_retry)) + + self.retried = 0 + + def connect(self, func, exception_handler=None): + @wraps(func) + def inner(*args, **kwargs): + self.on_connect() + try: + return func(*args, **kwargs) + except Exception as e: + if exception_handler: + exception_handler(e) + else: + raise e + return inner + +if __name__ == '__main__': + class Conn: + def __init__(self, state): + self.state = state + + def connect(self, uri): + return self.state + + def connected(self): + return self.state + + fail_conn = Conn(False) + success_conn = Conn(True) + + class Retry: + def __init__(self): + self.times = 0 + + def __call__(self, conn): + self.times += 1 + print('Retrying {}'.format(self.times)) + + + retry_obj = Retry() + c = Connection('client', uri='localhost', on_retry_func=retry_obj) + c.conn = fail_conn + + def f(): + print('ffffffff') + + # m = c.connect(func=f) + # m() + + c.conn = success_conn + m = c.connect(func=f) + m() diff --git a/exception_codes.py b/exception_codes.py new file mode 100644 index 0000000000..5369389e84 --- /dev/null +++ b/exception_codes.py @@ -0,0 +1,3 @@ +INVALID_CODE = -1 + +CONNECT_ERROR_CODE = 10001 diff --git a/exceptions.py b/exceptions.py new file mode 100644 index 0000000000..7178c4ebdc --- /dev/null +++ b/exceptions.py @@ -0,0 +1,11 @@ +import exception_codes as codes + +class BaseException(Exception): + code = codes.INVALID_CODE + message = 'BaseException' + def __init__(self, message='', code=None): + self.message = self.__class__.__name__ if not message else message + self.code = self.code if code is None else code + +class ConnectionConnectError(BaseException): + code = codes.CONNECT_ERROR_CODE diff --git a/settings.py b/settings.py new file mode 100644 index 0000000000..e1a45262c8 --- /dev/null +++ b/settings.py @@ -0,0 +1,31 @@ +import sys +import os + +from environs import Env + +env = Env() +env.read_env() + +DEBUG = env.bool('DEBUG', False) +TESTING = env.bool('TESTING', False) + +METADATA_URI = env.str('METADATA_URI', '') + +LOG_LEVEL = env.str('LOG_LEVEL', 'DEBUG' if DEBUG else 'INFO') +LOG_PATH = env.str('LOG_PATH', '/tmp/mishards') +LOG_NAME = env.str('LOG_NAME', 'logfile') +TIMEZONE = env.str('TIMEZONE', 'UTC') + +from utils.logger_helper import config +config(LOG_LEVEL, LOG_PATH, LOG_NAME, TIMEZONE) + +TIMEOUT = env.int('TIMEOUT', 60) + + +if __name__ == '__main__': + import logging + logger = logging.getLogger(__name__) + logger.debug('DEBUG') + logger.info('INFO') + logger.warn('WARN') + logger.error('ERROR') diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/utils/logger_helper.py b/utils/logger_helper.py new file mode 100644 index 0000000000..1b59aa40ec --- /dev/null +++ b/utils/logger_helper.py @@ -0,0 +1,145 @@ +import os +import datetime +from pytz import timezone +from logging import Filter +import logging.config + + +class InfoFilter(logging.Filter): + def filter(self, rec): + return rec.levelno == logging.INFO + +class DebugFilter(logging.Filter): + def filter(self, rec): + return rec.levelno == logging.DEBUG + +class WarnFilter(logging.Filter): + def filter(self, rec): + return rec.levelno == logging.WARN + +class ErrorFilter(logging.Filter): + def filter(self, rec): + return rec.levelno == logging.ERROR + +class CriticalFilter(logging.Filter): + def filter(self, rec): + return rec.levelno == logging.CRITICAL + + +COLORS = { + 'HEADER': '\033[95m', + 'INFO': '\033[92m', + 'DEBUG': '\033[94m', + 'WARNING': '\033[93m', + 'ERROR': '\033[95m', + 'CRITICAL': '\033[91m', + 'ENDC': '\033[0m', +} + +class ColorFulFormatColMixin: + def format_col(self, message_str, level_name): + if level_name in COLORS.keys(): + message_str = COLORS.get(level_name) + message_str + COLORS.get( + 'ENDC') + return message_str + +class ColorfulFormatter(logging.Formatter, ColorFulFormatColMixin): + def format(self, record): + message_str = super(ColorfulFormatter, self).format(record) + + return self.format_col(message_str, level_name=record.levelname) + +def config(log_level, log_path, name, tz='UTC'): + def build_log_file(level, log_path, name, tz): + utc_now = datetime.datetime.utcnow() + utc_tz = timezone('UTC') + local_tz = timezone(tz) + tznow = utc_now.replace(tzinfo=utc_tz).astimezone(local_tz) + return '{}-{}-{}.log'.format(os.path.join(log_path, name), tznow.strftime("%m-%d-%Y-%H:%M:%S"), + level) + + if not os.path.exists(log_path): + os.makedirs(log_path) + + LOGGING = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'default': { + 'format': '[%(asctime)s-%(levelname)s-%(name)s]: %(message)s (%(filename)s:%(lineno)s)' + }, + 'colorful_console': { + 'format': '[%(asctime)s-%(levelname)s-%(name)s]: %(message)s (%(filename)s:%(lineno)s)', + '()': ColorfulFormatter, + }, + }, + 'filters': { + 'InfoFilter': { + '()': InfoFilter, + }, + 'DebugFilter': { + '()': DebugFilter, + }, + 'WarnFilter': { + '()': WarnFilter, + }, + 'ErrorFilter': { + '()': ErrorFilter, + }, + 'CriticalFilter': { + '()': CriticalFilter, + }, + }, + 'handlers': { + 'milvus_celery_console': { + 'class': 'logging.StreamHandler', + 'formatter': 'colorful_console', + }, + 'milvus_debug_file': { + 'level': 'DEBUG', + 'filters': ['DebugFilter'], + 'class': 'logging.handlers.RotatingFileHandler', + 'formatter': 'default', + 'filename': build_log_file('debug', log_path, name, tz) + }, + 'milvus_info_file': { + 'level': 'INFO', + 'filters': ['InfoFilter'], + 'class': 'logging.handlers.RotatingFileHandler', + 'formatter': 'default', + 'filename': build_log_file('info', log_path, name, tz) + }, + 'milvus_warn_file': { + 'level': 'WARN', + 'filters': ['WarnFilter'], + 'class': 'logging.handlers.RotatingFileHandler', + 'formatter': 'default', + 'filename': build_log_file('warn', log_path, name, tz) + }, + 'milvus_error_file': { + 'level': 'ERROR', + 'filters': ['ErrorFilter'], + 'class': 'logging.handlers.RotatingFileHandler', + 'formatter': 'default', + 'filename': build_log_file('error', log_path, name, tz) + }, + 'milvus_critical_file': { + 'level': 'CRITICAL', + 'filters': ['CriticalFilter'], + 'class': 'logging.handlers.RotatingFileHandler', + 'formatter': 'default', + 'filename': build_log_file('critical', log_path, name, tz) + }, + }, + 'loggers': { + '': { + 'handlers': ['milvus_celery_console', 'milvus_info_file', 'milvus_debug_file', 'milvus_warn_file', \ + 'milvus_error_file', 'milvus_critical_file'], + 'level': log_level, + 'propagate': False + }, + }, + 'propagate': False, + } + + logging.config.dictConfig(LOGGING)