add more child span for search

This commit is contained in:
peng.xu 2019-09-26 18:34:02 +08:00
parent 11ba6beb40
commit 110e56c1b7
3 changed files with 25 additions and 14 deletions

View File

@ -76,7 +76,7 @@ class Server:
def start(self, port=None):
handler_class = self.add_error_handlers(ServiceHandler)
add_MilvusServiceServicer_to_server(handler_class(conn_mgr=self.conn_mgr), self.server_impl)
add_MilvusServiceServicer_to_server(handler_class(conn_mgr=self.conn_mgr, tracer=self.tracer), self.server_impl)
self.server_impl.add_insecure_port("[::]:{}".format(str(port or self._port)))
self.server_impl.start()

View File

@ -24,10 +24,11 @@ logger = logging.getLogger(__name__)
class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
MAX_NPROBE = 2048
def __init__(self, conn_mgr, *args, **kwargs):
def __init__(self, conn_mgr, tracer, *args, **kwargs):
self.conn_mgr = conn_mgr
self.table_meta = {}
self.error_handlers = {}
self.tracer = tracer
def connection(self, metadata=None):
conn = self.conn_mgr.conn('WOSERVER', metadata=metadata)
@ -120,7 +121,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
return status, topk_query_result
def _do_query(self, table_id, table_meta, vectors, topk, nprobe, range_array=None, **kwargs):
def _do_query(self, context, table_id, table_meta, vectors, topk, nprobe, range_array=None, **kwargs):
metadata = kwargs.get('metadata', None)
range_array = [self._range_to_date(r, metadata=metadata) for r in range_array] if range_array else None
routing = self._get_routing_file_ids(table_id, range_array, metadata=metadata)
@ -140,16 +141,18 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
conn = self.query_conn(addr, metadata=metadata)
start = time.time()
ret = conn.search_vectors_in_files(table_name=query_params['table_id'],
file_ids=query_params['file_ids'],
query_records=vectors,
top_k=topk,
nprobe=nprobe,
lazy=True)
end = time.time()
logger.info('search_vectors_in_files takes: {}'.format(end - start))
with self.tracer.start_span('search_{}_span'.format(addr),
child_of=context.get_active_span().context):
ret = conn.search_vectors_in_files(table_name=query_params['table_id'],
file_ids=query_params['file_ids'],
query_records=vectors,
top_k=topk,
nprobe=nprobe,
lazy=True)
end = time.time()
logger.info('search_vectors_in_files takes: {}'.format(end - start))
all_topk_results.append(ret)
all_topk_results.append(ret)
with ThreadPoolExecutor(max_workers=workers) as pool:
for addr, params in routing.items():
@ -160,7 +163,9 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
res.result()
reverse = table_meta.metric_type == types.MetricType.IP
return self._do_merge(all_topk_results, topk, reverse=reverse, metadata=metadata)
with self.tracer.start_span('do_merge',
child_of=context.get_active_span().context):
return self._do_merge(all_topk_results, topk, reverse=reverse, metadata=metadata)
@mark_grpc_method
def CreateTable(self, request, context):
@ -277,7 +282,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer):
query_range_array.append(
Range(query_range.start_value, query_range.end_value))
status, results = self._do_query(table_name, table_meta, query_record_array, topk,
status, results = self._do_query(context, table_name, table_meta, query_record_array, topk,
nprobe, query_range_array, metadata=metadata)
now = time.time()

View File

@ -14,3 +14,9 @@ class Tracer:
def close(self):
self.tracer and self.tracer.close()
def start_span(self, operation_name=None,
child_of=None, references=None, tags=None,
start_time=None, ignore_active_span=False):
return self.tracer.start_span(operation_name, child_of,
references, tags, start_time, ignore_active_span)