From 110e56c1b7f20574db351eea6a3c3d812ad21fc3 Mon Sep 17 00:00:00 2001 From: "peng.xu" Date: Thu, 26 Sep 2019 18:34:02 +0800 Subject: [PATCH] add more child span for search --- mishards/server.py | 2 +- mishards/service_handler.py | 31 ++++++++++++++++++------------- tracing/__init__.py | 6 ++++++ 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/mishards/server.py b/mishards/server.py index 9dc09d6f05..876424089c 100644 --- a/mishards/server.py +++ b/mishards/server.py @@ -76,7 +76,7 @@ class Server: def start(self, port=None): handler_class = self.add_error_handlers(ServiceHandler) - add_MilvusServiceServicer_to_server(handler_class(conn_mgr=self.conn_mgr), self.server_impl) + add_MilvusServiceServicer_to_server(handler_class(conn_mgr=self.conn_mgr, tracer=self.tracer), self.server_impl) self.server_impl.add_insecure_port("[::]:{}".format(str(port or self._port))) self.server_impl.start() diff --git a/mishards/service_handler.py b/mishards/service_handler.py index cb904f4e42..72ae73932c 100644 --- a/mishards/service_handler.py +++ b/mishards/service_handler.py @@ -24,10 +24,11 @@ logger = logging.getLogger(__name__) class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): MAX_NPROBE = 2048 - def __init__(self, conn_mgr, *args, **kwargs): + def __init__(self, conn_mgr, tracer, *args, **kwargs): self.conn_mgr = conn_mgr self.table_meta = {} self.error_handlers = {} + self.tracer = tracer def connection(self, metadata=None): conn = self.conn_mgr.conn('WOSERVER', metadata=metadata) @@ -120,7 +121,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): return status, topk_query_result - def _do_query(self, table_id, table_meta, vectors, topk, nprobe, range_array=None, **kwargs): + def _do_query(self, context, table_id, table_meta, vectors, topk, nprobe, range_array=None, **kwargs): metadata = kwargs.get('metadata', None) range_array = [self._range_to_date(r, metadata=metadata) for r in range_array] if range_array else None routing = self._get_routing_file_ids(table_id, range_array, metadata=metadata) @@ -140,16 +141,18 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): conn = self.query_conn(addr, metadata=metadata) start = time.time() - ret = conn.search_vectors_in_files(table_name=query_params['table_id'], - file_ids=query_params['file_ids'], - query_records=vectors, - top_k=topk, - nprobe=nprobe, - lazy=True) - end = time.time() - logger.info('search_vectors_in_files takes: {}'.format(end - start)) + with self.tracer.start_span('search_{}_span'.format(addr), + child_of=context.get_active_span().context): + ret = conn.search_vectors_in_files(table_name=query_params['table_id'], + file_ids=query_params['file_ids'], + query_records=vectors, + top_k=topk, + nprobe=nprobe, + lazy=True) + end = time.time() + logger.info('search_vectors_in_files takes: {}'.format(end - start)) - all_topk_results.append(ret) + all_topk_results.append(ret) with ThreadPoolExecutor(max_workers=workers) as pool: for addr, params in routing.items(): @@ -160,7 +163,9 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): res.result() reverse = table_meta.metric_type == types.MetricType.IP - return self._do_merge(all_topk_results, topk, reverse=reverse, metadata=metadata) + with self.tracer.start_span('do_merge', + child_of=context.get_active_span().context): + return self._do_merge(all_topk_results, topk, reverse=reverse, metadata=metadata) @mark_grpc_method def CreateTable(self, request, context): @@ -277,7 +282,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): query_range_array.append( Range(query_range.start_value, query_range.end_value)) - status, results = self._do_query(table_name, table_meta, query_record_array, topk, + status, results = self._do_query(context, table_name, table_meta, query_record_array, topk, nprobe, query_range_array, metadata=metadata) now = time.time() diff --git a/tracing/__init__.py b/tracing/__init__.py index 0aebf6ffba..27c57473db 100644 --- a/tracing/__init__.py +++ b/tracing/__init__.py @@ -14,3 +14,9 @@ class Tracer: def close(self): self.tracer and self.tracer.close() + + def start_span(self, operation_name=None, + child_of=None, references=None, tags=None, + start_time=None, ignore_active_span=False): + return self.tracer.start_span(operation_name, child_of, + references, tags, start_time, ignore_active_span)