diff --git a/tests/python_client/base/utility_wrapper.py b/tests/python_client/base/utility_wrapper.py
index e0c9665c1c..e5b38b0494 100644
--- a/tests/python_client/base/utility_wrapper.py
+++ b/tests/python_client/base/utility_wrapper.py
@@ -143,7 +143,7 @@ class ApiUtilityWrapper:
         pending_tasks = self.get_bulk_insert_pending_list()
         log.info(f"after waiting, there are {len(pending_tasks)} pending tasks")
         log.info(f"task state distribution: {tasks_state_distribution}")
-        log.debug(tasks_state)
+        log.info(tasks_state)
         if len(tasks_state_distribution["success"]) == len(task_ids):
             log.info(f"wait for bulk load tasks completed successfully, cost time: {end-start}")
             return True, tasks_state
diff --git a/tests/python_client/bulk_insert/conftest.py b/tests/python_client/bulk_insert/conftest.py
new file mode 100644
index 0000000000..cad760a0f8
--- /dev/null
+++ b/tests/python_client/bulk_insert/conftest.py
@@ -0,0 +1,25 @@
+import pytest
+
+
+def pytest_addoption(parser):
+    parser.addoption("--file_type", action="store", default="json", help="filetype")
+    parser.addoption("--create_index", action="store", default="create_index", help="whether creating index")
+    parser.addoption("--nb", action="store", default="", help="nb")
+    parser.addoption("--dim", action="store", default="2048", help="whether creating index")
+
+@pytest.fixture
+def file_type(request):
+    return request.config.getoption("--file_type")
+
+
+@pytest.fixture
+def create_index(request):
+    return request.config.getoption("--create_index")
+
+@pytest.fixture
+def nb(request):
+    return request.config.getoption("--nb")
+
+@pytest.fixture
+def dim(request):
+    return request.config.getoption("--dim")
\ No newline at end of file
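All four options are registered as plain strings, so every consumer has to cast them; the test below turns --create_index into a bool and --nb/--dim into ints. A minimal sketch of an alternative (not part of this change, same option names assumed) would do the cast once inside the fixture:

import pytest


@pytest.fixture
def dim(request):
    # the option value arrives as a string; cast here so tests receive an int
    return int(request.config.getoption("--dim"))

With either approach the suite is driven by the new flags, e.g. pytest test_bulk_insert_perf.py --file_type npy --nb 65000 --dim 2048.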
diff --git a/tests/python_client/bulk_insert/test_bulk_insert_perf.py b/tests/python_client/bulk_insert/test_bulk_insert_perf.py
index 29ffdebc18..4260ef5b8f 100644
--- a/tests/python_client/bulk_insert/test_bulk_insert_perf.py
+++ b/tests/python_client/bulk_insert/test_bulk_insert_perf.py
@@ -1,5 +1,8 @@
 import pytest
-import json
+import os
+import time
+import threading
+from pathlib import Path
 from time import sleep
 from minio import Minio
 from pymilvus import connections
@@ -58,6 +61,11 @@ class TestChaosBase:
 
 
 class TestChaos(TestChaosBase):
+
+    def teardown(self):
+        sleep(10)
+        log.info(f'Alive threads: {threading.enumerate()}')
+
     @pytest.fixture(scope="function", autouse=True)
     def connection(self, host, port, milvus_ns):
         connections.add_connection(default={"host": host, "port": port})
@@ -74,17 +82,16 @@ class TestChaos(TestChaosBase):
         self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
         self.deploy_by = get_milvus_deploy_tool(self.milvus_ns, self.milvus_sys)
 
-    @pytest.fixture(scope="function", autouse=True)
-    def init_health_checkers(self, collection_name=None):
+    def init_health_checkers(self, collection_name=None, create_index=True, dim=2048):
         log.info("init health checkers")
         c_name = collection_name if collection_name else cf.gen_unique_str("Checker_")
         checkers = {
-            Op.bulk_insert: BulkInsertChecker(collection_name=c_name, use_one_collection=True),
+            Op.bulk_insert: BulkInsertChecker(collection_name=c_name, use_one_collection=False,
+                                              dim=dim, create_index=create_index),
         }
         self.health_checkers = checkers
 
-    @pytest.fixture(scope="function", autouse=True)
-    def prepare_bulk_insert(self, nb=100000):
+    def prepare_bulk_insert(self, nb=3000, file_type="json",dim=2048):
         if Op.bulk_insert not in self.health_checkers:
             log.info("bulk_insert checker is not in health checkers, skip prepare bulk load")
             return
@@ -100,40 +107,53 @@ class TestChaos(TestChaosBase):
         minio_port = "9000"
         minio_endpoint = f"{minio_ip}:{minio_port}"
         bucket_name = ms.index_nodes[0]["infos"]["system_configurations"]["minio_bucket_name"]
-        schema = cf.gen_default_collection_schema()
-        data = cf.gen_default_list_data_for_bulk_insert(nb=nb)
-        fields_name = [field.name for field in schema.fields]
-        entities = []
-        for i in range(nb):
-            entity_value = [field_values[i] for field_values in data]
-            entity = dict(zip(fields_name, entity_value))
-            entities.append(entity)
-        data_dict = {"rows": entities}
-        data_source = "/tmp/ci_logs/bulk_insert_data_source.json"
-        file_name = "bulk_insert_data_source.json"
-        files = ["bulk_insert_data_source.json"]
-        # TODO: npy file type is not supported so far
-        log.info("generate bulk load file")
-        with open(data_source, "w") as f:
-            f.write(json.dumps(data_dict, indent=4))
+        schema = cf.gen_default_collection_schema(dim=dim)
+        data = cf.gen_default_list_data_for_bulk_insert(nb=nb, dim=dim)
+        data_dir = "/tmp/bulk_insert_data"
+        Path(data_dir).mkdir(parents=True, exist_ok=True)
+        files = []
+        if file_type == "json":
+            files = cf.gen_json_files_for_bulk_insert(data, schema, data_dir)
+        if file_type == "npy":
+            files = cf.gen_npy_files_for_bulk_insert(data, schema, data_dir)
         log.info("upload file to minio")
         client = Minio(minio_endpoint, access_key="minioadmin", secret_key="minioadmin", secure=False)
-        client.fput_object(bucket_name, file_name, data_source)
+        for file_name in files:
+            file_size = os.path.getsize(os.path.join(data_dir, file_name)) / 1024 / 1024
+            t0 = time.time()
+            client.fput_object(bucket_name, file_name, os.path.join(data_dir, file_name))
+            log.info(f"upload file {file_name} to minio, size: {file_size:.2f} MB, cost {time.time() - t0:.2f} s")
         self.health_checkers[Op.bulk_insert].update(schema=schema, files=files)
         log.info("prepare data for bulk load done")
 
     @pytest.mark.tags(CaseLabel.L3)
-    def test_bulk_insert(self):
+    def test_bulk_insert_perf(self, file_type, create_index, nb, dim):
         # start the monitor threads to check the milvus ops
         log.info("*********************Test Start**********************")
         log.info(connections.get_connection_addr('default'))
-        # c_name = cf.gen_unique_str("BulkInsertChecker_")
-        # self.init_health_checkers(collection_name=c_name)
+        log.info(f"file_type: {file_type}, create_index: {create_index}")
+        if create_index == "create_index":
+            create_index = True
+        else:
+            create_index = False
+        self.init_health_checkers(create_index=create_index, dim=int(dim))
+        if nb=="None":
+            nb = 3000
+            if file_type == "json":
+                nb = 13800
+            if file_type == "npy":
+                nb = 65000
+        else:
+            nb = int(nb)
+        self.prepare_bulk_insert(file_type=file_type, nb=nb, dim=int(dim))
         cc.start_monitor_threads(self.health_checkers)
         # wait 600s
-        sleep(constants.WAIT_PER_OP * 60)
+        sleep(constants.WAIT_PER_OP * 30)
         assert_statistic(self.health_checkers)
         assert_expectations()
+        for k, checker in self.health_checkers.items():
+            checker.check_result()
+            checker.terminate()
         log.info("*********************Test Completed**********************")
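The prepare step generates the source files locally and pushes them into the Milvus MinIO bucket before the checker can reference them by object name. A standalone sketch of that upload loop, useful for reproducing the prepare phase outside pytest; the endpoint, credentials, and bucket name here are assumptions, whereas the test discovers the real values from the running deployment:

import os
import time

from minio import Minio

data_dir = "/tmp/bulk_insert_data"
# assumed local-deployment values; the test reads the real ones from milvus_sys
client = Minio("127.0.0.1:9000", access_key="minioadmin", secret_key="minioadmin", secure=False)
for file_name in os.listdir(data_dir):
    size_mb = os.path.getsize(os.path.join(data_dir, file_name)) / 1024 / 1024
    t0 = time.time()
    client.fput_object("a-bucket", file_name, os.path.join(data_dir, file_name))  # bucket name assumed
    print(f"uploaded {file_name}: {size_mb:.2f} MB in {time.time() - t0:.2f} s")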
diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py
index 1c6d8415d8..4afbfb4fa8 100644
--- a/tests/python_client/chaos/checker.py
+++ b/tests/python_client/chaos/checker.py
@@ -88,7 +88,7 @@ class Checker:
        b. count operations and success rate
     """
 
-    def __init__(self, collection_name=None, shards_num=2):
+    def __init__(self, collection_name=None, shards_num=2, dim=ct.default_dim):
         self._succ = 0
         self._fail = 0
         self._keep_running = True
@@ -98,12 +98,12 @@ class Checker:
         c_name = collection_name if collection_name is not None else cf.gen_unique_str(
             'Checker_')
         self.c_wrap.init_collection(name=c_name,
-                                    schema=cf.gen_default_collection_schema(),
+                                    schema=cf.gen_default_collection_schema(dim=dim),
                                     shards_num=shards_num,
                                     timeout=timeout,
                                     # active_trace=True,
                                     enable_traceback=enable_traceback)
-        self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH),
+        self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH, dim=dim),
                            timeout=timeout,
                            enable_traceback=enable_traceback)
         self.initial_entities = self.c_wrap.num_entities  # do as a flush
@@ -125,6 +125,7 @@ class Checker:
         checker_name = self.__class__.__name__
         checkers_result = f"{checker_name}, succ_rate: {succ_rate:.2f}, total: {total:03d}, average_time: {average_time:.4f}, max_time: {max_time:.4f}, min_time: {min_time:.4f}"
         log.info(checkers_result)
+        log.info(f"{checker_name} rsp times: {self.rsp_times}")
         return checkers_result
 
     def terminate(self):
@@ -579,18 +580,19 @@ class LoadBalanceChecker(Checker):
 class BulkInsertChecker(Checker):
     """check bulk load operations in a dependent thread"""
 
-    def __init__(self, collection_name=None, files=[], use_one_collection=False):
+    def __init__(self, collection_name=None, files=[], use_one_collection=False, dim=ct.default_dim, create_index=True):
         if collection_name is None:
             collection_name = cf.gen_unique_str("BulkInsertChecker_")
-        super().__init__(collection_name=collection_name)
-        res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
-                                               constants.DEFAULT_INDEX_PARAM,
-                                               index_name=cf.gen_unique_str(
-                                                   'index_'),
-                                               timeout=timeout,
-                                               enable_traceback=enable_traceback,
-                                               check_task=CheckTasks.check_nothing)
-        self.c_wrap.load()
+        super().__init__(collection_name=collection_name, dim=dim)
+        self.create_index = create_index
+        if self.create_index:
+            res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
+                                                   constants.DEFAULT_INDEX_PARAM,
+                                                   index_name=cf.gen_unique_str(
+                                                       'index_'),
+                                                   timeout=timeout,
+                                                   enable_traceback=enable_traceback,
+                                                   check_task=CheckTasks.check_nothing)
         self.utility_wrap = ApiUtilityWrapper()
         self.schema = cf.gen_default_collection_schema()
         self.files = files
@@ -610,7 +612,7 @@ class BulkInsertChecker(Checker):
         log.info(f"bulk insert collection name: {self.c_name}")
         task_ids, result = self.utility_wrap.do_bulk_insert(collection_name=self.c_name,
                                                             files=self.files)
-        completed, result = self.utility_wrap.wait_for_bulk_insert_tasks_completed(task_ids=[task_ids], timeout=60)
+        completed, result = self.utility_wrap.wait_for_bulk_insert_tasks_completed(task_ids=[task_ids], timeout=120)
         return task_ids, completed
 
     @exception_handler()
@@ -622,6 +624,14 @@ class BulkInsertChecker(Checker):
         else:
             self.c_name = cf.gen_unique_str("BulkInsertChecker_")
             self.c_wrap.init_collection(name=self.c_name, schema=self.schema)
+        if self.create_index:
+            res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
+                                                   constants.DEFAULT_INDEX_PARAM,
+                                                   index_name=cf.gen_unique_str(
+                                                       'index_'),
+                                                   timeout=timeout,
+                                                   enable_traceback=enable_traceback,
+                                                   check_task=CheckTasks.check_nothing)
         # bulk insert data
         task_ids, completed = self.bulk_insert()
         if not completed:
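BulkInsertChecker drives the import through the test wrappers; stripped of the wrapper layer, one checker iteration boils down to the pymilvus utility calls sketched below. The host, port, collection name, and polling budget are assumptions, and the referenced file must already be in the Milvus MinIO bucket:

import time

from pymilvus import connections, utility

connections.connect(host="127.0.0.1", port="19530")
task_id = utility.do_bulk_insert(collection_name="BulkInsertChecker_demo",
                                 files=["bulk_insert_data_source.json"])
for _ in range(24):               # poll for up to ~120 s, matching the new wrapper timeout
    state = utility.get_bulk_insert_state(task_id)
    print(state)                  # shows the task's current state and imported row count
    time.sleep(5)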
diff --git a/tests/python_client/common/common_func.py b/tests/python_client/common/common_func.py
index 70fa79191b..950dc6175e 100644
--- a/tests/python_client/common/common_func.py
+++ b/tests/python_client/common/common_func.py
@@ -2,6 +2,8 @@ import os
 import random
 import math
 import string
+import json
+from functools import singledispatch
 import numpy as np
 import pandas as pd
 from sklearn import preprocessing
@@ -15,6 +17,17 @@ from customize.milvus_operator import MilvusOperator
 
 """" Methods of processing data """
 
+@singledispatch
+def to_serializable(val):
+    """Used by default."""
+    return str(val)
+
+
+@to_serializable.register(np.float32)
+def ts_float32(val):
+    """Used if *val* is an instance of numpy.float32."""
+    return np.float64(val)
+
 class ParamInfo:
     def __init__(self):
         self.param_host = ""
@@ -321,13 +334,41 @@ def gen_default_list_data(nb=ct.default_nb, dim=ct.default_dim):
 
 def gen_default_list_data_for_bulk_insert(nb=ct.default_nb, dim=ct.default_dim):
     int_values = [i for i in range(nb)]
-    float_values = [float(i) for i in range(nb)]
+    float_values = [np.float32(i) for i in range(nb)]
     string_values = [str(i) for i in range(nb)]
     float_vec_values = gen_vectors(nb, dim)
     data = [int_values, float_values, string_values, float_vec_values]
    return data
 
 
+def gen_json_files_for_bulk_insert(data, schema, data_dir):
+    nb = len(data[0])
+    fields_name = [field.name for field in schema.fields]
+    entities = []
+    for i in range(nb):
+        entity_value = [field_values[i] for field_values in data]
+        entity = dict(zip(fields_name, entity_value))
+        entities.append(entity)
+    data_dict = {"rows": entities}
+    file_name = "bulk_insert_data_source.json"
+    files = ["bulk_insert_data_source.json"]
+    data_source = os.path.join(data_dir, file_name)
+    with open(data_source, "w") as f:
+        f.write(json.dumps(data_dict, indent=4, default=to_serializable))
+    return files
+
+
+def gen_npy_files_for_bulk_insert(data, schema, data_dir):
+    fields_name = [field.name for field in schema.fields]
+    files = []
+    for field in fields_name:
+        files.append(f"{field}.npy")
+    for i, file in enumerate(files):
+        data_source = os.path.join(data_dir, file)
+        np.save(data_source, np.array(data[i]))
+    return files
+
+
 def gen_default_tuple_data(nb=ct.default_nb, dim=ct.default_dim):
     int_values = [i for i in range(nb)]
     float_values = [np.float32(i) for i in range(nb)]
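The to_serializable single-dispatch helper exists because json.dumps cannot encode numpy scalars, and gen_default_list_data_for_bulk_insert now emits np.float32 values; converting them to np.float64 (a subclass of Python's float) makes them JSON-encodable. A self-contained illustration of the same idea, with hypothetical field names:

import json
from functools import singledispatch

import numpy as np


@singledispatch
def to_serializable(val):
    return str(val)


@to_serializable.register(np.float32)
def _(val):
    # np.float64 subclasses Python's float, so json can encode it natively
    return np.float64(val)


row = {"pk": 1, "float_scalar": np.float32(0.5)}
# json.dumps(row) would raise TypeError: Object of type float32 is not JSON serializable
print(json.dumps(row, default=to_serializable))  # {"pk": 1, "float_scalar": 0.5}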