mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
add tracing
This commit is contained in:
parent
bc056a2829
commit
a0a5965fc6
@ -8,12 +8,17 @@ from functools import wraps
|
||||
from concurrent import futures
|
||||
from grpc._cython import cygrpc
|
||||
from grpc._channel import _Rendezvous, _UnaryUnaryMultiCallable
|
||||
from jaeger_client import Config
|
||||
from grpc_opentracing import open_tracing_server_interceptor
|
||||
from grpc_opentracing.grpcext import intercept_server
|
||||
from milvus.grpc_gen.milvus_pb2_grpc import add_MilvusServiceServicer_to_server
|
||||
from mishards.service_handler import ServiceHandler
|
||||
from mishards import settings, discover
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def empty_server_interceptor_decorator(target_server, interceptor):
|
||||
return target_server
|
||||
|
||||
class Server:
|
||||
def __init__(self, conn_mgr, port=19530, max_workers=10, **kwargs):
|
||||
@ -23,12 +28,40 @@ class Server:
|
||||
self.exit_flag = False
|
||||
self.port = int(port)
|
||||
self.conn_mgr = conn_mgr
|
||||
tracer_interceptor = None
|
||||
self.tracer = None
|
||||
interceptor_decorator = empty_server_interceptor_decorator
|
||||
|
||||
if settings.TRACING_ENABLED:
|
||||
tracer_config = Config(config={
|
||||
'sampler': {
|
||||
'type': 'const',
|
||||
'param': 1,
|
||||
},
|
||||
'local_agent': {
|
||||
'reporting_host': settings.TracingConfig.TRACING_REPORTING_HOST,
|
||||
'reporting_port': settings.TracingConfig.TRACING_REPORTING_PORT
|
||||
},
|
||||
'logging': settings.TracingConfig.TRACING_LOGGING,
|
||||
},
|
||||
service_name=settings.TracingConfig.TRACING_SERVICE_NAME,
|
||||
validate=settings.TracingConfig.TRACING_VALIDATE
|
||||
)
|
||||
|
||||
self.tracer = tracer_config.initialize_tracer()
|
||||
tracer_interceptor = open_tracing_server_interceptor(self.tracer,
|
||||
log_payloads=settings.TracingConfig.TRACING_LOG_PAYLOAD)
|
||||
|
||||
interceptor_decorator = intercept_server
|
||||
|
||||
self.server_impl = grpc.server(
|
||||
thread_pool=futures.ThreadPoolExecutor(max_workers=max_workers),
|
||||
options=[(cygrpc.ChannelArgKey.max_send_message_length, -1),
|
||||
(cygrpc.ChannelArgKey.max_receive_message_length, -1)]
|
||||
)
|
||||
|
||||
self.server_impl = interceptor_decorator(self.server_impl, tracer_interceptor)
|
||||
|
||||
self.register_pre_run_handler(self.pre_run_handler)
|
||||
|
||||
def pre_run_handler(self):
|
||||
@ -94,6 +127,7 @@ class Server:
|
||||
logger.info('Server is shuting down ......')
|
||||
self.exit_flag = True
|
||||
self.server_impl.stop(0)
|
||||
self.tracer and self.tracer.close()
|
||||
logger.info('Server is closed')
|
||||
|
||||
def add_error_handlers(self, target):
|
||||
|
||||
@ -46,6 +46,15 @@ elif SD_PROVIDER == 'Static':
|
||||
TESTING = env.bool('TESTING', False)
|
||||
TESTING_WOSERVER = env.str('TESTING_WOSERVER', 'tcp://127.0.0.1:19530')
|
||||
|
||||
TRACING_ENABLED = env.bool('TRACING_ENABLED', False)
|
||||
class TracingConfig:
|
||||
TRACING_LOGGING = env.bool('TRACING_LOGGING', True),
|
||||
TRACING_SERVICE_NAME = env.str('TRACING_SERVICE_NAME', 'mishards')
|
||||
TRACING_VALIDATE = env.bool('TRACING_VALIDATE', True)
|
||||
TRACING_LOG_PAYLOAD = env.bool('TRACING_LOG_PAYLOAD', DEBUG)
|
||||
TRACING_REPORTING_HOST = env.str('TRACING_REPORTING_HOST', '127.0.0.1')
|
||||
TRACING_REPORTING_PORT = env.str('TRACING_REPORTING_PORT', '5775')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import logging
|
||||
|
||||
@ -31,3 +31,5 @@ rsa==4.0
|
||||
six==1.12.0
|
||||
SQLAlchemy==1.3.5
|
||||
urllib3==1.25.3
|
||||
jaeger-client>=3.4.0
|
||||
grpcio-opentracing>=1.0
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user