From a0a5965fc6c826accf02a64c743d45e636f5b687 Mon Sep 17 00:00:00 2001 From: "peng.xu" Date: Wed, 25 Sep 2019 16:23:02 +0800 Subject: [PATCH] add tracing --- mishards/server.py | 34 ++++++++++++++++++++++++++++++++++ mishards/settings.py | 9 +++++++++ requirements.txt | 2 ++ 3 files changed, 45 insertions(+) diff --git a/mishards/server.py b/mishards/server.py index 9cca096b6b..4e44731f0e 100644 --- a/mishards/server.py +++ b/mishards/server.py @@ -8,12 +8,17 @@ from functools import wraps from concurrent import futures from grpc._cython import cygrpc from grpc._channel import _Rendezvous, _UnaryUnaryMultiCallable +from jaeger_client import Config +from grpc_opentracing import open_tracing_server_interceptor +from grpc_opentracing.grpcext import intercept_server from milvus.grpc_gen.milvus_pb2_grpc import add_MilvusServiceServicer_to_server from mishards.service_handler import ServiceHandler from mishards import settings, discover logger = logging.getLogger(__name__) +def empty_server_interceptor_decorator(target_server, interceptor): + return target_server class Server: def __init__(self, conn_mgr, port=19530, max_workers=10, **kwargs): @@ -23,12 +28,40 @@ class Server: self.exit_flag = False self.port = int(port) self.conn_mgr = conn_mgr + tracer_interceptor = None + self.tracer = None + interceptor_decorator = empty_server_interceptor_decorator + + if settings.TRACING_ENABLED: + tracer_config = Config(config={ + 'sampler': { + 'type': 'const', + 'param': 1, + }, + 'local_agent': { + 'reporting_host': settings.TracingConfig.TRACING_REPORTING_HOST, + 'reporting_port': settings.TracingConfig.TRACING_REPORTING_PORT + }, + 'logging': settings.TracingConfig.TRACING_LOGGING, + }, + service_name=settings.TracingConfig.TRACING_SERVICE_NAME, + validate=settings.TracingConfig.TRACING_VALIDATE + ) + + self.tracer = tracer_config.initialize_tracer() + tracer_interceptor = open_tracing_server_interceptor(self.tracer, + log_payloads=settings.TracingConfig.TRACING_LOG_PAYLOAD) + + interceptor_decorator = intercept_server + self.server_impl = grpc.server( thread_pool=futures.ThreadPoolExecutor(max_workers=max_workers), options=[(cygrpc.ChannelArgKey.max_send_message_length, -1), (cygrpc.ChannelArgKey.max_receive_message_length, -1)] ) + self.server_impl = interceptor_decorator(self.server_impl, tracer_interceptor) + self.register_pre_run_handler(self.pre_run_handler) def pre_run_handler(self): @@ -94,6 +127,7 @@ class Server: logger.info('Server is shuting down ......') self.exit_flag = True self.server_impl.stop(0) + self.tracer and self.tracer.close() logger.info('Server is closed') def add_error_handlers(self, target): diff --git a/mishards/settings.py b/mishards/settings.py index 46221c5f98..94b8998881 100644 --- a/mishards/settings.py +++ b/mishards/settings.py @@ -46,6 +46,15 @@ elif SD_PROVIDER == 'Static': TESTING = env.bool('TESTING', False) TESTING_WOSERVER = env.str('TESTING_WOSERVER', 'tcp://127.0.0.1:19530') +TRACING_ENABLED = env.bool('TRACING_ENABLED', False) +class TracingConfig: + TRACING_LOGGING = env.bool('TRACING_LOGGING', True), + TRACING_SERVICE_NAME = env.str('TRACING_SERVICE_NAME', 'mishards') + TRACING_VALIDATE = env.bool('TRACING_VALIDATE', True) + TRACING_LOG_PAYLOAD = env.bool('TRACING_LOG_PAYLOAD', DEBUG) + TRACING_REPORTING_HOST = env.str('TRACING_REPORTING_HOST', '127.0.0.1') + TRACING_REPORTING_PORT = env.str('TRACING_REPORTING_PORT', '5775') + if __name__ == '__main__': import logging diff --git a/requirements.txt b/requirements.txt index 8cedabdf7b..03db7aeed3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -31,3 +31,5 @@ rsa==4.0 six==1.12.0 SQLAlchemy==1.3.5 urllib3==1.25.3 +jaeger-client>=3.4.0 +grpcio-opentracing>=1.0