From 86274f70bd796e9ceac810f4e51ce77d385295e4 Mon Sep 17 00:00:00 2001 From: zhuwenxing Date: Thu, 6 Jun 2024 15:57:58 +0800 Subject: [PATCH] test: improve concurrency and reduce import test execution time (#33356) improve concurrency and reduce import test execution time --------- Signed-off-by: zhuwenxing --- .../testcases/test_bulk_insert.py | 515 ++---------------- tests/scripts/ci_e2e_4am.sh | 14 +- 2 files changed, 52 insertions(+), 477 deletions(-) diff --git a/tests/python_client/testcases/test_bulk_insert.py b/tests/python_client/testcases/test_bulk_insert.py index 65b22e0df3..2bc39b6c13 100644 --- a/tests/python_client/testcases/test_bulk_insert.py +++ b/tests/python_client/testcases/test_bulk_insert.py @@ -80,23 +80,25 @@ class TestBulkInsert(TestcaseBaseBulkInsert): 5. verify search successfully 6. verify query successfully """ - files = prepare_bulk_insert_json_files( + + self._connect() + c_name = cf.gen_unique_str("bulk_insert") + fields = [ + cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id), + cf.gen_float_vec_field(name=df.float_vec_field, dim=dim), + ] + data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)] + files = prepare_bulk_insert_new_json_files( minio_endpoint=self.minio_endpoint, bucket_name=self.bucket_name, is_row_based=is_row_based, rows=entities, dim=dim, auto_id=auto_id, - data_fields=default_vec_only_fields, + data_fields=data_fields, force=True, ) - self._connect() - c_name = cf.gen_unique_str("bulk_insert") - fields = [ - cf.gen_int64_field(name=df.pk_field, is_primary=True), - cf.gen_float_vec_field(name=df.vec_field, dim=dim), - ] - schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) + schema = cf.gen_collection_schema(fields=fields) self.collection_wrap.init_collection(c_name, schema=schema) # import data t0 = time.time() @@ -120,7 +122,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # verify imported data is available for search index_params = ct.default_index self.collection_wrap.create_index( - field_name=df.vec_field, index_params=index_params + field_name=df.float_vec_field, index_params=index_params ) time.sleep(2) self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300) @@ -139,7 +141,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): search_params = ct.default_search_params res, _ = self.collection_wrap.search( search_data, - df.vec_field, + df.float_vec_field, param=search_params, limit=topk, check_task=CheckTasks.check_search_results, @@ -167,7 +169,16 @@ class TestBulkInsert(TestcaseBaseBulkInsert): """ auto_id = False # no auto id for string_pk schema string_pk = True - files = prepare_bulk_insert_json_files( + + self._connect() + c_name = cf.gen_unique_str("bulk_insert") + fields = [ + cf.gen_string_field(name=df.string_field, is_primary=True, auto_id=auto_id), + cf.gen_float_vec_field(name=df.float_vec_field, dim=dim), + ] + schema = cf.gen_collection_schema(fields=fields) + data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)] + files = prepare_bulk_insert_new_json_files( minio_endpoint=self.minio_endpoint, bucket_name=self.bucket_name, is_row_based=is_row_based, @@ -175,15 +186,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert): dim=dim, auto_id=auto_id, str_pk=string_pk, - data_fields=default_vec_only_fields, + data_fields=data_fields, ) - self._connect() - c_name = cf.gen_unique_str("bulk_insert") - fields = [ - cf.gen_string_field(name=df.pk_field, is_primary=True), - cf.gen_float_vec_field(name=df.vec_field, dim=dim), - ] - schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) # import data t0 = time.time() @@ -205,7 +209,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # verify imported data is available for search index_params = ct.default_index self.collection_wrap.create_index( - field_name=df.vec_field, index_params=index_params + field_name=df.float_vec_field, index_params=index_params ) self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300) res, _ = self.utility_wrap.index_building_progress(c_name) @@ -224,7 +228,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): time.sleep(2) res, _ = self.collection_wrap.search( search_data, - df.vec_field, + df.float_vec_field, param=search_params, limit=topk, check_task=CheckTasks.check_search_results, @@ -232,7 +236,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): ) for hits in res: ids = hits.ids - expr = f"{df.pk_field} in {ids}" + expr = f"{df.string_field} in {ids}" expr = expr.replace("'", '"') results, _ = self.collection_wrap.query(expr=expr) assert len(results) == len(ids) @@ -433,21 +437,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert): bulk_insert_row = 500 direct_insert_row = 3000 dim = 128 - files = prepare_bulk_insert_json_files( - minio_endpoint=self.minio_endpoint, - bucket_name=self.bucket_name, - is_row_based=True, - rows=bulk_insert_row, - dim=dim, - data_fields=[df.pk_field, df.float_field, df.vec_field], - force=True, - ) self._connect() c_name = cf.gen_unique_str("bulk_insert") fields = [ cf.gen_int64_field(name=df.pk_field, is_primary=True), cf.gen_float_field(name=df.float_field), - cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_float_vec_field(name=df.float_vec_field, dim=dim), ] data = [ [i for i in range(direct_insert_row)], @@ -460,7 +455,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # build index index_params = ct.default_index self.collection_wrap.create_index( - field_name=df.vec_field, index_params=index_params + field_name=df.float_vec_field, index_params=index_params ) # load collection self.collection_wrap.load() @@ -468,6 +463,16 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # insert data self.collection_wrap.insert(data) self.collection_wrap.num_entities + + files = prepare_bulk_insert_new_json_files( + minio_endpoint=self.minio_endpoint, + bucket_name=self.bucket_name, + is_row_based=True, + rows=bulk_insert_row, + dim=dim, + data_fields=[df.pk_field, df.float_field, df.float_vec_field], + force=True, + ) # import data t0 = time.time() task_id, _ = self.utility_wrap.do_bulk_insert( @@ -503,7 +508,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): search_params = ct.default_search_params res, _ = self.collection_wrap.search( search_data, - df.vec_field, + df.float_vec_field, param=search_params, limit=topk, check_task=CheckTasks.check_search_results, @@ -738,91 +743,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert "name" in fields_from_search assert "address" in fields_from_search - @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("auto_id", [True, False]) - @pytest.mark.parametrize("dim", [128]) # 128 - @pytest.mark.parametrize("entities", [1000]) # 1000 - @pytest.mark.parametrize("enable_dynamic_field", [True, False]) - def test_with_all_field_json(self, auto_id, dim, entities, enable_dynamic_field): - """ - collection schema 1: [pk, int64, float64, string float_vector] - data file: vectors.npy and uid.npy, - Steps: - 1. create collection - 2. import data - 3. verify - """ - fields = [ - cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id), - cf.gen_int64_field(name=df.int_field), - cf.gen_float_field(name=df.float_field), - cf.gen_string_field(name=df.string_field), - cf.gen_json_field(name=df.json_field), - cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64), - cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT), - cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100), - cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL), - cf.gen_float_vec_field(name=df.vec_field, dim=dim), - ] - data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)] - files = prepare_bulk_insert_json_files( - minio_endpoint=self.minio_endpoint, - bucket_name=self.bucket_name, - rows=entities, - dim=dim, - data_fields=data_fields, - enable_dynamic_field=enable_dynamic_field, - force=True, - ) - self._connect() - c_name = cf.gen_unique_str("bulk_insert") - schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field) - self.collection_wrap.init_collection(c_name, schema=schema) - - # import data - t0 = time.time() - task_id, _ = self.utility_wrap.do_bulk_insert( - collection_name=c_name, files=files - ) - logging.info(f"bulk insert task ids:{task_id}") - success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( - task_ids=[task_id], timeout=300 - ) - tt = time.time() - t0 - log.info(f"bulk insert state:{success} in {tt} with states:{states}") - assert success - num_entities = self.collection_wrap.num_entities - log.info(f" collection entities: {num_entities}") - assert num_entities == entities - # verify imported data is available for search - index_params = ct.default_index - self.collection_wrap.create_index( - field_name=df.vec_field, index_params=index_params - ) - self.collection_wrap.load() - log.info(f"wait for load finished and be ready for search") - time.sleep(2) - # log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") - search_data = cf.gen_vectors(1, dim) - search_params = ct.default_search_params - res, _ = self.collection_wrap.search( - search_data, - df.vec_field, - param=search_params, - limit=1, - output_fields=["*"], - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, "limit": 1}, - ) - for hit in res: - for r in hit: - fields_from_search = r.fields.keys() - for f in fields: - assert f.name in fields_from_search - if enable_dynamic_field: - assert "name" in fields_from_search - assert "address" in fields_from_search - @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("auto_id", [True]) @pytest.mark.parametrize("dim", [128]) # 128 @@ -1791,362 +1711,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): num_entities += p.num_entities assert num_entities == entities * file_nums - @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("auto_id", [True, False]) - @pytest.mark.parametrize("partition_key_field", [df.int_field, df.string_field]) - @pytest.mark.skip("import data via csv is no longer supported") - def test_partition_key_on_csv_file(self, auto_id, partition_key_field): - """ - collection: auto_id, customized_id - collection schema: [pk, float_vector, int64, varchar, bool, float] - Step: - 1. create collection with partition key enabled - 2. import data - 3. verify the data entities equal the import data and distributed by values of partition key field - 4. load the collection - 5. verify search successfully - 6. verify query successfully - """ - dim = 12 - entities = 200 - files = prepare_bulk_insert_csv_files( - minio_endpoint=self.minio_endpoint, - bucket_name=self.bucket_name, - rows=entities, - dim=dim, - auto_id=auto_id, - data_fields=default_multi_fields, - force=True - ) - self._connect() - c_name = cf.gen_unique_str("bulk_partition_key") - fields = [ - cf.gen_int64_field(name=df.pk_field, is_primary=True), - cf.gen_float_vec_field(name=df.vec_field, dim=dim), - cf.gen_int64_field(name=df.int_field, is_partition_key=(partition_key_field == df.int_field)), - cf.gen_string_field(name=df.string_field, is_partition_key=(partition_key_field == df.string_field)), - cf.gen_bool_field(name=df.bool_field), - cf.gen_float_field(name=df.float_field), - ] - schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) - self.collection_wrap.init_collection(c_name, schema=schema, num_partitions=10) - assert len(self.collection_wrap.partitions) == 10 - # import data - t0 = time.time() - task_id, _ = self.utility_wrap.do_bulk_insert( - collection_name=c_name, - partition_name=None, - files=files, - ) - logging.info(f"bulk insert task id:{task_id}") - success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed( - task_ids=[task_id], timeout=300 - ) - tt = time.time() - t0 - log.info(f"bulk insert state:{success} in {tt}") - assert success - num_entities = self.collection_wrap.num_entities - log.info(f" collection entities: {num_entities}") - assert num_entities == entities - # verify imported data is available for search - index_params = ct.default_index - self.collection_wrap.create_index( - field_name=df.vec_field, index_params=index_params - ) - self.collection_wrap.load() - log.info(f"wait for load finished and be ready for search") - time.sleep(10) - log.info( - f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}" - ) - nq = 2 - topk = 2 - search_data = cf.gen_vectors(nq, dim) - search_params = ct.default_search_params - res, _ = self.collection_wrap.search( - search_data, - df.vec_field, - param=search_params, - limit=topk, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, "limit": topk}, - ) - for hits in res: - ids = hits.ids - results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}") - assert len(results) == len(ids) - # verify data was bulk inserted into different partitions - num_entities = 0 - empty_partition_num = 0 - for p in self.collection_wrap.partitions: - if p.num_entities == 0: - empty_partition_num += 1 - num_entities += p.num_entities - assert num_entities == entities - - @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("auto_id", [True, False]) - @pytest.mark.parametrize("dim", [128]) - @pytest.mark.parametrize("entities", [100]) - @pytest.mark.skip("import data via csv is no longer supported") - def test_float_vector_csv(self, auto_id, dim, entities): - """ - collection: auto_id, customized_id - collection schema: [pk, float_vector] - Steps: - 1. create collection - 2. import data - 3. verify the data entities equal the import data - 4. load the collection - 5. verify search successfully - 6. verify query successfully - """ - files = prepare_bulk_insert_csv_files( - minio_endpoint=self.minio_endpoint, - bucket_name=self.bucket_name, - rows=entities, - dim=dim, - auto_id=auto_id, - data_fields=default_vec_only_fields, - force=True - ) - self._connect() - c_name = cf.gen_unique_str("bulk_insert") - fields = [ - cf.gen_int64_field(name=df.pk_field, is_primary=True), - cf.gen_float_vec_field(name=df.vec_field, dim=dim), - ] - schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) - self.collection_wrap.init_collection(c_name, schema=schema) - # import data - t0 = time.time() - task_id, _ = self.utility_wrap.do_bulk_insert( - collection_name=c_name, - partition_name=None, - files=files - ) - logging.info(f"bulk insert task id:{task_id}") - success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed( - task_ids=[task_id], timeout=300 - ) - tt = time.time() - t0 - log.info(f"bulk insert state:{success} in {tt}") - num_entities = self.collection_wrap.num_entities - log.info(f"collection entities:{num_entities}") - assert num_entities == entities - - # verify imported data is available for search - index_params = ct.default_index - self.collection_wrap.create_index( - field_name=df.vec_field, index_params=index_params - ) - time.sleep(2) - self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300) - res, _ = self.utility_wrap.index_building_progress(c_name) - log.info(f"index building progress: {res}") - self.collection_wrap.load() - self.collection_wrap.load(_refresh=True) - log.info(f"wait for load finished and be ready for search") - time.sleep(2) - log.info( - f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}" - ) - nq = 2 - topk = 2 - search_data = cf.gen_vectors(nq, dim) - search_params = ct.default_search_params - res, _ = self.collection_wrap.search( - search_data, - df.vec_field, - param=search_params, - limit=topk, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, "limit": topk}, - ) - for hits in res: - ids = hits.ids - results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}") - assert len(results) == len(ids) - - @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("auto_id", [True, False]) - @pytest.mark.parametrize("dim", [128]) - @pytest.mark.parametrize("entities", [2000]) - @pytest.mark.skip("import data via csv is no longer supported") - def test_binary_vector_csv(self, auto_id, dim, entities): - """ - collection: auto_id, customized_id - collection schema: [pk, int64, binary_vector] - Step: - 1. create collection - 2. create index and load collection - 3. import data - 4. verify data entities - 5. load collection - 6. verify search successfully - 7. verify query successfully - """ - files = prepare_bulk_insert_csv_files( - minio_endpoint=self.minio_endpoint, - bucket_name=self.bucket_name, - rows=entities, - dim=dim, - auto_id=auto_id, - float_vector=False, - data_fields=default_vec_only_fields, - force=True - ) - self._connect() - c_name = cf.gen_unique_str("bulk_insert") - fields = [ - cf.gen_int64_field(name=df.pk_field, is_primary=True), - cf.gen_binary_vec_field(name=df.vec_field, dim=dim) - ] - schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) - self.collection_wrap.init_collection(c_name, schema=schema) - # build index before bulk insert - binary_index_params = { - "index_type": "BIN_IVF_FLAT", - "metric_type": "JACCARD", - "params": {"nlist": 64}, - } - self.collection_wrap.create_index( - field_name=df.vec_field, index_params=binary_index_params - ) - # load collection - self.collection_wrap.load() - # import data - t0 = time.time() - task_id, _ = self.utility_wrap.do_bulk_insert( - collection_name=c_name, - partition_name=None, - files=files - ) - logging.info(f"bulk insert task ids:{task_id}") - success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed( - task_ids=[task_id], timeout=300 - ) - tt = time.time() - t0 - log.info(f"bulk insert state:{success} in {tt}") - assert success - time.sleep(2) - self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300) - res, _ = self.utility_wrap.index_building_progress(c_name) - log.info(f"index building progress: {res}") - - # verify num entities - assert self.collection_wrap.num_entities == entities - # verify search and query - log.info(f"wait for load finished and be ready for search") - self.collection_wrap.load(_refresh=True) - time.sleep(2) - search_data = cf.gen_binary_vectors(1, dim)[1] - search_params = {"metric_type": "JACCARD", "params": {"nprobe": 10}} - res, _ = self.collection_wrap.search( - search_data, - df.vec_field, - param=search_params, - limit=1, - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, "limit": 1}, - ) - for hits in res: - ids = hits.ids - results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}") - assert len(results) == len(ids) - - @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("auto_id", [True, False]) - @pytest.mark.parametrize("dim", [128]) - @pytest.mark.parametrize("entities", [2000]) - @pytest.mark.skip("import data via csv is no longer supported") - def test_partition_csv(self, auto_id, dim, entities): - """ - collection schema: [pk, int64, string, float_vector] - Step: - 1. create collection and partition - 2. build index and load partition - 3. import data into the partition - 4. verify num entities - 5. verify index status - 6. verify search and query - """ - data_fields = [df.int_field, df.string_field, df.vec_field] - files = prepare_bulk_insert_csv_files( - minio_endpoint=self.minio_endpoint, - bucket_name=self.bucket_name, - rows=entities, - dim=dim, - auto_id=auto_id, - data_fields=data_fields, - force=True - ) - - self._connect() - c_name = cf.gen_unique_str("bulk_insert_partition") - fields = [ - cf.gen_int64_field(name=df.pk_field, is_primary=True), - cf.gen_int64_field(name=df.int_field), - cf.gen_string_field(name=df.string_field), - cf.gen_float_vec_field(name=df.vec_field, dim=dim) - ] - schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) - self.collection_wrap.init_collection(c_name, schema=schema) - # create a partition - p_name = cf.gen_unique_str("bulk_insert_partition") - m_partition, _ = self.collection_wrap.create_partition(partition_name=p_name) - # build index - index_params = ct.default_index - self.collection_wrap.create_index( - field_name=df.vec_field, index_params=index_params - ) - # load before bulk insert - self.collection_wrap.load(partition_names=[p_name]) - - t0 = time.time() - task_id, _ = self.utility_wrap.do_bulk_insert( - collection_name=c_name, - partition_name=p_name, - files = files - ) - logging.info(f"bulk insert task ids:{task_id}") - success, state = self.utility_wrap.wait_for_bulk_insert_tasks_completed( - task_ids=[task_id], timeout=300 - ) - tt = time.time() - t0 - log.info(f"bulk insert state:{success} in {tt}") - assert success - assert m_partition.num_entities == entities - assert self.collection_wrap.num_entities == entities - log.debug(state) - time.sleep(2) - self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300) - res, _ = self.utility_wrap.index_building_progress(c_name) - log.info(f"index building progress: {res}") - log.info(f"wait for load finished and be ready for search") - self.collection_wrap.load(_refresh=True) - time.sleep(2) - log.info( - f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}" - ) - - nq = 10 - topk = 5 - search_data = cf.gen_vectors(nq, dim) - search_params = ct.default_search_params - res, _ = self.collection_wrap.search( - search_data, - df.vec_field, - param=search_params, - limit=topk, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, "limit": topk}, - ) - for hits in res: - ids = hits.ids - results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}") - assert len(results) == len(ids) diff --git a/tests/scripts/ci_e2e_4am.sh b/tests/scripts/ci_e2e_4am.sh index 35b7d1e3e5..1d75b03833 100755 --- a/tests/scripts/ci_e2e_4am.sh +++ b/tests/scripts/ci_e2e_4am.sh @@ -74,10 +74,10 @@ cd ${ROOT}/tests/python_client if [[ "${MILVUS_HELM_RELEASE_NAME}" != *"msop"* ]]; then if [[ -n "${TEST_TIMEOUT:-}" ]]; then - timeout "${TEST_TIMEOUT}" pytest testcases/test_bulk_insert.py --timeout=300 --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME} \ + timeout "${TEST_TIMEOUT}" pytest testcases/test_bulk_insert.py --timeout=300 -n 6 --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME} \ --html=${CI_LOG_PATH}/report_bulk_insert.html --self-contained-html else - pytest testcases/test_bulk_insert.py --timeout=300 --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME} \ + pytest testcases/test_bulk_insert.py --timeout=300 -n 6 --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME} \ --html=${CI_LOG_PATH}/report_bulk_insert.html --self-contained-html fi fi @@ -133,3 +133,13 @@ else pytest --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} \ --html=${CI_LOG_PATH}/report.html --self-contained-html ${@:-} fi + +# # Run concurrent test with 5 processes +# if [[ -n "${TEST_TIMEOUT:-}" ]]; then + +# timeout "${TEST_TIMEOUT}" pytest testcases/test_concurrent.py --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --count 5 -n 5 \ +# --html=${CI_LOG_PATH}/report_concurrent.html --self-contained-html +# else +# pytest testcases/test_concurrent.py --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --count 5 -n 5 \ +# --html=${CI_LOG_PATH}/report_concurrent.html --self-contained-html +# fi