From d4fb05688aa819f0761ed1017717a74e52a78873 Mon Sep 17 00:00:00 2001 From: "peng.xu" Date: Wed, 25 Sep 2019 17:14:18 +0800 Subject: [PATCH] refactor tracing --- mishards/__init__.py | 5 ++++- mishards/server.py | 35 ++++------------------------------- mishards/settings.py | 2 +- tracing/__init__.py | 17 +++++++++++++++++ tracing/factory.py | 39 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 65 insertions(+), 33 deletions(-) create mode 100644 tracing/__init__.py create mode 100644 tracing/factory.py diff --git a/mishards/__init__.py b/mishards/__init__.py index 55b24c082c..640293c265 100644 --- a/mishards/__init__.py +++ b/mishards/__init__.py @@ -12,7 +12,10 @@ from sd import ProviderManager sd_proiver_class = ProviderManager.get_provider(settings.SD_PROVIDER) discover = sd_proiver_class(settings=settings.SD_PROVIDER_SETTINGS, conn_mgr=connect_mgr) +from tracing.factory import TracerFactory +tracer = TracerFactory.new_tracer(settings.TRACING_TYPE, settings.TracingConfig) + from mishards.server import Server -grpc_server = Server(conn_mgr=connect_mgr) +grpc_server = Server(conn_mgr=connect_mgr, tracer=tracer) from mishards import exception_handlers diff --git a/mishards/server.py b/mishards/server.py index 4e44731f0e..93d7e38826 100644 --- a/mishards/server.py +++ b/mishards/server.py @@ -9,19 +9,15 @@ from concurrent import futures from grpc._cython import cygrpc from grpc._channel import _Rendezvous, _UnaryUnaryMultiCallable from jaeger_client import Config -from grpc_opentracing import open_tracing_server_interceptor -from grpc_opentracing.grpcext import intercept_server from milvus.grpc_gen.milvus_pb2_grpc import add_MilvusServiceServicer_to_server from mishards.service_handler import ServiceHandler from mishards import settings, discover logger = logging.getLogger(__name__) -def empty_server_interceptor_decorator(target_server, interceptor): - return target_server class Server: - def __init__(self, conn_mgr, port=19530, max_workers=10, **kwargs): + def __init__(self, conn_mgr, tracer, port=19530, max_workers=10, **kwargs): self.pre_run_handlers = set() self.grpc_methods = set() self.error_handlers = {} @@ -29,30 +25,7 @@ class Server: self.port = int(port) self.conn_mgr = conn_mgr tracer_interceptor = None - self.tracer = None - interceptor_decorator = empty_server_interceptor_decorator - - if settings.TRACING_ENABLED: - tracer_config = Config(config={ - 'sampler': { - 'type': 'const', - 'param': 1, - }, - 'local_agent': { - 'reporting_host': settings.TracingConfig.TRACING_REPORTING_HOST, - 'reporting_port': settings.TracingConfig.TRACING_REPORTING_PORT - }, - 'logging': settings.TracingConfig.TRACING_LOGGING, - }, - service_name=settings.TracingConfig.TRACING_SERVICE_NAME, - validate=settings.TracingConfig.TRACING_VALIDATE - ) - - self.tracer = tracer_config.initialize_tracer() - tracer_interceptor = open_tracing_server_interceptor(self.tracer, - log_payloads=settings.TracingConfig.TRACING_LOG_PAYLOAD) - - interceptor_decorator = intercept_server + self.tracer = tracer self.server_impl = grpc.server( thread_pool=futures.ThreadPoolExecutor(max_workers=max_workers), @@ -60,7 +33,7 @@ class Server: (cygrpc.ChannelArgKey.max_receive_message_length, -1)] ) - self.server_impl = interceptor_decorator(self.server_impl, tracer_interceptor) + self.server_impl = self.tracer.decorate(self.server_impl) self.register_pre_run_handler(self.pre_run_handler) @@ -127,7 +100,7 @@ class Server: logger.info('Server is shuting down ......') self.exit_flag = True self.server_impl.stop(0) - self.tracer and self.tracer.close() + self.tracer.close() logger.info('Server is closed') def add_error_handlers(self, target): diff --git a/mishards/settings.py b/mishards/settings.py index 94b8998881..9a8e770f11 100644 --- a/mishards/settings.py +++ b/mishards/settings.py @@ -46,7 +46,7 @@ elif SD_PROVIDER == 'Static': TESTING = env.bool('TESTING', False) TESTING_WOSERVER = env.str('TESTING_WOSERVER', 'tcp://127.0.0.1:19530') -TRACING_ENABLED = env.bool('TRACING_ENABLED', False) +TRACING_TYPE = env.str('TRACING_TYPE', '') class TracingConfig: TRACING_LOGGING = env.bool('TRACING_LOGGING', True), TRACING_SERVICE_NAME = env.str('TRACING_SERVICE_NAME', 'mishards') diff --git a/tracing/__init__.py b/tracing/__init__.py new file mode 100644 index 0000000000..3edddea9df --- /dev/null +++ b/tracing/__init__.py @@ -0,0 +1,17 @@ + +def empty_server_interceptor_decorator(target_server, interceptor): + return target_server + +class Tracer: + def __init__(self, tracer=None, + interceptor=None, + server_decorator=empty_server_interceptor_decorator): + self.tracer = tracer + self.interceptor = interceptor + self.server_decorator=server_decorator + + def decorate(self, server): + return self.server_decorator(server, self.interceptor) + + def close(self): + self.tracer and self.tracer.close() diff --git a/tracing/factory.py b/tracing/factory.py new file mode 100644 index 0000000000..f00a537e78 --- /dev/null +++ b/tracing/factory.py @@ -0,0 +1,39 @@ +import logging +from jaeger_client import Config +from grpc_opentracing.grpcext import intercept_server +from grpc_opentracing import open_tracing_server_interceptor + +from tracing import Tracer, empty_server_interceptor_decorator + +logger = logging.getLogger(__name__) + + +class TracerFactory: + @classmethod + def new_tracer(cls, tracer_type, tracer_config, **kwargs): + if not tracer_type: + return Tracer() + + if tracer_type.lower() == 'jaeger': + config = Config(config={ + 'sampler': { + 'type': 'const', + 'param': 1, + }, + 'local_agent': { + 'reporting_host': tracer_config.TRACING_REPORTING_HOST, + 'reporting_port': tracer_config.TRACING_REPORTING_PORT + }, + 'logging': tracer_config.TRACING_LOGGING, + }, + service_name=tracer_config.TRACING_SERVICE_NAME, + validate=tracer_config.TRACING_VALIDATE + ) + + tracer = config.initialize_tracer() + tracer_interceptor = open_tracing_server_interceptor(tracer, + log_payloads=tracer_config.TRACING_LOG_PAYLOAD) + + return Tracer(tracer, tracer_interceptor, intercept_server) + + assert False, 'Unsupported tracer type: {}'.format(tracer_type)