diff --git a/tests/python_client/testcases/test_bulk_insert.py b/tests/python_client/testcases/test_bulk_insert.py
index a61d08b358..e39a3cc0aa 100644
--- a/tests/python_client/testcases/test_bulk_insert.py
+++ b/tests/python_client/testcases/test_bulk_insert.py
@@ -10,7 +10,6 @@ from base.client_base import TestcaseBase
 from common import common_func as cf
 from common import common_type as ct
 from common.common_params import DefaultVectorIndexParams, DefaultVectorSearchParams
-from common.milvus_sys import MilvusSys
 from common.common_type import CaseLabel, CheckTasks
 from utils.util_log import test_log as log
 from common.bulk_insert_data import (
@@ -60,7 +59,7 @@ class TestcaseBaseBulkInsert(TestcaseBase):
 
 class TestBulkInsert(TestcaseBaseBulkInsert):
 
-    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.tags(CaseLabel.L0)
     @pytest.mark.parametrize("is_row_based", [True])
     @pytest.mark.parametrize("auto_id", [True, False])
     @pytest.mark.parametrize("dim", [128])  # 8, 128
@@ -149,7 +148,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
         assert len(results) == len(ids)
 
-    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.tags(CaseLabel.L0)
     @pytest.mark.parametrize("is_row_based", [True])
     @pytest.mark.parametrize("dim", [128])  # 8
     @pytest.mark.parametrize("entities", [100])  # 100
@@ -239,7 +238,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         results, _ = self.collection_wrap.query(expr=expr)
         assert len(results) == len(ids)
 
-    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.tags(CaseLabel.L0)
     @pytest.mark.parametrize("is_row_based", [True])
     @pytest.mark.parametrize("auto_id", [True, False])
     @pytest.mark.parametrize("dim", [128])
@@ -334,7 +333,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
         assert len(results) == len(ids)
 
-    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.tags(CaseLabel.L1)
     @pytest.mark.parametrize("is_row_based", [True])
     @pytest.mark.parametrize("auto_id", [True, False])
     @pytest.mark.parametrize("dim", [128])
@@ -419,7 +418,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
         assert len(results) == len(ids)
 
-    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.tags(CaseLabel.L1)
     @pytest.mark.parametrize("insert_before_bulk_insert", [True, False])
     def test_insert_before_or_after_bulk_insert(self, insert_before_bulk_insert):
         """
@@ -520,7 +519,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         results, _ = self.collection_wrap.query(expr=expr)
         assert len(results) == len(ids)
 
-    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.tags(CaseLabel.L1)
     @pytest.mark.parametrize("create_index_before_bulk_insert", [True, False])
    @pytest.mark.parametrize("loaded_before_bulk_insert", [True, False])
     def test_load_before_or_after_bulk_insert(self, loaded_before_bulk_insert, create_index_before_bulk_insert):
@@ -609,7 +608,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         results, _ = self.collection_wrap.query(expr=expr)
         assert len(results) == len(ids)
 
-    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.tags(CaseLabel.L1)
     def test_index_load_before_bulk_insert(self):
         """
         Steps:
@@ -755,7 +754,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         res, _ = self.collection_wrap.query(expr=f"{df.json_field}['number'] == 1", output_fields=[df.json_field])
         assert len(res) == 1
 
-    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.tags(CaseLabel.L1)
     @pytest.mark.parametrize("auto_id", [True])
@pytest.mark.parametrize("dim", [128]) # 128 @pytest.mark.parametrize("entities", [2000]) @@ -942,7 +941,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): else: assert len(res) == 0 - @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [128]) # 128 @pytest.mark.parametrize("entities", [2000]) @@ -1112,7 +1111,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): res, _ = self.collection_wrap.query(expr=f"{df.json_field}['number'] == 1", output_fields=[df.json_field]) assert len(res) == 1 - @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [128]) # 128 @pytest.mark.parametrize("entities", [2000]) @@ -1302,7 +1301,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): else: assert len(res) == 0 - @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [128]) # 128 @pytest.mark.parametrize("entities", [2000]) @@ -1425,7 +1424,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert "address" in fields_from_search - @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [128]) # 128 @pytest.mark.parametrize("entities", [2000]) @@ -1546,7 +1545,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert "name" in fields_from_search assert "address" in fields_from_search - @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [128]) # 128 @pytest.mark.parametrize("entities", [1000]) # 1000 @@ -1698,7 +1697,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): else: assert 0 < len(res) < int(entities/len(json_value)) - @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("auto_id", [True]) @pytest.mark.parametrize("dim", [128]) @pytest.mark.parametrize("entities", [1000]) @@ -1865,7 +1864,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): ) - @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [128]) # 128 @pytest.mark.parametrize("entities", [1000]) # 1000 @@ -2000,7 +1999,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): res, _ = self.collection_wrap.query(expr=f"{df.json_field}['number'] == 1", output_fields=[df.json_field]) assert len(res) == int(entities / len(json_value)) - @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [128]) # 128 @pytest.mark.parametrize("entities", [1000]) # 1000 @@ -2146,7 +2145,151 @@ class TestBulkInsert(TestcaseBaseBulkInsert): else: assert 0 < len(res) < int(entities/len(json_value)) - @pytest.mark.tags(CaseLabel.L3) + + @pytest.mark.tags(CaseLabel.L2) + @pytest.mark.parametrize("auto_id", [True, False]) + @pytest.mark.parametrize("dim", [128]) # 128 + @pytest.mark.parametrize("entities", [1000]) # 1000 + @pytest.mark.parametrize("enable_dynamic_field", [True, False]) + @pytest.mark.parametrize("sparse_format", ["doc", "coo"]) + @pytest.mark.parametrize("nullable", [True, False]) + def test_with_all_field_csv_with_bulk_writer(self, auto_id, dim, entities, enable_dynamic_field, sparse_format, nullable): + """ + """ + self._connect() + fields = [ + 
+            cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
+            cf.gen_int64_field(name=df.int_field, nullable=nullable),
+            cf.gen_float_field(name=df.float_field, nullable=nullable),
+            cf.gen_string_field(name=df.string_field, nullable=nullable),
+            cf.gen_json_field(name=df.json_field, nullable=nullable),
+            cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64, nullable=nullable),
+            cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT, nullable=nullable),
+            cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100, nullable=nullable),
+            cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL, nullable=nullable),
+            cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
+            cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim),
+            cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=dim),
+            cf.gen_sparse_vec_field(name=df.sparse_vec_field),
+        ]
+        c_name = cf.gen_unique_str("bulk_insert")
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
+        self.collection_wrap.init_collection(c_name, schema=schema)
+        with RemoteBulkWriter(
+            schema=schema,
+            remote_path="bulk_data",
+            connect_param=RemoteBulkWriter.ConnectParam(
+                bucket_name=self.bucket_name,
+                endpoint=self.minio_endpoint,
+                access_key="minioadmin",
+                secret_key="minioadmin",
+            ),
+            file_type=BulkFileType.CSV,
+        ) as remote_writer:
+            json_value = [
+                {"key": "value"},
+                {"number": 1},
+                {"name": fake.name()},
+                {"address": fake.address()}
+            ]
+            for i in range(entities):
+                row = {
+                    df.pk_field: i,
+                    df.int_field: 1 if not (nullable and random.random() < 0.5) else None,
+                    df.float_field: 1.0 if not (nullable and random.random() < 0.5) else None,
+                    df.string_field: "string" if not (nullable and random.random() < 0.5) else None,
+                    df.json_field: json_value[i%len(json_value)] if not (nullable and random.random() < 0.5) else None,
+                    df.array_int_field: [1, 2] if not (nullable and random.random() < 0.5) else None,
+                    df.array_float_field: [1.0, 2.0] if not (nullable and random.random() < 0.5) else None,
+                    df.array_string_field: ["string1", "string2"] if not (nullable and random.random() < 0.5) else None,
+                    df.array_bool_field: [True, False] if not (nullable and random.random() < 0.5) else None,
+                    df.float_vec_field: cf.gen_vectors(1, dim)[0],
+                    df.fp16_vec_field: cf.gen_vectors(1, dim, vector_data_type=DataType.FLOAT16_VECTOR)[0],
+                    df.bf16_vec_field: cf.gen_vectors(1, dim, vector_data_type=DataType.BFLOAT16_VECTOR)[0],
+                    df.sparse_vec_field: cf.gen_sparse_vectors(1, dim, sparse_format=sparse_format)[0]
+                }
+                if auto_id:
+                    row.pop(df.pk_field)
+                if enable_dynamic_field:
+                    row["name"] = fake.name()
+                    row["address"] = fake.address()
+                remote_writer.append_row(row)
+            remote_writer.commit()
+            files = remote_writer.batch_files
+        # import data
+        for f in files:
+            t0 = time.time()
+            task_id, _ = self.utility_wrap.do_bulk_insert(
+                collection_name=c_name, files=f
+            )
+            logging.info(f"bulk insert task ids:{task_id}")
+            success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
+                task_ids=[task_id], timeout=300
+            )
+            tt = time.time() - t0
+            log.info(f"bulk insert state:{success} in {tt} with states:{states}")
+            assert success
+        num_entities = self.collection_wrap.num_entities
+        log.info(f" collection entities: {num_entities}")
+        assert num_entities == entities
+        # verify imported data is available for search
+        index_params = ct.default_index
+        float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name]
fields if "vec" in f.name and "float" in f.name] + sparse_vec_fields = [f.name for f in fields if "vec" in f.name and "sparse" in f.name] + for f in float_vec_fields: + self.collection_wrap.create_index( + field_name=f, index_params=index_params + ) + for f in sparse_vec_fields: + self.collection_wrap.create_index( + field_name=f, index_params=ct.default_sparse_inverted_index + ) + # add json path index for json field + json_path_index_params_double = {"index_type": "INVERTED", "params": {"json_cast_type": "double", + "json_path": f"{df.json_field}['number']"}} + self.collection_wrap.create_index(field_name=df.json_field, index_params=json_path_index_params_double) + json_path_index_params_varchar = {"index_type": "INVERTED", "params": {"json_cast_type": "VARCHAR", + "json_path": f"{df.json_field}['address']"}} + self.collection_wrap.create_index(field_name=df.json_field, index_params=json_path_index_params_varchar) + json_path_index_params_bool = {"index_type": "INVERTED", "params": {"json_cast_type": "Bool", + "json_path": f"{df.json_field}['name']"}} + self.collection_wrap.create_index(field_name=df.json_field, index_params=json_path_index_params_bool) + json_path_index_params_not_exist = {"index_type": "INVERTED", "params": {"json_cast_type": "Double", + "json_path": f"{df.json_field}['not_exist']"}} + self.collection_wrap.create_index(field_name=df.json_field, index_params=json_path_index_params_not_exist) + self.collection_wrap.load() + log.info(f"wait for load finished and be ready for search") + time.sleep(2) + # log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") + search_data = cf.gen_vectors(1, dim) + search_params = ct.default_search_params + res, _ = self.collection_wrap.search( + search_data, + df.float_vec_field, + param=search_params, + limit=1, + output_fields=["*"], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": 1}, + ) + for hit in res: + for r in hit: + fields_from_search = r.fields.keys() + for f in fields: + assert f.name in fields_from_search + if enable_dynamic_field: + assert "name" in fields_from_search + assert "address" in fields_from_search + res, _ = self.collection_wrap.query(expr=f"{df.json_field}['number'] == 1", output_fields=[df.json_field]) + if not nullable: + assert len(res) == int(entities/len(json_value)) + else: + assert 0 < len(res) < int(entities/len(json_value)) + + + + + @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("auto_id", [True]) @pytest.mark.parametrize("dim", [128]) # 128 @pytest.mark.parametrize("entities", [1000]) # 1000 @@ -2199,7 +2342,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): check_task=CheckTasks.err_res, check_items=error ) - @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [128]) @pytest.mark.parametrize("entities", [2000]) @@ -2273,7 +2416,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): check_items={"nq": 1, "limit": 1}, ) - @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("par_key_field", [df.int_field, df.string_field]) @@ -2384,7 +2527,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): check_items={"err_code": 2100, "err_msg": err_msg}, ) - @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [13]) 
@pytest.mark.parametrize("entities", [150]) @@ -2470,7 +2613,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert num_entities == entities * file_nums @pytest.mark.parametrize("pk_field", [df.pk_field, df.string_field]) - @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.tags(CaseLabel.L2) def test_bulk_import_random_pk_stats_task(self, pk_field): # connect -> prepare json data self._connect() @@ -2551,6 +2694,7 @@ class TestImportWithTextEmbedding(TestcaseBase): """ @pytest.mark.parametrize("file_format", ["json", "parquet", "numpy"]) + @pytest.mark.tags(CaseLabel.L1) def test_import_without_embedding(self, tei_endpoint, minio_host, file_format): """ target: test import data without embedding diff --git a/tests/scripts/ci_e2e_4am.sh b/tests/scripts/ci_e2e_4am.sh index 10c8f27dd1..0f60342e3e 100755 --- a/tests/scripts/ci_e2e_4am.sh +++ b/tests/scripts/ci_e2e_4am.sh @@ -65,19 +65,6 @@ if [ "${DISABLE_PIP_INSTALL:-}" = "false" ]; then install_pytest_requirements fi -cd ${ROOT}/tests/python_client -# Run bulk insert test -# if MILVUS_HELM_RELEASE_NAME contains "msop", then it is one pod mode, skip the bulk insert test -if [[ "${MILVUS_HELM_RELEASE_NAME}" != *"msop"* ]]; then - if [[ -n "${TEST_TIMEOUT:-}" ]]; then - - timeout "${TEST_TIMEOUT}" pytest testcases/test_bulk_insert.py --timeout=300 -v -x -n 6 --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME} \ - --html=${CI_LOG_PATH}/report_bulk_insert.html --self-contained-html - else - pytest testcases/test_bulk_insert.py --timeout=300 -v -x -n 6 --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME} \ - --html=${CI_LOG_PATH}/report_bulk_insert.html --self-contained-html - fi -fi # Run restful test v1 @@ -114,19 +101,10 @@ cd ${ROOT}/tests/python_client # Pytest is not able to have both --timeout & --workers, so do not add --timeout or --workers in the shell script if [[ -n "${TEST_TIMEOUT:-}" ]]; then - timeout "${TEST_TIMEOUT}" pytest --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} \ + timeout "${TEST_TIMEOUT}" pytest --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME}\ --html=${CI_LOG_PATH}/report.html --self-contained-html --dist loadgroup ${@:-} else - pytest --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} \ + pytest --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME}\ --html=${CI_LOG_PATH}/report.html --self-contained-html --dist loadgroup ${@:-} fi -# # Run concurrent test with 5 processes -# if [[ -n "${TEST_TIMEOUT:-}" ]]; then - -# timeout "${TEST_TIMEOUT}" pytest testcases/test_concurrent.py --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --count 5 -n 5 \ -# --html=${CI_LOG_PATH}/report_concurrent.html --self-contained-html -# else -# pytest testcases/test_concurrent.py --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --count 5 -n 5 \ -# --html=${CI_LOG_PATH}/report_concurrent.html --self-contained-html -# fi