From fb5e6ab3b809754fd425770fd5cf48a704135ad0 Mon Sep 17 00:00:00 2001 From: "peng.xu" Date: Fri, 18 Oct 2019 13:46:09 +0800 Subject: [PATCH] refactor max workers in handler --- mishards/service_handler.py | 8 ++++---- mishards/settings.py | 1 - 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/mishards/service_handler.py b/mishards/service_handler.py index e26f2bfd74..669d96802a 100644 --- a/mishards/service_handler.py +++ b/mishards/service_handler.py @@ -3,6 +3,7 @@ import time import datetime from collections import defaultdict +import multiprocessing from concurrent.futures import ThreadPoolExecutor from milvus.grpc_gen import milvus_pb2, milvus_pb2_grpc, status_pb2 from milvus.grpc_gen.milvus_pb2 import TopKQueryResult @@ -20,12 +21,13 @@ logger = logging.getLogger(__name__) class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): MAX_NPROBE = 2048 - def __init__(self, conn_mgr, tracer, router, *args, **kwargs): + def __init__(self, conn_mgr, tracer, router, max_workers=multiprocessing.cpu_count(), **kwargs): self.conn_mgr = conn_mgr self.table_meta = {} self.error_handlers = {} self.tracer = tracer self.router = router + self.max_workers = max_workers def connection(self, metadata=None): conn = self.conn_mgr.conn('WOSERVER', metadata=metadata) @@ -102,8 +104,6 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): rs = [] all_topk_results = [] - workers = settings.SEARCH_WORKER_SIZE - def search(addr, query_params, vectors, topk, nprobe, **kwargs): logger.info( 'Send Search Request: addr={};params={};nq={};topk={};nprobe={}' @@ -130,7 +130,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): all_topk_results.append(ret) with self.tracer.start_span('do_search', child_of=p_span) as span: - with ThreadPoolExecutor(max_workers=workers) as pool: + with ThreadPoolExecutor(max_workers=self.max_workers) as pool: for addr, params in routing.items(): res = pool.submit(search, addr, diff --git a/mishards/settings.py b/mishards/settings.py index 5e81a1a8ad..fd07d9d436 100644 --- a/mishards/settings.py +++ b/mishards/settings.py @@ -23,7 +23,6 @@ config(LOG_LEVEL, LOG_PATH, LOG_NAME, TIMEZONE) TIMEOUT = env.int('TIMEOUT', 60) MAX_RETRY = env.int('MAX_RETRY', 3) -SEARCH_WORKER_SIZE = env.int('SEARCH_WORKER_SIZE', 10) SERVER_PORT = env.int('SERVER_PORT', 19530) WOSERVER = env.str('WOSERVER')