From 68db0774b6a535d4572a7562e50cb2f526b6c4fe Mon Sep 17 00:00:00 2001 From: del-zhenwu <56623710+del-zhenwu@users.noreply.github.com> Date: Fri, 23 Jul 2021 15:36:12 +0800 Subject: [PATCH] [skip ci] Update benchmark readme (#6765) Signed-off-by: zhenwu --- tests/milvus_benchmark/Dockerfile | 30 ++++++ tests/milvus_benchmark/README.md | 63 ++++++------- .../milvus_benchmark/client.py | 26 ++++-- .../milvus_benchmark/config.py | 14 ++- .../milvus_benchmark/logs/logging.yaml | 2 +- .../milvus_benchmark/milvus_benchmark/main.py | 43 +++++---- .../milvus_benchmark/metrics/models/metric.py | 4 + .../milvus_benchmark/runners/accuracy.py | 4 +- .../milvus_benchmark/runners/base.py | 6 +- .../milvus_benchmark/runners/build.py | 1 + .../milvus_benchmark/runners/chaos.py | 1 + .../milvus_benchmark/runners/get.py | 1 + .../milvus_benchmark/runners/insert.py | 1 + .../milvus_benchmark/runners/locust.py | 78 ++++++++-------- .../milvus_benchmark/runners/locust_tasks.py | 34 ++++--- .../milvus_benchmark/runners/locust_user.py | 62 +++++++++++-- .../milvus_benchmark/runners/search.py | 14 ++- .../suites/2_insert_data.yaml | 13 +-- .../suites/2_locust_insert.yaml | 18 +--- .../suites/2_locust_insert_flush.yaml | 25 +++++ .../suites/2_locust_load_insert.yaml | 25 +++++ .../suites/2_locust_load_insert_flush.yaml | 25 +++++ .../suites/2_locust_random.yaml | 13 +++ .../milvus_benchmark/update.py | 79 ++++++++-------- .../milvus_benchmark/utils.py | 92 +++++-------------- tests/milvus_benchmark/requirements.txt | 2 +- 26 files changed, 412 insertions(+), 264 deletions(-) create mode 100644 tests/milvus_benchmark/Dockerfile create mode 100644 tests/milvus_benchmark/milvus_benchmark/suites/2_locust_insert_flush.yaml create mode 100644 tests/milvus_benchmark/milvus_benchmark/suites/2_locust_load_insert.yaml create mode 100644 tests/milvus_benchmark/milvus_benchmark/suites/2_locust_load_insert_flush.yaml diff --git a/tests/milvus_benchmark/Dockerfile b/tests/milvus_benchmark/Dockerfile new file mode 100644 index 0000000000..55f03c1134 --- /dev/null +++ b/tests/milvus_benchmark/Dockerfile @@ -0,0 +1,30 @@ +# Copyright (C) 2019-2020 Zilliz. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under the License. 
+
+FROM python:3.6.8-jessie
+
+SHELL ["/bin/bash", "-o", "pipefail", "-c"]
+
+RUN apt-get update && apt-get install -y --no-install-recommends wget apt-transport-https && \
+    wget -qO- "https://get.helm.sh/helm-v3.0.2-linux-amd64.tar.gz" | tar --strip-components=1 -xz -C /usr/local/bin linux-amd64/helm && \
+    wget -P /tmp https://mirrors.aliyun.com/kubernetes/apt/doc/apt-key.gpg && \
+    apt-key add /tmp/apt-key.gpg && \
+    sh -c 'echo deb https://mirrors.aliyun.com/kubernetes/apt/ kubernetes-xenial main > /etc/apt/sources.list.d/kubernetes.list' && \
+    apt-get update && apt-get install -y --no-install-recommends \
+    build-essential kubectl && \
+    apt-get remove --purge -y && \
+    rm -rf /var/lib/apt/lists/*
+
+COPY requirements.txt /requirements.txt
+
+RUN python3 -m pip install -r /requirements.txt
+
+WORKDIR /root
\ No newline at end of file
diff --git a/tests/milvus_benchmark/README.md b/tests/milvus_benchmark/README.md
index c408c8d704..b9cddafbd2 100644
--- a/tests/milvus_benchmark/README.md
+++ b/tests/milvus_benchmark/README.md
@@ -1,50 +1,45 @@
-# Quick start
+`milvus_benchmark` is a non-functional testing tool that runs tests against milvus on a k8s cluster or locally. The primary use cases are performance, load, and stability testing; the objective is to expose problems in the milvus project.
+
+## Quick start

 ### Description:

-- Test suites can be organized with `yaml `
-- Test can run with local mode or argo/jenkins mode, that manage the server env in argo/jenkins step or stages
+- Test cases in `milvus_benchmark` are organized with `yaml` suite files
+- Tests can run in local mode or helm mode
+   - local: install and start your local server, then pass the host/port params when starting the tests
+   - helm: install the server with helm, which manages milvus in the k8s cluster; the test stage can be integrated into an argo workflow or jenkins pipeline

-### Demos:
+### Usage:

-1. Using argo pipeline:
-   Run test suites(1.x, 2.x version) in argo workflows, innernal argo url: argo-test.zilliz.cc
+1. Using jenkins:
+   Use `ci/main_jenkinsfile` as the jenkins pipeline file
+2. Using argo:
+   example argo workflow yaml configuration: `ci/argo.yaml`
+3. Local test:

-2. Local test:
-   Run test with the local server
-   1. set python path
+   1). set PYTHONPATH:

-   `export PYTHONPATH=/yourmilvusprojectpath/tests/milvus_benchmark`
+   `export PYTHONPATH=/your/project/path/milvus_benchmark`

-   2. (optional, for `sift`/`glove` open dataset) mount NAS or update `*_DATA_DIR` in `runner.py`
+   2). prepare data:
+
+   if the sift/deep dataset is used as the raw data input, mount NAS and update `RAW_DATA_DIR` in `config.py`; example mount command: `sudo mount -t cifs -o username=test,vers=1.0 //172.16.70.249/test /test`

-   3. run test
-   `cd milvus-benchmark/`
+   3). install requirements:

-   `python3 main.py --local --host=*.* --port=19530 --suite=suites/2_insert_data.yaml`
+   `pip install -r requirements.txt`
+
+   4). write a test yaml and run with the yaml param:
+
+   `cd milvus-benchmark/ && python main.py --local --host=* --port=19530 --suite=suites/2_insert_data.yaml`

-### Definitions of test suites:
+### Definitions of test suite:

-Testers need to write test suite config if adding a customized test into the current test framework
+The test suite yaml defines the test process; users need to write a test suite yaml when adding a customized test to the current test framework.

-The following are the searching performance test suite:
+Take the test file `2_insert_data.yaml` as an example: the top level is the test type, `insert_performance`. There are many test types, including `search_performance/build_performance/insert_performance/accuracy/locust_insert/...`; each test type corresponds to a different runner defined in the `runners` directory. The other parts of the test yaml are the params passed to the runner, e.g. the field `collection_name` determines which kind of collection will be created in milvus.

-1. insert_search_performance: the test type,also we have:
+### Test result:

-   `search_performance`,`build_performance`,`insert_performance`,`accuracy`,`stability`,`search_stability`
-2. collections: list of test cases
-3. The following fields are in the `collection` field:
-   - milvus: milvus config
-   - collection_name: currently support one table
-   - ni_per: per count of insert
-   - index_type: index type
-   - index_param: param of index
-   - run_count: search count
-   - search_params: params of search_vectors
-   - top_ks: top_k of search
-   - nqs: nq of search
-
-## Test result:
-
-Test result will be uploaded, which will be used to tell if the test run pass or failed
+The test result will be uploaded when running in helm mode, and is used to judge whether the test run passed or failed.
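Editor's note: for orientation, below is a minimal sketch of how such a suite file is consumed. `dispatch_suite` is a hypothetical helper written only to illustrate the yaml layout; the real dispatch lives in `main.py` (via `parser.operations_parser`) and `runners/__init__.py`.

```python
from yaml import full_load

def dispatch_suite(suite_file):
    # hypothetical helper: illustrates the suite yaml layout only
    with open(suite_file) as f:
        suite = full_load(f)
    # a suite has a single top-level key: the test type, e.g. "insert_performance"
    run_type = list(suite.keys())[0]
    for collection in suite[run_type]["collections"]:
        # each entry carries the params handed to the runner for one case
        print(run_type, collection.get("collection_name"))

dispatch_suite("suites/2_insert_data.yaml")
```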
diff --git a/tests/milvus_benchmark/milvus_benchmark/client.py b/tests/milvus_benchmark/milvus_benchmark/client.py
index 262590b9e4..2a554f7217 100644
--- a/tests/milvus_benchmark/milvus_benchmark/client.py
+++ b/tests/milvus_benchmark/milvus_benchmark/client.py
@@ -153,6 +153,16 @@ class MilvusClient(object):
         except Exception as e:
             logger.error(str(e))

+    @time_wrapper
+    def insert_flush(self, entities, _async=False, collection_name=None):
+        tmp_collection_name = self._collection_name if collection_name is None else collection_name
+        try:
+            insert_res = self._milvus.insert(tmp_collection_name, entities)
+            self._milvus.flush([tmp_collection_name], _async=_async)
+            return insert_res.primary_keys
+        except Exception as e:
+            logger.error(str(e))
+
     def get_dimension(self):
         info = self.get_info()
         for field in info["fields"]:
@@ -424,28 +434,28 @@ class MilvusClient(object):
             self.drop(collection_name=name)

     @time_wrapper
-    def load_collection(self, collection_name=None):
+    def load_collection(self, collection_name=None, timeout=3000):
         if collection_name is None:
             collection_name = self._collection_name
-        return self._milvus.load_collection(collection_name, timeout=3000)
+        return self._milvus.load_collection(collection_name, timeout=timeout)

     @time_wrapper
-    def release_collection(self, collection_name=None):
+    def release_collection(self, collection_name=None, timeout=3000):
         if collection_name is None:
             collection_name = self._collection_name
-        return self._milvus.release_collection(collection_name, timeout=3000)
+        return self._milvus.release_collection(collection_name, timeout=timeout)

     @time_wrapper
-    def load_partitions(self, tag_names, collection_name=None):
+    def load_partitions(self, tag_names, collection_name=None, timeout=3000):
         if collection_name is None:
             collection_name = self._collection_name
-        return self._milvus.load_partitions(collection_name, tag_names, timeout=3000)
+        return self._milvus.load_partitions(collection_name, tag_names, timeout=timeout)

     @time_wrapper
-    def release_partitions(self, tag_names, collection_name=None):
+    def release_partitions(self, tag_names, collection_name=None, timeout=3000):
         if collection_name is None:
             collection_name = self._collection_name
-        return self._milvus.release_partitions(collection_name, tag_names, timeout=3000)
+        return self._milvus.release_partitions(collection_name, tag_names, timeout=timeout)

     # TODO: remove
     # def get_server_version(self):
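Editor's note: a quick sketch of how a runner might exercise the new entry points. Hedged assumptions: `client` is an already-constructed `MilvusClient` bound to a collection, and `entities` is built elsewhere (e.g. with the runners' `generate_entities` helper); `insert_and_verify` is an illustrative name, not part of the patch.

```python
def insert_and_verify(client, entities):
    # load_collection now accepts a per-call timeout instead of the
    # previously hard-coded 3000s
    client.load_collection(timeout=600)
    # insert + flush in one call; returns the primary keys of the inserted entities
    ids = client.insert_flush(entities)
    client.release_collection(timeout=600)
    return ids
```

Note the patch also reorders `insert_flush` (see above) so the flush happens before the primary keys are returned; in the submitted version the flush line was placed after the `return` and was skipped on the success path.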
diff --git a/tests/milvus_benchmark/milvus_benchmark/config.py b/tests/milvus_benchmark/milvus_benchmark/config.py
index 50975a39b0..21586cd1eb 100644
--- a/tests/milvus_benchmark/milvus_benchmark/config.py
+++ b/tests/milvus_benchmark/milvus_benchmark/config.py
@@ -6,10 +6,12 @@ JOB_COLLECTION = "jobs"

 REGISTRY_URL = "registry.zilliz.com/milvus/milvus"
 IDC_NAS_URL = "//172.16.70.249/test"
+DEFAULT_IMAGE = "milvusdb/milvus:latest"

 SERVER_HOST_DEFAULT = "127.0.0.1"
 SERVER_PORT_DEFAULT = 19530
-SERVER_VERSION = "2.0"
+SERVER_VERSION = "2.0.0-RC3"
+
 HELM_NAMESPACE = "milvus"
 BRANCH = "master"

@@ -22,9 +24,18 @@ RAW_DATA_DIR = "/test/milvus/raw_data/"
 LOG_PATH = "/test/milvus/benchmark/logs/{}/".format(BRANCH)

 DEFAULT_DEPLOY_MODE = "single"
+SINGLE_DEPLOY_MODE = "single"
+CLUSTER_DEPLOY_MODE = "cluster"

 NAMESPACE = "milvus"
 CHAOS_NAMESPACE = "chaos-testing"
 DEFAULT_API_VERSION = 'chaos-mesh.org/v1alpha1'
 DEFAULT_GROUP = 'chaos-mesh.org'
 DEFAULT_VERSION = 'v1alpha1'
+
+# minio config
+MINIO_HOST = "milvus-test-minio.qa-milvus.svc.cluster.local"
+MINIO_PORT = 9000
+MINIO_ACCESS_KEY = "minioadmin"
+MINIO_SECRET_KEY = "minioadmin"
+MINIO_BUCKET_NAME = "test"
\ No newline at end of file
diff --git a/tests/milvus_benchmark/milvus_benchmark/logs/logging.yaml b/tests/milvus_benchmark/milvus_benchmark/logs/logging.yaml
index 8b30fbc80f..908133e590 100644
--- a/tests/milvus_benchmark/milvus_benchmark/logs/logging.yaml
+++ b/tests/milvus_benchmark/milvus_benchmark/logs/logging.yaml
@@ -2,7 +2,7 @@ version: 1
 disable_existing_loggers: False
 formatters:
   simple:
-    format: "%(asctime)s - %(name)s:%(lineno)s - %(levelname)s - %(message)s"
+    format: "[%(asctime)-15s] [%(levelname)8s] - %(message)s (%(name)s:%(lineno)s)"

 handlers:
   console:
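Editor's note: the `main.py` diff below adds a `--server-config` option for local mode. The file is plain yaml; here is a hedged sketch of how it is parsed into `deploy_mode`/`server_tag` via the helpers this patch adds to `utils.py` (shown later). The concrete tag value `8c16m` is illustrative only.

```python
from yaml import full_load
from milvus_benchmark import utils

# Illustrative --server-config contents; "8c16m" is a made-up server tag.
SERVER_CONFIG = """
milvus:
  deploy_mode: cluster
server:
  server_tag: 8c16m
"""

deploy_params = full_load(SERVER_CONFIG)
print(utils.get_deploy_mode(deploy_params))  # -> "cluster"
print(utils.get_server_tag(deploy_params))   # -> "8c16m"
```

When no `--server-config` is given, `deploy_mode` stays `None`, which is what now gates the `api.save()` calls below (local runs without a server config no longer upload metrics).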
diff --git a/tests/milvus_benchmark/milvus_benchmark/main.py b/tests/milvus_benchmark/milvus_benchmark/main.py
index 0b62a2dce6..9219ed7f4d 100644
--- a/tests/milvus_benchmark/milvus_benchmark/main.py
+++ b/tests/milvus_benchmark/milvus_benchmark/main.py
@@ -1,14 +1,9 @@
 import os
 import sys
-import time
-from datetime import datetime
-import pdb
 import argparse
 import logging
 import traceback
-from multiprocessing import Process
-from queue import Queue
-from logging import handlers
+# from queue import Queue
 from yaml import full_load, dump
 from milvus_benchmark.metrics.models.server import Server
 from milvus_benchmark.metrics.models.hardware import Hardware
@@ -17,7 +12,7 @@ from milvus_benchmark.metrics.models.env import Env
 from milvus_benchmark.env import get_env
 from milvus_benchmark.runners import get_runner
 from milvus_benchmark.metrics import api
-from milvus_benchmark import config
+from milvus_benchmark import config, utils
 from milvus_benchmark import parser
 # from scheduler import back_scheduler
 from logs import log
@@ -25,11 +20,7 @@ from logs import log
 log.setup_logging()
 logger = logging.getLogger("milvus_benchmark.main")

-DEFAULT_IMAGE = "milvusdb/milvus:latest"
-LOG_FOLDER = "logs"
-NAMESPACE = "milvus"
-SERVER_VERSION = "2.0"
-q = Queue()
+# q = Queue()


 def positive_int(s):
@@ -58,7 +49,7 @@ def run_suite(run_type, suite, env_mode, env_params):
     try:
         start_status = False
         metric = api.Metric()
-        deploy_mode = env_params["deploy_mode"] if "deploy_mode" in env_params else config.DEFAULT_DEPLOY_MODE
+        deploy_mode = env_params["deploy_mode"]
         env = get_env(env_mode, deploy_mode)
         metric.set_run_id()
         metric.set_mode(env_mode)
@@ -67,6 +58,8 @@ def run_suite(run_type, suite, env_mode, env_params):
         logger.info(env_params)
         if env_mode == "local":
             metric.hardware = Hardware("")
+            if "server_tag" in env_params and env_params["server_tag"]:
+                metric.hardware = Hardware(env_params["server_tag"])
             start_status = env.start_up(env_params["host"], env_params["port"])
         elif env_mode == "helm":
             helm_params = env_params["helm_params"]
@@ -106,8 +99,8 @@ def run_suite(run_type, suite, env_mode, env_params):
                     case_metric.update_message(err_message)
                     suite_status = False
                 logger.debug(case_metric.metrics)
-                # if env_mode == "helm":
-                api.save(case_metric)
+                if deploy_mode:
+                    api.save(case_metric)
         if suite_status:
             metric.update_status(status="RUN_SUCC")
         else:
@@ -120,7 +113,8 @@ def run_suite(run_type, suite, env_mode, env_params):
         logger.error(traceback.format_exc())
         metric.update_status(status="RUN_FAILED")
     finally:
-        api.save(metric)
+        if deploy_mode:
+            api.save(metric)
         # time.sleep(10)
         env.tear_down()
     if metric.status != "RUN_SUCC":
@@ -161,6 +155,11 @@ def main():
         metavar='FILE',
         help='load test suite from FILE',
         default='')
+    arg_parser.add_argument(
+        '--server-config',
+        metavar='FILE',
+        help='load server config from FILE',
+        default='')

     args = arg_parser.parse_args()

@@ -216,10 +215,19 @@ def main():

     elif args.local:
         # for local mode
+        deploy_params = args.server_config
+        deploy_params_dict = None
+        if deploy_params:
+            with open(deploy_params) as f:
+                deploy_params_dict = full_load(f)
+            logger.debug(deploy_params_dict)
+        deploy_mode = utils.get_deploy_mode(deploy_params_dict)
+        server_tag = utils.get_server_tag(deploy_params_dict)
         env_params = {
             "host": args.host,
             "port": args.port,
-            "deploy_mode": None
+            "deploy_mode": deploy_mode,
+            "server_tag": server_tag
         }
         suite_file = args.suite
         with open(suite_file) as f:
diff --git a/tests/milvus_benchmark/milvus_benchmark/metrics/models/metric.py b/tests/milvus_benchmark/milvus_benchmark/metrics/models/metric.py
index b62e3f28c9..ea38009bb0 100644
--- a/tests/milvus_benchmark/milvus_benchmark/metrics/models/metric.py
+++ b/tests/milvus_benchmark/milvus_benchmark/metrics/models/metric.py
@@ -34,6 +34,10 @@ class Metric(object):
     def set_mode(self, mode):
         self.mode = mode

+    # metric type is "metric" (suite-level) by default; "case" marks a per-case metric
+    def set_case_metric_type(self):
+        self._type = "case"
+
     def json_md5(self):
         json_str = json.dumps(vars(self), sort_keys=True)
         return hashlib.md5(json_str.encode('utf-8')).hexdigest()
diff --git a/tests/milvus_benchmark/milvus_benchmark/runners/accuracy.py b/tests/milvus_benchmark/milvus_benchmark/runners/accuracy.py
index bb7cf21458..a56ebcfc60 100644
--- a/tests/milvus_benchmark/milvus_benchmark/runners/accuracy.py
+++ b/tests/milvus_benchmark/milvus_benchmark/runners/accuracy.py
@@ -62,6 +62,7 @@ class AccuracyRunner(BaseRunner):
                         "params": search_param}
                     # TODO: only update search_info
                     case_metric = copy.deepcopy(self.metric)
+                    case_metric.set_case_metric_type()
                     case_metric.search = {
                         "nq": nq,
                         "topk": top_k,
@@ -168,6 +169,7 @@ class AccAccuracyRunner(AccuracyRunner):
                         "params": search_param}
                     # TODO: only update search_info
                     case_metric = copy.deepcopy(self.metric)
+                    case_metric.set_case_metric_type()
                     case_metric.index = index_info
                     case_metric.search = {
                         "nq": nq,
@@ -245,7 +247,7 @@ class AccAccuracyRunner(AccuracyRunner):
         logger.info(self.milvus.describe_index(index_field_name))
         logger.info("Start load collection: %s" % collection_name)
        # 
self.milvus.release_collection() - self.milvus.load_collection() + self.milvus.load_collection(timeout=600) logger.info("End load collection: %s" % collection_name) def run_case(self, case_metric, **case_param): diff --git a/tests/milvus_benchmark/milvus_benchmark/runners/base.py b/tests/milvus_benchmark/milvus_benchmark/runners/base.py index f86e094fc4..e0ac1176e4 100644 --- a/tests/milvus_benchmark/milvus_benchmark/runners/base.py +++ b/tests/milvus_benchmark/milvus_benchmark/runners/base.py @@ -28,7 +28,8 @@ class BaseRunner(object): pass def stop(self): - logger.debug("Start clean up env: {} in runner".format(self.env.name)) + logger.debug("Stop runner...") + pass @property def hostname(self): @@ -54,10 +55,11 @@ class BaseRunner(object): def run_as_group(self): return self._run_as_group - def init_metric(self, name, collection_info=None, index_info=None, search_info=None, run_params=None): + def init_metric(self, name, collection_info=None, index_info=None, search_info=None, run_params=None, t="metric"): self._metric.collection = collection_info self._metric.index = index_info self._metric.search = search_info + self._metric.type = t self._metric.run_params = run_params self._metric.metrics = { "type": name, diff --git a/tests/milvus_benchmark/milvus_benchmark/runners/build.py b/tests/milvus_benchmark/milvus_benchmark/runners/build.py index 557ee76c40..7d4fb8d804 100644 --- a/tests/milvus_benchmark/milvus_benchmark/runners/build.py +++ b/tests/milvus_benchmark/milvus_benchmark/runners/build.py @@ -41,6 +41,7 @@ class BuildRunner(BaseRunner): flush = False self.init_metric(self.name, collection_info, index_info, search_info=None) case_metric = copy.deepcopy(self.metric) + case_metric.set_case_metric_type() case_metrics = list() case_params = list() case_metrics.append(case_metric) diff --git a/tests/milvus_benchmark/milvus_benchmark/runners/chaos.py b/tests/milvus_benchmark/milvus_benchmark/runners/chaos.py index 70f6f2f9de..f5224e2ffb 100644 --- a/tests/milvus_benchmark/milvus_benchmark/runners/chaos.py +++ b/tests/milvus_benchmark/milvus_benchmark/runners/chaos.py @@ -71,6 +71,7 @@ class SimpleChaosRunner(BaseRunner): }] self.init_metric(self.name, {}, {}, None) case_metric = copy.deepcopy(self.metric) + case_metric.set_case_metric_type() case_metrics.append(case_metric) return case_params, case_metrics diff --git a/tests/milvus_benchmark/milvus_benchmark/runners/get.py b/tests/milvus_benchmark/milvus_benchmark/runners/get.py index 0141a6447d..10e2b79875 100644 --- a/tests/milvus_benchmark/milvus_benchmark/runners/get.py +++ b/tests/milvus_benchmark/milvus_benchmark/runners/get.py @@ -53,6 +53,7 @@ class GetRunner(BaseRunner): for ids_length in ids_length_list: ids = get_ids(ids_length, collection_size) case_metric = copy.deepcopy(self.metric) + case_metric.set_case_metric_type() case_params = list() case_metric.run_params = {"ids_length": ids_length} case_metrics.append(case_metric) diff --git a/tests/milvus_benchmark/milvus_benchmark/runners/insert.py b/tests/milvus_benchmark/milvus_benchmark/runners/insert.py index fcbf14e18c..0f5119e197 100644 --- a/tests/milvus_benchmark/milvus_benchmark/runners/insert.py +++ b/tests/milvus_benchmark/milvus_benchmark/runners/insert.py @@ -48,6 +48,7 @@ class InsertRunner(BaseRunner): flush = False self.init_metric(self.name, collection_info, index_info, None) case_metric = copy.deepcopy(self.metric) + case_metric.set_case_metric_type() case_metrics = list() case_params = list() case_metrics.append(case_metric) diff --git 
a/tests/milvus_benchmark/milvus_benchmark/runners/locust.py b/tests/milvus_benchmark/milvus_benchmark/runners/locust.py index 482834756d..1ebfc4c6e5 100644 --- a/tests/milvus_benchmark/milvus_benchmark/runners/locust.py +++ b/tests/milvus_benchmark/milvus_benchmark/runners/locust.py @@ -21,11 +21,10 @@ class LocustRunner(BaseRunner): connection_type = case_param["connection_type"] # spawn locust requests - clients_num = task["clients_num"] - hatch_rate = task["hatch_rate"] - during_time = utils.timestr_to_int(task["during_time"]) + task["during_time"] = utils.timestr_to_int(task["during_time"]) task_types = task["types"] - run_params = {"tasks": {}, "clients_num": clients_num, "spawn_rate": hatch_rate, "during_time": during_time} + run_params = {"tasks": {}} + run_params.update(task) info_in_params = { "index_field_name": case_param["index_field_name"], "vector_field_name": case_param["vector_field_name"], @@ -95,6 +94,7 @@ class LocustInsertRunner(LocustRunner): } self.init_metric(self.name, collection_info, index_info, None, run_params) case_metric = copy.deepcopy(self.metric) + case_metric.set_case_metric_type() case_metrics = list() case_params = list() case_metrics.append(case_metric) @@ -197,6 +197,7 @@ class LocustSearchRunner(LocustRunner): } self.init_metric(self.name, collection_info, index_info, None, run_params) case_metric = copy.deepcopy(self.metric) + case_metric.set_case_metric_type() case_metrics = list() case_params = list() case_metrics.append(case_metric) @@ -324,6 +325,7 @@ class LocustRandomRunner(LocustRunner): } self.init_metric(self.name, collection_info, index_info, None, run_params) case_metric = copy.deepcopy(self.metric) + case_metric.set_case_metric_type() case_metrics = list() case_params = list() case_metrics.append(case_metric) @@ -356,40 +358,40 @@ class LocustRandomRunner(LocustRunner): build_index = case_param["build_index"] self.milvus.set_collection(collection_name) - # if self.milvus.exists_collection(): - # logger.debug("Start drop collection") - # self.milvus.drop() - # time.sleep(runner_utils.DELETE_INTERVAL_TIME) - # self.milvus.create_collection(dimension, data_type=vector_type, - # other_fields=other_fields) - # # TODO: update fields in collection_info - # # fields = self.get_fields(self.milvus, collection_name) - # # collection_info = { - # # "dimension": dimension, - # # "metric_type": metric_type, - # # "dataset_name": collection_name, - # # "fields": fields - # # } - # if build_index is True: - # if case_param["index_type"]: - # self.milvus.create_index(index_field_name, case_param["index_type"], case_param["metric_type"], index_param=case_param["index_param"]) - # logger.debug(self.milvus.describe_index(index_field_name)) - # else: - # build_index = False - # logger.warning("Please specify the index_type") - # self.insert(self.milvus, collection_name, case_param["data_type"], dimension, case_param["collection_size"], case_param["ni_per"]) - # build_time = 0.0 - # start_time = time.time() - # self.milvus.flush() - # flush_time = round(time.time()-start_time, 2) - # logger.debug(self.milvus.count()) - # if build_index is True: - # logger.debug("Start build index for last file") - # start_time = time.time() - # self.milvus.create_index(index_field_name, case_param["index_type"], case_param["metric_type"], index_param=case_param["index_param"]) - # build_time = round(time.time()-start_time, 2) - # logger.debug({"flush_time": flush_time, "build_time": build_time}) - # logger.info(self.milvus.count()) + if self.milvus.exists_collection(): + 
logger.debug("Start drop collection") + self.milvus.drop() + time.sleep(runner_utils.DELETE_INTERVAL_TIME) + self.milvus.create_collection(dimension, data_type=vector_type, + other_fields=other_fields) + # TODO: update fields in collection_info + # fields = self.get_fields(self.milvus, collection_name) + # collection_info = { + # "dimension": dimension, + # "metric_type": metric_type, + # "dataset_name": collection_name, + # "fields": fields + # } + if build_index is True: + if case_param["index_type"]: + self.milvus.create_index(index_field_name, case_param["index_type"], case_param["metric_type"], index_param=case_param["index_param"]) + logger.debug(self.milvus.describe_index(index_field_name)) + else: + build_index = False + logger.warning("Please specify the index_type") + self.insert(self.milvus, collection_name, case_param["data_type"], dimension, case_param["collection_size"], case_param["ni_per"]) + build_time = 0.0 + start_time = time.time() + self.milvus.flush() + flush_time = round(time.time()-start_time, 2) + logger.debug(self.milvus.count()) + if build_index is True: + logger.debug("Start build index for last file") + start_time = time.time() + self.milvus.create_index(index_field_name, case_param["index_type"], case_param["metric_type"], index_param=case_param["index_param"]) + build_time = round(time.time()-start_time, 2) + logger.debug({"flush_time": flush_time, "build_time": build_time}) + logger.info(self.milvus.count()) logger.info("Start load collection") load_start_time = time.time() self.milvus.load_collection() diff --git a/tests/milvus_benchmark/milvus_benchmark/runners/locust_tasks.py b/tests/milvus_benchmark/milvus_benchmark/runners/locust_tasks.py index 37f838c840..1f18c580d4 100644 --- a/tests/milvus_benchmark/milvus_benchmark/runners/locust_tasks.py +++ b/tests/milvus_benchmark/milvus_benchmark/runners/locust_tasks.py @@ -2,23 +2,21 @@ import pdb import random import time import logging -import json +import math from locust import TaskSet, task from . 
import utils -dim = 128 - logger = logging.getLogger("milvus_benchmark.runners.locust_tasks") class Tasks(TaskSet): - @task(100) + @task def query(self): op = "query" - X = utils.generate_vectors(self.params[op]["nq"], self.op_info["dimension"]) + # X = utils.generate_vectors(self.params[op]["nq"], self.op_info["dimension"]) vector_query = {"vector": {self.op_info["vector_field_name"]: { "topk": self.params[op]["top_k"], - "query": X, + "query": self.values["X"][:self.params[op]["nq"]], "metric_type": self.params[op]["metric_type"] if "metric_type" in self.params[op] else utils.DEFAULT_METRIC_TYPE, "params": self.params[op]["search_param"]} }} @@ -29,12 +27,11 @@ class Tasks(TaskSet): filter_query.append(eval(filter["range"])) if isinstance(filter, dict) and "term" in filter: filter_query.append(eval(filter["term"])) - logger.debug(filter_query) - self.client.query(vector_query, filter_query=filter_query, log=False) + # logger.debug(filter_query) + self.client.query(vector_query, filter_query=filter_query, log=False, timeout=120) @task def flush(self): - logger.debug("Flush") self.client.flush(log=False) @task @@ -57,11 +54,20 @@ class Tasks(TaskSet): @task def insert(self): op = "insert" - ids = [random.randint(1, 10000000) for _ in range(self.params[op]["ni_per"])] - X = [[random.random() for _ in range(dim)] for _ in range(self.params[op]["ni_per"])] - entities = utils.generate_entities(self.op_info["collection_info"], X, ids) + # ids = [random.randint(1000000, 10000000) for _ in range(self.params[op]["ni_per"])] + # X = [[random.random() for _ in range(self.op_info["dimension"])] for _ in range(self.params[op]["ni_per"])] + entities = utils.generate_entities(self.op_info["collection_info"], self.values["X"][:self.params[op]["ni_per"]], self.values["ids"][:self.params[op]["ni_per"]]) self.client.insert(entities, log=False) + @task + def insert_flush(self): + op = "insert_flush" + # ids = [random.randint(1000000, 10000000) for _ in range(self.params[op]["ni_per"])] + # X = [[random.random() for _ in range(self.op_info["dimension"])] for _ in range(self.params[op]["ni_per"])] + entities = utils.generate_entities(self.op_info["collection_info"], self.values["X"][:self.params[op]["ni_per"]], self.values["ids"][:self.params[op]["ni_per"]]) + self.client.insert(entities, log=False) + self.client.flush(log=False) + @task def insert_rand(self): self.client.insert_rand(log=False) @@ -69,5 +75,5 @@ class Tasks(TaskSet): @task def get(self): op = "get" - ids = [random.randint(1, 10000000) for _ in range(self.params[op]["ids_length"])] - self.client.get(ids) + # ids = [random.randint(1, 10000000) for _ in range(self.params[op]["ids_length"])] + self.client.get(self.values["get_ids"][:self.params[op]["ids_length"]]) diff --git a/tests/milvus_benchmark/milvus_benchmark/runners/locust_user.py b/tests/milvus_benchmark/milvus_benchmark/runners/locust_user.py index 982a853f87..16ee4fa1e8 100644 --- a/tests/milvus_benchmark/milvus_benchmark/runners/locust_user.py +++ b/tests/milvus_benchmark/milvus_benchmark/runners/locust_user.py @@ -4,22 +4,54 @@ import pdb import gevent # import gevent.monkey # gevent.monkey.patch_all() -from locust import Locust, User, TaskSet, task, between, events, stats +from locust import User, between, events, stats from locust.env import Environment import locust.stats +import math +from locust import LoadTestShape from locust.stats import stats_printer, print_stats from locust.log import setup_logging, greenlet_exception_logger from milvus_benchmark.client import MilvusClient 
from .locust_task import MilvusTask from .locust_tasks import Tasks +from . import utils -locust.stats.CONSOLE_STATS_INTERVAL_SEC = 30 +locust.stats.CONSOLE_STATS_INTERVAL_SEC = 20 logger = logging.getLogger("milvus_benchmark.runners.locust_user") +nq = 10000 +nb = 100000 + + +class StepLoadShape(LoadTestShape): + """ + A step load shape + Keyword arguments: + step_time -- Time between steps + step_load -- User increase amount at each step + spawn_rate -- Users to stop/start per second at every step + time_limit -- Time limit in seconds + """ + + def init(self, step_time, step_load, spawn_rate, time_limit): + self.step_time = step_time + self.step_load = step_load + self.spawn_rate = spawn_rate + self.time_limit = time_limit + + def tick(self): + run_time = self.get_run_time() + + if run_time > self.time_limit: + return None + + current_step = math.floor(run_time / self.step_time) + 1 + return (current_step * self.step_load, self.spawn_rate) class MyUser(User): # task_set = None - wait_time = between(0.001, 0.002) + # wait_time = between(0.001, 0.002) + pass def locust_executor(host, port, collection_name, connection_type="single", run_params=None): @@ -33,14 +65,24 @@ def locust_executor(host, port, collection_name, connection_type="single", run_p MyUser.tasks.update(task) MyUser.params[op] = value["params"] if "params" in value else None logger.info(MyUser.tasks) + MyUser.values = { + "ids": [random.randint(1000000, 10000000) for _ in range(nb)], + "get_ids": [random.randint(1, 10000000) for _ in range(nb)], + "X": utils.generate_vectors(nq, MyUser.op_info["dimension"]) + } - MyUser.tasks = {Tasks.load: 1, Tasks.flush: 1} + # MyUser.tasks = {Tasks.query: 1, Tasks.flush: 1} MyUser.client = MilvusTask(host=host, port=port, collection_name=collection_name, connection_type=connection_type, m=m) - # MyUser.info = m.get_info(collection_name) - env = Environment(events=events, user_classes=[MyUser]) - - runner = env.create_local_runner() + if "load_shape" in run_params and run_params["load_shape"]: + test = StepLoadShape() + test.init(run_params["step_time"], run_params["step_load"], run_params["spawn_rate"], run_params["during_time"]) + env = Environment(events=events, user_classes=[MyUser], shape_class=test) + runner = env.create_local_runner() + env.runner.start_shape() + else: + env = Environment(events=events, user_classes=[MyUser]) + runner = env.create_local_runner() # setup logging # setup_logging("WARNING", "/dev/null") # greenlet_exception_logger(logger=logger) @@ -48,7 +90,9 @@ def locust_executor(host, port, collection_name, connection_type="single", run_p # env.create_web_ui("127.0.0.1", 8089) # gevent.spawn(stats_printer(env.stats), env, "test", full_history=True) # events.init.fire(environment=env, runner=runner) - clients_num = run_params["clients_num"] + clients_num = run_params["clients_num"] if "clients_num" in run_params else 0 + step_load = run_params["step_load"] if "step_load" in run_params else 0 + step_time = run_params["step_time"] if "step_time" in run_params else 0 spawn_rate = run_params["spawn_rate"] during_time = run_params["during_time"] runner.start(clients_num, spawn_rate=spawn_rate) diff --git a/tests/milvus_benchmark/milvus_benchmark/runners/search.py b/tests/milvus_benchmark/milvus_benchmark/runners/search.py index 7ecb72d6e1..03a0f1875f 100644 --- a/tests/milvus_benchmark/milvus_benchmark/runners/search.py +++ b/tests/milvus_benchmark/milvus_benchmark/runners/search.py @@ -69,6 +69,7 @@ class SearchRunner(BaseRunner): "params": search_param} # TODO: only 
update search_info case_metric = copy.deepcopy(self.metric) + case_metric.set_case_metric_type() case_metric.search = { "nq": nq, "topk": top_k, @@ -161,18 +162,20 @@ class InsertSearchRunner(BaseRunner): cases = list() case_metrics = list() self.init_metric(self.name, collection_info, index_info, None) + for search_param in search_params: if not filters: filters.append(None) for filter in filters: - filter_param = [] + # filter_param = [] + filter_query = [] if isinstance(filter, dict) and "range" in filter: filter_query.append(eval(filter["range"])) - filter_param.append(filter["range"]) + # filter_param.append(filter["range"]) if isinstance(filter, dict) and "term" in filter: filter_query.append(eval(filter["term"])) - filter_param.append(filter["term"]) - logger.info("filter param: %s" % json.dumps(filter_param)) + # filter_param.append(filter["term"]) + # logger.info("filter param: %s" % json.dumps(filter_param)) for nq in nqs: query_vectors = base_query_vectors[0:nq] for top_k in top_ks: @@ -183,11 +186,12 @@ class InsertSearchRunner(BaseRunner): "params": search_param} # TODO: only update search_info case_metric = copy.deepcopy(self.metric) + case_metric.set_case_metric_type() case_metric.search = { "nq": nq, "topk": top_k, "search_param": search_param, - "filter": filter_param + "filter": filter_query } vector_query = {"vector": {index_field_name: search_info}} case = { diff --git a/tests/milvus_benchmark/milvus_benchmark/suites/2_insert_data.yaml b/tests/milvus_benchmark/milvus_benchmark/suites/2_insert_data.yaml index 6bee750f78..c5ac542069 100644 --- a/tests/milvus_benchmark/milvus_benchmark/suites/2_insert_data.yaml +++ b/tests/milvus_benchmark/milvus_benchmark/suites/2_insert_data.yaml @@ -2,18 +2,7 @@ insert_performance: collections: - milvus: - db_config.primary_path: /test/milvus/db_data_011/cluster/sift_1m_128_l2 - cache_config.cpu_cache_capacity: 4GB - engine_config.use_blas_threshold: 1100 - engine_config.gpu_search_threshold: 1 - gpu_resource_config.enable: true - gpu_resource_config.cache_capacity: 4GB - gpu_resource_config.search_resources: - - gpu0 - - gpu1 - gpu_resource_config.build_index_resources: - - gpu0 - - gpu1 + db_config.primary_path: /test/milvus/db_data_2/cluster/sift_1m_128_l2 wal_enable: true collection_name: sift_1m_128_l2 # other_fields: int,float diff --git a/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_insert.yaml b/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_insert.yaml index 0f763d0c95..744e9606e6 100644 --- a/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_insert.yaml +++ b/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_insert.yaml @@ -3,27 +3,19 @@ locust_insert_performance: - milvus: db_config.primary_path: /test/milvus/db_data_011/insert_sift_1m_128_l2_2 - cache_config.cpu_cache_capacity: 8GB - cache_config.insert_buffer_size: 2GB - engine_config.use_blas_threshold: 1100 - engine_config.gpu_search_threshold: 1 - gpu_resource_config.enable: false - gpu_resource_config.cache_capacity: 4GB - gpu_resource_config.search_resources: - - gpu0 - gpu_resource_config.build_index_resources: - - gpu0 - wal_enable: true - collection_name: sift_1m_128_l2 + collection_name: local_1m_128_l2 ni_per: 50000 build_index: false index_type: ivf_sq8 index_param: nlist: 1024 task: + load_shape: false + step_time: 100 + step_load: 50 + spawn_rate: 2 connection_num: 1 clients_num: 100 - hatch_rate: 2 during_time: 600 types: - diff --git a/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_insert_flush.yaml 
b/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_insert_flush.yaml new file mode 100644 index 0000000000..d798e85950 --- /dev/null +++ b/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_insert_flush.yaml @@ -0,0 +1,25 @@ +locust_insert_performance: + collections: + - + milvus: + db_config.primary_path: /test/milvus/db_data_011/insert_sift_1m_128_l2_2 + collection_name: local_1m_128_l2 + ni_per: 50000 + build_index: false + index_type: ivf_sq8 + index_param: + nlist: 1024 + task: + load_shape: false + step_time: 100 + step_load: 50 + spawn_rate: 2 + connection_num: 1 + clients_num: 100 + during_time: 600 + types: + - + type: insert_flush + weight: 1 + params: + ni_per: 1 diff --git a/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_load_insert.yaml b/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_load_insert.yaml new file mode 100644 index 0000000000..98f8991e31 --- /dev/null +++ b/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_load_insert.yaml @@ -0,0 +1,25 @@ +locust_insert_performance: + collections: + - + milvus: + db_config.primary_path: /test/milvus/db_data_011/insert_sift_1m_128_l2_2 + collection_name: local_1m_128_l2 + ni_per: 50000 + build_index: false + index_type: ivf_sq8 + index_param: + nlist: 1024 + task: + load_shape: true + step_time: 100 + step_load: 50 + spawn_rate: 50 + connection_num: 1 + clients_num: 100 + during_time: 600 + types: + - + type: insert + weight: 1 + params: + ni_per: 1 \ No newline at end of file diff --git a/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_load_insert_flush.yaml b/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_load_insert_flush.yaml new file mode 100644 index 0000000000..c7314ea657 --- /dev/null +++ b/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_load_insert_flush.yaml @@ -0,0 +1,25 @@ +locust_insert_performance: + collections: + - + milvus: + db_config.primary_path: /test/milvus/db_data_011/insert_sift_1m_128_l2_2 + collection_name: local_1m_128_l2 + ni_per: 50000 + build_index: false + index_type: ivf_sq8 + index_param: + nlist: 1024 + task: + load_shape: true + step_time: 100 + step_load: 50 + spawn_rate: 50 + connection_num: 1 + clients_num: 100 + during_time: 600 + types: + - + type: insert_flush + weight: 1 + params: + ni_per: 1 \ No newline at end of file diff --git a/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_random.yaml b/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_random.yaml index 4618cbcc61..324214c4c5 100644 --- a/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_random.yaml +++ b/tests/milvus_benchmark/milvus_benchmark/suites/2_locust_random.yaml @@ -29,6 +29,19 @@ locust_random_performance: # GT: 1000000 search_param: nprobe: 16 + - + type: insert + weight: 20 + params: + ni_per: 1 + - + type: load + weight: 1 + - + type: get + weight: 2 + params: + ids_length: 10 connection_num: 1 clients_num: 20 hatch_rate: 2 diff --git a/tests/milvus_benchmark/milvus_benchmark/update.py b/tests/milvus_benchmark/milvus_benchmark/update.py index cc3a6e90f8..78f08e5dde 100644 --- a/tests/milvus_benchmark/milvus_benchmark/update.py +++ b/tests/milvus_benchmark/milvus_benchmark/update.py @@ -1,16 +1,11 @@ -import os import sys -import time import re import logging import traceback import argparse from yaml import full_load, dump - - -DEFUALT_DEPLOY_MODE = "single" -IDC_NAS_URL = "//172.16.70.249/test" -MINIO_HOST = "minio-test.qa.svc.cluster.local" +import config +import utils def parse_server_tag(server_tag): @@ -48,21 +43,18 @@ def 
update_values(src_values_file, deploy_params_file):
     except Exception as e:
         logging.error(str(e))
         raise Exception("File not found")
-    deploy_mode = deploy_params["deploy_mode"] if "deploy_mode" in deploy_params else DEFUALT_DEPLOY_MODE
+    deploy_mode = utils.get_deploy_mode(deploy_params)
+    logging.debug(deploy_mode)
     cluster = False
     values_dict["service"]["type"] = "ClusterIP"
-    if deploy_mode != DEFUALT_DEPLOY_MODE:
+    if deploy_mode != config.DEFAULT_DEPLOY_MODE:
         cluster = True
         values_dict["cluster"]["enabled"] = True
-    if "server" in deploy_params:
-        server = deploy_params["server"]
-        server_name = server["server_name"] if "server_name" in server else ""
-        server_tag = server["server_tag"] if "server_tag" in server else ""
-    else:
-        raise Exception("No server specified in {}".format(deploy_params_file))
+    server_tag = utils.get_server_tag(deploy_params)
+    logging.debug(server_tag)
     # TODO: update milvus config
     # # update values.yaml with the given host
-    node_config = None
+    # node_config = None
     perf_tolerations = [{
         "key": "node-role.kubernetes.io/benchmark",
         "operator": "Exists",
@@ -92,6 +84,20 @@ def update_values(src_values_file, deploy_params_file):
             #     "cpu": str(int(cpus) - 1) + ".0"
             }
         }
+    # use external minio/s3
+
+    # TODO: disable temp
+    # values_dict['minio']['enabled'] = False
+    values_dict['minio']['enabled'] = True
+    # values_dict["externalS3"]["enabled"] = True
+    values_dict["externalS3"]["enabled"] = False
+    values_dict["externalS3"]["host"] = config.MINIO_HOST
+    values_dict["externalS3"]["port"] = config.MINIO_PORT
+    values_dict["externalS3"]["accessKey"] = config.MINIO_ACCESS_KEY
+    values_dict["externalS3"]["secretKey"] = config.MINIO_SECRET_KEY
+    values_dict["externalS3"]["bucketName"] = config.MINIO_BUCKET_NAME
+    logging.debug(values_dict["externalS3"])
+
     if cluster is False:
         # TODO: support pod affinity for standalone mode
         if cpus:
@@ -109,13 +115,6 @@ def update_values(src_values_file, deploy_params_file):
                 values_dict['standalone']['tolerations'] = perf_tolerations
                 # values_dict['minio']['tolerations'] = perf_tolerations
                 values_dict['etcd']['tolerations'] = perf_tolerations
-                values_dict['minio']['enabled'] = False
-                # use external minio/s3
-                values_dict["externalS3"]["enabled"] = True
-                values_dict["externalS3"]["host"] = MINIO_HOST
-                values_dict["externalS3"]["accessKey"] = "minioadmin"
-                values_dict["externalS3"]["secretKey"] = "minioadmin"
-
     else:
         # TODO: mem limits on distributed mode
         # values_dict['pulsar']["broker"]["configData"].update({"maxMessageSize": "52428800", "PULSAR_MEM": BOOKKEEPER_PULSAR_MEM})
@@ -126,9 +125,9 @@ def update_values(src_values_file, deploy_params_file):
         #     values_dict['etcd']['nodeSelector'] = node_config
         #     # set limit/request cpus in resources
         #     values_dict['proxy']['resources'] = resources
-            values_dict['querynode']['resources'] = resources
-            values_dict['indexnode']['resources'] = resources
-            values_dict['datanode']['resources'] = resources
+            values_dict['queryNode']['resources'] = resources
+            values_dict['indexNode']['resources'] = resources
+            values_dict['dataNode']['resources'] = resources
             # values_dict['minio']['resources'] = resources
             # values_dict['pulsarStandalone']['resources'] = resources
         if mems:
@@ -143,9 +142,9 @@ def update_values(src_values_file, deploy_params_file):
             logging.debug("Add tolerations into cluster server")
             values_dict['proxy']['tolerations'] = perf_tolerations
-            values_dict['querynode']['tolerations'] = perf_tolerations
-            values_dict['indexnode']['tolerations'] = perf_tolerations
-            values_dict['datanode']['tolerations'] = perf_tolerations
+            values_dict['queryNode']['tolerations'] = perf_tolerations
+            values_dict['indexNode']['tolerations'] = perf_tolerations
+            values_dict['dataNode']['tolerations'] = perf_tolerations
             values_dict['etcd']['tolerations'] = perf_tolerations
             # values_dict['minio']['tolerations'] = perf_tolerations
             values_dict['pulsarStandalone']['tolerations'] = perf_tolerations
@@ -155,12 +154,19 @@ def update_values(src_values_file, deploy_params_file):
             # values_dict['pulsar']['broker']['tolerations'] = perf_tolerations
             # values_dict['pulsar']['bookkeeper']['tolerations'] = perf_tolerations
             # values_dict['pulsar']['zookeeper']['tolerations'] = perf_tolerations
-            values_dict['minio']['enabled'] = False
-            # use external minio/s3
-            values_dict["externalS3"]["enabled"] = True
-            values_dict["externalS3"]["host"] = MINIO_HOST
-            values_dict["externalS3"]["accessKey"] = "minioadmin"
-            values_dict["externalS3"]["secretKey"] = "minioadmin"
+        milvus_params = deploy_params["milvus"]
+        if "datanode" in milvus_params:
+            if "replicas" in milvus_params["datanode"]:
+                values_dict['dataNode']["replicas"] = milvus_params["datanode"]["replicas"]
+        if "querynode" in milvus_params:
+            if "replicas" in milvus_params["querynode"]:
+                values_dict['queryNode']["replicas"] = milvus_params["querynode"]["replicas"]
+        if "indexnode" in milvus_params:
+            if "replicas" in milvus_params["indexnode"]:
+                values_dict['indexNode']["replicas"] = milvus_params["indexnode"]["replicas"]
+        if "proxy" in milvus_params:
+            if "replicas" in milvus_params["proxy"]:
+                values_dict['proxy']["replicas"] = milvus_params["proxy"]["replicas"]

     # add extra volumes
     values_dict['extraVolumes'] = [{
         'name': 'test',
         'flexVolume': {
             'driver': "fstab/cifs",
             'fsType': "cifs",
             'secretRef': {
                 'name': "cifs-test-secret"
             },
             'options': {
-                'networkPath': IDC_NAS_URL,
+                'networkPath': config.IDC_NAS_URL,
                 'mountOptions': "vers=1.0"
             }
         }
@@ -186,7 +192,6 @@ def update_values(src_values_file, deploy_params_file):
         f.close()


-
 if __name__ == "__main__":
     arg_parser = argparse.ArgumentParser(
         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
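Editor's note: a hedged sketch of the deploy params section that drives the replicas mapping above, parsed the same way `update_values` does. All values are illustrative; note the lowercase yaml keys (`querynode`, `datanode`) map onto camelCase helm values keys (`queryNode`, `dataNode`).

```python
from yaml import full_load

# Illustrative deploy params for cluster mode; replica counts are made up.
DEPLOY_PARAMS = """
milvus:
  deploy_mode: cluster
  querynode:
    replicas: 2
  datanode:
    replicas: 2
server:
  server_tag: 8c16m256g
"""

deploy_params = full_load(DEPLOY_PARAMS)
milvus_params = deploy_params["milvus"]
# mirrors update_values: ends up in values_dict['queryNode']['replicas']
print(milvus_params["querynode"]["replicas"])  # -> 2
```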
diff --git a/tests/milvus_benchmark/milvus_benchmark/utils.py b/tests/milvus_benchmark/milvus_benchmark/utils.py
index c880ef381d..8b0f20a0c9 100644
--- a/tests/milvus_benchmark/milvus_benchmark/utils.py
+++ b/tests/milvus_benchmark/milvus_benchmark/utils.py
@@ -1,25 +1,13 @@
 # -*- coding: utf-8 -*-
-import os
-import sys
-import pdb
 import time
-import json
-import datetime
-import argparse
-import threading
 import logging
 import string
 import random
-# import multiprocessing
-import numpy as np
-# import psutil
-import sklearn.preprocessing
-# import docker
 from yaml import full_load, dump
 import yaml
 import tableprint as tp
 from pprint import pprint
-from pymilvus import DataType
+import config

 logger = logging.getLogger("milvus_benchmark.utils")

@@ -122,63 +110,25 @@ def print_table(headers, columns, data):
     tp.table(bodys, headers)


-def modify_config(k, v, type=None, file_path="conf/server_config.yaml", db_slave=None):
-    if not os.path.isfile(file_path):
-        raise Exception('File: %s not found' % file_path)
-    with open(file_path) as f:
-        config_dict = full_load(f)
-        f.close()
-    if config_dict:
-        if k.find("use_blas_threshold") != -1:
-            config_dict['engine_config']['use_blas_threshold'] = int(v)
-        elif k.find("use_gpu_threshold") != -1:
-            config_dict['engine_config']['gpu_search_threshold'] = int(v)
-        elif k.find("cpu_cache_capacity") != -1:
-            config_dict['cache_config']['cpu_cache_capacity'] = int(v)
-        elif k.find("enable_gpu") != -1:
-            config_dict['gpu_resource_config']['enable'] = v
-        elif k.find("gpu_cache_capacity") != -1:
-            config_dict['gpu_resource_config']['cache_capacity'] = int(v)
-        elif k.find("index_build_device") != -1:
-            config_dict['gpu_resource_config']['build_index_resources'] = v
-        elif k.find("search_resources") != -1:
-            config_dict['resource_config']['resources'] = v
-        # if db_slave:
-        #     config_dict['db_config']['db_slave_path'] = MULTI_DB_SLAVE_PATH
-        with open(file_path, 'w') as f:
-            dump(config_dict, f, default_flow_style=False)
-            f.close()
-    else:
-        raise Exception('Load file:%s error' % file_path)
+def get_deploy_mode(deploy_params):
+    deploy_mode = None
+    if deploy_params:
+        milvus_params = None
+        if "milvus" in deploy_params:
+            milvus_params = deploy_params["milvus"]
+        if not milvus_params:
+            deploy_mode = config.DEFAULT_DEPLOY_MODE
+        else:
+            deploy_mode = milvus_params.get("deploy_mode", config.DEFAULT_DEPLOY_MODE)
+        if deploy_mode not in [config.SINGLE_DEPLOY_MODE, config.CLUSTER_DEPLOY_MODE]:
+            raise Exception("Invalid deploy mode: %s" % deploy_mode)
+    return deploy_mode


-# update server_config.yaml
-def update_server_config(file_path, server_config):
-    if not os.path.isfile(file_path):
-        raise Exception('File: %s not found' % file_path)
-    with open(file_path) as f:
-        values_dict = full_load(f)
-        f.close()
-        for k, v in server_config.items():
-            if k.find("primary_path") != -1:
-                values_dict["db_config"]["primary_path"] = v
-            elif k.find("use_blas_threshold") != -1:
-                values_dict['engine_config']['use_blas_threshold'] = int(v)
-            elif k.find("gpu_search_threshold") != -1:
-                values_dict['engine_config']['gpu_search_threshold'] = int(v)
-            elif k.find("cpu_cache_capacity") != -1:
-                values_dict['cache_config']['cpu_cache_capacity'] = int(v)
-            elif k.find("cache_insert_data") != -1:
-                values_dict['cache_config']['cache_insert_data'] = v
-            elif k.find("enable") != -1:
-                values_dict['gpu_resource_config']['enable'] = v
-            elif k.find("gpu_cache_capacity") != -1:
-                values_dict['gpu_resource_config']['cache_capacity'] = int(v)
-            elif k.find("build_index_resources") != -1:
-                values_dict['gpu_resource_config']['build_index_resources'] = v
-            elif k.find("search_resources") != -1:
-                values_dict['gpu_resource_config']['search_resources'] = v
-        with open(file_path, 'w') as f:
-            dump(values_dict, f, default_flow_style=False)
-            f.close()
+def get_server_tag(deploy_params):
+    server_tag = ""
+    if deploy_params and "server" in deploy_params:
+        server = deploy_params["server"]
+        # server_name = server["server_name"] if "server_name" in server else ""
+        server_tag = server["server_tag"] if "server_tag" in server else ""
+    return server_tag
\ No newline at end of file
diff --git a/tests/milvus_benchmark/requirements.txt b/tests/milvus_benchmark/requirements.txt
index ab31d01f42..8fa17d7f7b 100644
--- a/tests/milvus_benchmark/requirements.txt
+++ b/tests/milvus_benchmark/requirements.txt
@@ -1,7 +1,7 @@
 # pymilvus==0.2.14
 # pymilvus-distributed>=0.0.61
 --extra-index-url https://test.pypi.org/simple/
-pymilvus==2.0.0rc2.dev4
+pymilvus==2.0.0rc2.dev12
 scipy==1.3.1
 scikit-learn==0.19.1
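Editor's note, on the step-load shape added in `locust_user.py` above: when a suite sets `load_shape: true`, the user count climbs by `step_load` every `step_time` seconds until `during_time` elapses. Below is a standalone sketch of the schedule that `StepLoadShape.tick()` produces; the default values mirror `2_locust_load_insert.yaml` and are illustrative only.

```python
import math

def tick(run_time, step_time=100, step_load=50, spawn_rate=50, time_limit=600):
    # standalone sketch of StepLoadShape.tick(); run_time is seconds elapsed
    if run_time > time_limit:
        return None  # returning None tells locust to stop the test
    current_step = math.floor(run_time / step_time) + 1
    return (current_step * step_load, spawn_rate)

assert tick(0) == (50, 50)      # first 100s: 50 users
assert tick(250) == (150, 50)   # third step: 150 users
assert tick(601) is None        # past during_time: stop
```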