mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
refactor tracing
This commit is contained in:
parent
a0a5965fc6
commit
d4fb05688a
@ -12,7 +12,10 @@ from sd import ProviderManager
|
||||
sd_proiver_class = ProviderManager.get_provider(settings.SD_PROVIDER)
|
||||
discover = sd_proiver_class(settings=settings.SD_PROVIDER_SETTINGS, conn_mgr=connect_mgr)
|
||||
|
||||
from tracing.factory import TracerFactory
|
||||
tracer = TracerFactory.new_tracer(settings.TRACING_TYPE, settings.TracingConfig)
|
||||
|
||||
from mishards.server import Server
|
||||
grpc_server = Server(conn_mgr=connect_mgr)
|
||||
grpc_server = Server(conn_mgr=connect_mgr, tracer=tracer)
|
||||
|
||||
from mishards import exception_handlers
|
||||
|
||||
@ -9,19 +9,15 @@ from concurrent import futures
|
||||
from grpc._cython import cygrpc
|
||||
from grpc._channel import _Rendezvous, _UnaryUnaryMultiCallable
|
||||
from jaeger_client import Config
|
||||
from grpc_opentracing import open_tracing_server_interceptor
|
||||
from grpc_opentracing.grpcext import intercept_server
|
||||
from milvus.grpc_gen.milvus_pb2_grpc import add_MilvusServiceServicer_to_server
|
||||
from mishards.service_handler import ServiceHandler
|
||||
from mishards import settings, discover
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def empty_server_interceptor_decorator(target_server, interceptor):
|
||||
return target_server
|
||||
|
||||
class Server:
|
||||
def __init__(self, conn_mgr, port=19530, max_workers=10, **kwargs):
|
||||
def __init__(self, conn_mgr, tracer, port=19530, max_workers=10, **kwargs):
|
||||
self.pre_run_handlers = set()
|
||||
self.grpc_methods = set()
|
||||
self.error_handlers = {}
|
||||
@ -29,30 +25,7 @@ class Server:
|
||||
self.port = int(port)
|
||||
self.conn_mgr = conn_mgr
|
||||
tracer_interceptor = None
|
||||
self.tracer = None
|
||||
interceptor_decorator = empty_server_interceptor_decorator
|
||||
|
||||
if settings.TRACING_ENABLED:
|
||||
tracer_config = Config(config={
|
||||
'sampler': {
|
||||
'type': 'const',
|
||||
'param': 1,
|
||||
},
|
||||
'local_agent': {
|
||||
'reporting_host': settings.TracingConfig.TRACING_REPORTING_HOST,
|
||||
'reporting_port': settings.TracingConfig.TRACING_REPORTING_PORT
|
||||
},
|
||||
'logging': settings.TracingConfig.TRACING_LOGGING,
|
||||
},
|
||||
service_name=settings.TracingConfig.TRACING_SERVICE_NAME,
|
||||
validate=settings.TracingConfig.TRACING_VALIDATE
|
||||
)
|
||||
|
||||
self.tracer = tracer_config.initialize_tracer()
|
||||
tracer_interceptor = open_tracing_server_interceptor(self.tracer,
|
||||
log_payloads=settings.TracingConfig.TRACING_LOG_PAYLOAD)
|
||||
|
||||
interceptor_decorator = intercept_server
|
||||
self.tracer = tracer
|
||||
|
||||
self.server_impl = grpc.server(
|
||||
thread_pool=futures.ThreadPoolExecutor(max_workers=max_workers),
|
||||
@ -60,7 +33,7 @@ class Server:
|
||||
(cygrpc.ChannelArgKey.max_receive_message_length, -1)]
|
||||
)
|
||||
|
||||
self.server_impl = interceptor_decorator(self.server_impl, tracer_interceptor)
|
||||
self.server_impl = self.tracer.decorate(self.server_impl)
|
||||
|
||||
self.register_pre_run_handler(self.pre_run_handler)
|
||||
|
||||
@ -127,7 +100,7 @@ class Server:
|
||||
logger.info('Server is shuting down ......')
|
||||
self.exit_flag = True
|
||||
self.server_impl.stop(0)
|
||||
self.tracer and self.tracer.close()
|
||||
self.tracer.close()
|
||||
logger.info('Server is closed')
|
||||
|
||||
def add_error_handlers(self, target):
|
||||
|
||||
@ -46,7 +46,7 @@ elif SD_PROVIDER == 'Static':
|
||||
TESTING = env.bool('TESTING', False)
|
||||
TESTING_WOSERVER = env.str('TESTING_WOSERVER', 'tcp://127.0.0.1:19530')
|
||||
|
||||
TRACING_ENABLED = env.bool('TRACING_ENABLED', False)
|
||||
TRACING_TYPE = env.str('TRACING_TYPE', '')
|
||||
class TracingConfig:
|
||||
TRACING_LOGGING = env.bool('TRACING_LOGGING', True),
|
||||
TRACING_SERVICE_NAME = env.str('TRACING_SERVICE_NAME', 'mishards')
|
||||
|
||||
17
tracing/__init__.py
Normal file
17
tracing/__init__.py
Normal file
@ -0,0 +1,17 @@
|
||||
|
||||
def empty_server_interceptor_decorator(target_server, interceptor):
|
||||
return target_server
|
||||
|
||||
class Tracer:
|
||||
def __init__(self, tracer=None,
|
||||
interceptor=None,
|
||||
server_decorator=empty_server_interceptor_decorator):
|
||||
self.tracer = tracer
|
||||
self.interceptor = interceptor
|
||||
self.server_decorator=server_decorator
|
||||
|
||||
def decorate(self, server):
|
||||
return self.server_decorator(server, self.interceptor)
|
||||
|
||||
def close(self):
|
||||
self.tracer and self.tracer.close()
|
||||
39
tracing/factory.py
Normal file
39
tracing/factory.py
Normal file
@ -0,0 +1,39 @@
|
||||
import logging
|
||||
from jaeger_client import Config
|
||||
from grpc_opentracing.grpcext import intercept_server
|
||||
from grpc_opentracing import open_tracing_server_interceptor
|
||||
|
||||
from tracing import Tracer, empty_server_interceptor_decorator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TracerFactory:
|
||||
@classmethod
|
||||
def new_tracer(cls, tracer_type, tracer_config, **kwargs):
|
||||
if not tracer_type:
|
||||
return Tracer()
|
||||
|
||||
if tracer_type.lower() == 'jaeger':
|
||||
config = Config(config={
|
||||
'sampler': {
|
||||
'type': 'const',
|
||||
'param': 1,
|
||||
},
|
||||
'local_agent': {
|
||||
'reporting_host': tracer_config.TRACING_REPORTING_HOST,
|
||||
'reporting_port': tracer_config.TRACING_REPORTING_PORT
|
||||
},
|
||||
'logging': tracer_config.TRACING_LOGGING,
|
||||
},
|
||||
service_name=tracer_config.TRACING_SERVICE_NAME,
|
||||
validate=tracer_config.TRACING_VALIDATE
|
||||
)
|
||||
|
||||
tracer = config.initialize_tracer()
|
||||
tracer_interceptor = open_tracing_server_interceptor(tracer,
|
||||
log_payloads=tracer_config.TRACING_LOG_PAYLOAD)
|
||||
|
||||
return Tracer(tracer, tracer_interceptor, intercept_server)
|
||||
|
||||
assert False, 'Unsupported tracer type: {}'.format(tracer_type)
|
||||
Loading…
x
Reference in New Issue
Block a user