From 8cb5122d4e2817016f143800b799b69b61baa6b9 Mon Sep 17 00:00:00 2001 From: yanliang567 <82361606+yanliang567@users.noreply.github.com> Date: Fri, 28 Oct 2022 17:21:33 +0800 Subject: [PATCH] Remove row_based parameter in bulk_insert tests (#20171) Signed-off-by: yanliang567 Signed-off-by: yanliang567 --- tests/python_client/base/utility_wrapper.py | 4 +- .../bulk_insert/bulk_insert_data.py | 17 +- .../bulk_insert/test_bulk_insert.py | 298 +++++++++--------- .../test_bulk_insert_task_clean.py | 6 +- 4 files changed, 163 insertions(+), 162 deletions(-) diff --git a/tests/python_client/base/utility_wrapper.py b/tests/python_client/base/utility_wrapper.py index 372e3e3757..f4af17785e 100644 --- a/tests/python_client/base/utility_wrapper.py +++ b/tests/python_client/base/utility_wrapper.py @@ -18,13 +18,13 @@ class ApiUtilityWrapper: ut = utility role = None - def bulk_insert(self, collection_name, is_row_based=True, files="", partition_name=None, timeout=None, + def bulk_insert(self, collection_name, files="", partition_name=None, timeout=None, using="default", check_task=None, check_items=None, **kwargs): working_tasks = self.get_bulk_insert_working_list() log.info(f"before bulk load, there are {len(working_tasks)} working tasks") log.info(f"files to load: {files}") func_name = sys._getframe().f_code.co_name - res, is_succ = api_request([self.ut.bulk_insert, collection_name, is_row_based, + res, is_succ = api_request([self.ut.bulk_insert, collection_name, files, partition_name, timeout, using], **kwargs) check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ, collection_name=collection_name, using=using).run() diff --git a/tests/python_client/bulk_insert/bulk_insert_data.py b/tests/python_client/bulk_insert/bulk_insert_data.py index 8ff1682b28..f0e9cd7f22 100644 --- a/tests/python_client/bulk_insert/bulk_insert_data.py +++ b/tests/python_client/bulk_insert/bulk_insert_data.py @@ -1,3 +1,4 @@ +import copy import time import os import pathlib @@ -340,7 +341,7 @@ def gen_json_files(is_row_based, rows, dim, auto_id, str_pk, files = [] start_uid = 0 # make sure pk field exists when not auto_id - if not auto_id and DataField.pk_field not in data_fields: + if (not auto_id) and (DataField.pk_field not in data_fields): data_fields.append(DataField.pk_field) for i in range(file_nums): file_name = gen_file_name(is_row_based=is_row_based, rows=rows, dim=dim, @@ -397,10 +398,11 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type="" return files -def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket", is_row_based=True, rows=100, dim=128, - auto_id=True, str_pk=False, float_vector=True, - data_fields=[], file_nums=1, multi_folder=False, - file_type=".json", err_type="", force=False, **kwargs): +def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket", + is_row_based=True, rows=100, dim=128, + auto_id=True, str_pk=False, float_vector=True, + data_fields=[], file_nums=1, multi_folder=False, + file_type=".json", err_type="", force=False, **kwargs): """ Generate files based on the params in json format and copy them to minio @@ -456,9 +458,12 @@ def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket :return list file names list """ + data_fields_c = copy.deepcopy(data_fields) + print(f"data_fields: {data_fields}") + print(f"data_fields_c: {data_fields_c}") files = gen_json_files(is_row_based=is_row_based, rows=rows, dim=dim, auto_id=auto_id, str_pk=str_pk, float_vector=float_vector, - data_fields=data_fields, file_nums=file_nums, multi_folder=multi_folder, + data_fields=data_fields_c, file_nums=file_nums, multi_folder=multi_folder, file_type=file_type, err_type=err_type, force=force, **kwargs) copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force) diff --git a/tests/python_client/bulk_insert/test_bulk_insert.py b/tests/python_client/bulk_insert/test_bulk_insert.py index 3ebca43203..df21fb1819 100644 --- a/tests/python_client/bulk_insert/test_bulk_insert.py +++ b/tests/python_client/bulk_insert/test_bulk_insert.py @@ -73,7 +73,7 @@ class TestcaseBaseBulkInsert(TestcaseBase): class TestBulkInsert(TestcaseBaseBulkInsert): @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [8]) # 8, 128 @pytest.mark.parametrize("entities", [100]) # 100, 1000 @@ -112,7 +112,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert): task_ids, _ = self.utility_wrap.bulk_insert( collection_name=c_name, partition_name=None, - is_row_based=is_row_based, files=files, ) logging.info(f"bulk insert task ids:{task_ids}") @@ -156,7 +155,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert len(results) == len(ids) @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("dim", [8]) # 8 @pytest.mark.parametrize("entities", [100]) # 100 def test_str_pk_float_vector_only(self, is_row_based, dim, entities): @@ -193,7 +192,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # import data t0 = time.time() task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") completed, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -239,7 +238,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert len(results) == len(ids) @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [4]) @pytest.mark.parametrize("entities", [3000]) @@ -291,7 +290,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert): task_ids, _ = self.utility_wrap.bulk_insert( collection_name=c_name, partition_name=p_name, - is_row_based=is_row_based, files=files, ) logging.info(f"bulk insert task ids:{task_ids}") @@ -332,7 +330,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert len(results) == len(ids) @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [16]) @pytest.mark.parametrize("entities", [2000]) @@ -381,9 +379,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert): self.collection_wrap.load() # import data t0 = time.time() - task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files - ) + task_ids, _ = self.utility_wrap.bulk_insert(collection_name=c_name, + files=files) logging.info(f"bulk insert task ids:{task_ids}") success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed( task_ids=task_ids, timeout=90 @@ -416,7 +413,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert len(results) == len(ids) @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize( "fields_num_in_file", ["equal", "more", "less"] @@ -456,6 +453,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): cf.gen_int32_field(name=df.int_field), cf.gen_string_field(name=df.string_field), cf.gen_bool_field(name=df.bool_field), + cf.gen_float_field(name=df.float_field), ] if fields_num_in_file == "more": fields.pop() @@ -474,7 +472,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # import data t0 = time.time() task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -482,12 +480,13 @@ class TestBulkInsert(TestcaseBaseBulkInsert): ) tt = time.time() - t0 log.info(f"bulk insert state:{success} in {tt}") - if fields_num_in_file == "less": + if fields_num_in_file in ["less", "more"]: assert not success if is_row_based: - failed_reason = ( - f"JSON row validator: field {additional_field} missed at the row 0" - ) + if fields_num_in_file == "less": + failed_reason = f"field '{additional_field}' missed at the row 0" + else: + failed_reason = f"field '{df.float_field}' is not defined in collection schema" else: failed_reason = "is not equal to other fields" for state in states.values(): @@ -546,12 +545,13 @@ class TestBulkInsert(TestcaseBaseBulkInsert): """ bulk_insert_row = 500 direct_insert_row = 3000 + dim = 16 files = prepare_bulk_insert_json_files( minio_endpoint=self.minio_endpoint, bucket_name=self.bucket_name, is_row_based=True, rows=bulk_insert_row, - dim=16, + dim=dim, data_fields=[df.pk_field, df.float_field, df.vec_field], force=True, ) @@ -560,12 +560,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert): fields = [ cf.gen_int64_field(name=df.pk_field, is_primary=True), cf.gen_float_field(name=df.float_field), - cf.gen_float_vec_field(name=df.vec_field, dim=16), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), ] data = [ [i for i in range(direct_insert_row)], [np.float32(i) for i in range(direct_insert_row)], - cf.gen_vectors(direct_insert_row, 16), + cf.gen_vectors(direct_insert_row, dim=dim), ] schema = cf.gen_collection_schema(fields=fields) @@ -584,7 +584,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # import data t0 = time.time() task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=True, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -610,7 +610,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): time.sleep(5) nq = 3 topk = 10 - search_data = cf.gen_vectors(nq, 16) + search_data = cf.gen_vectors(nq, dim=dim) search_params = ct.default_search_params res, _ = self.collection_wrap.search( search_data, @@ -651,7 +651,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): rows=500, dim=16, auto_id=True, - data_fields=[df.pk_field, df.vec_field], + data_fields=[df.vec_field], force=True, ) self._connect() @@ -673,7 +673,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # import data t0 = time.time() task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=True, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -716,7 +716,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert len(results) == len(ids) @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize( "fields_num_in_file", ["equal", "more", "less"] ) # "equal", "more", "less" @@ -757,6 +757,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): cf.gen_int32_field(name=df.int_field), cf.gen_string_field(name=df.string_field), cf.gen_bool_field(name=df.bool_field), + cf.gen_float_field(name=df.float_field), ] if fields_num_in_file == "more": fields.pop() @@ -774,7 +775,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # import data t0 = time.time() task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -782,12 +783,13 @@ class TestBulkInsert(TestcaseBaseBulkInsert): ) tt = time.time() - t0 log.info(f"bulk insert state:{success} in {tt}") - if fields_num_in_file == "less": + if fields_num_in_file in ["less", "more"]: assert not success # TODO: check error msg if is_row_based: - failed_reason = ( - f"JSON row validator: field {additional_field} missed at the row 0" - ) + if fields_num_in_file == "less": + failed_reason = f"field '{additional_field}' missed at the row 0" + else: + failed_reason = f"field '{df.float_field}' is not defined in collection schema" else: failed_reason = "is not equal to other fields" for state in states.values(): @@ -821,7 +823,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert len(results) == len(ids) @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [pytest.param(True, marks=pytest.mark.xfail(reason="issue: https://github.com/milvus-io/milvus/issues/19499")), False]) # True, False + @pytest.mark.parametrize("is_row_based", [pytest.param(True, marks=pytest.mark.xfail(reason="issue: https://github.com/milvus-io/milvus/issues/19499"))]) # True, False @pytest.mark.parametrize("auto_id", [True, False]) # True, False @pytest.mark.parametrize("dim", [16]) # 16 @pytest.mark.parametrize("entities", [100]) # 3000 @@ -863,6 +865,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): cf.gen_int32_field(name=df.int_field), cf.gen_string_field(name=df.string_field), cf.gen_bool_field(name=df.bool_field), + cf.gen_float_field(name=df.float_field) ] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) @@ -876,7 +879,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # import data t0 = time.time() task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -922,11 +925,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert len(results) == len(ids) @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("multi_fields", [True, False]) @pytest.mark.parametrize("dim", [15]) @pytest.mark.parametrize("entities", [200]) + @pytest.mark.skip(reason="stop support for numpy files") def test_float_vector_from_numpy_file( self, is_row_based, auto_id, multi_fields, dim, entities ): @@ -1006,7 +1010,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # import data t0 = time.time() task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -1060,7 +1064,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert len(results) == len(ids) @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("dim", [8]) @pytest.mark.parametrize("entities", [10]) def test_data_type_float_on_int_pk(self, is_row_based, dim, entities): @@ -1094,12 +1098,13 @@ class TestBulkInsert(TestcaseBaseBulkInsert): cf.gen_int32_field(name=df.int_field), cf.gen_string_field(name=df.string_field), cf.gen_bool_field(name=df.bool_field), + cf.gen_float_field(name=df.float_field), ] schema = cf.gen_collection_schema(fields=fields, auto_id=False) self.collection_wrap.init_collection(c_name, schema=schema) # import data task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -1122,7 +1127,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert [{df.pk_field: 3}] == res @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [8]) @pytest.mark.parametrize("entities", [10]) @@ -1163,7 +1168,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): self.collection_wrap.init_collection(c_name, schema=schema) # import data task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -1200,6 +1205,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): @pytest.mark.parametrize("auto_id", [True]) @pytest.mark.parametrize("dim", [128]) # 128 @pytest.mark.parametrize("entities", [1000]) # 1000 + @pytest.mark.skip(reason="stop support for numpy files") def test_with_all_field_numpy(self, auto_id, dim, entities): """ collection schema 1: [pk, int64, float64, string float_vector] @@ -1233,7 +1239,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # import data t0 = time.time() task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=False, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -1281,8 +1287,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert): 2. import data 3. verify that import numpy files in a loop """ - is_row_based = False # numpy files supports only column based - self._connect() c_name = cf.gen_unique_str("bulk_insert") fields = [ @@ -1314,7 +1318,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): force=True, ) task_id, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files + collection_name=c_name, files=files ) task_ids.append(task_id[0]) success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -1348,8 +1352,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) - def test_non_existing_file(self, is_row_based): + def test_non_existing_file(self): """ collection: either auto_id or not collection schema: not existing file(s) @@ -1373,7 +1376,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): task_ids, _ = self.utility_wrap.bulk_insert( collection_name=c_name, partition_name=None, - is_row_based=is_row_based, files=files, ) logging.info(f"bulk insert task ids:{task_ids}") @@ -1381,13 +1383,13 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): task_ids=task_ids, timeout=90 ) assert not success - failed_reason = f"failed to get file size of {files[0]}" + failed_reason = f"failed to get file size of '{files[0]}'" for state in states.values(): assert state.state_name in ["Failed", "Failed and cleaned"] assert failed_reason in state.infos.get("failed_reason", "") @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) def test_empty_json_file(self, is_row_based, auto_id): """ @@ -1423,7 +1425,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): task_ids, _ = self.utility_wrap.bulk_insert( collection_name=c_name, partition_name=None, - is_row_based=is_row_based, files=files, ) logging.info(f"bulk insert task ids:{task_ids}") @@ -1431,13 +1432,13 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): task_ids=task_ids, timeout=90 ) assert not success - failed_reason = "JSON parse: row count is 0" + failed_reason = "JSON parser: row count is 0" for state in states.values(): assert state.state_name in ["Failed", "Failed and cleaned"] assert failed_reason in state.infos.get("failed_reason", "") @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [8]) # 8 @pytest.mark.parametrize("entities", [100]) # 100 @@ -1486,7 +1487,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): task_ids, _ = self.utility_wrap.bulk_insert( collection_name=c_name, partition_name=None, - is_row_based=is_row_based, files=files, ) logging.info(f"bulk insert task ids:{task_ids}") @@ -1502,7 +1502,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): assert failed_reason in state.infos.get("failed_reason", "") @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [8]) @pytest.mark.parametrize("entities", [100]) @@ -1539,7 +1539,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): task_ids, _ = self.utility_wrap.bulk_insert( collection_name=c_name, partition_name=None, - is_row_based=is_row_based, files=files, ) logging.info(f"bulk insert task ids:{task_ids}") @@ -1549,7 +1548,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): assert not success if is_row_based: value = df.vec_field # if auto_id else df.pk_field - failed_reason = f"JSON parse: invalid row-based JSON format, the key {value} is not found" + failed_reason = f"JSON parser: invalid row-based JSON format, the key {value} is not found" else: failed_reason = "JSON parse: row count is 0" for state in states.values(): @@ -1557,7 +1556,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): assert failed_reason in state.infos.get("failed_reason", "") @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [8]) # 8 @pytest.mark.parametrize("entities", [100]) # 100 @@ -1594,7 +1593,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): task_ids, _ = self.utility_wrap.bulk_insert( collection_name=c_name, partition_name=None, - is_row_based=is_row_based, files=files, ) logging.info(f"bulk insert task ids:{task_ids}") @@ -1608,7 +1606,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): else: assert not success if is_row_based: - failed_reason = f"field {dismatch_pk_field} missed at the row 0" + failed_reason = f"the field '{df.pk_field}' is not defined in collection schema" else: failed_reason = f"field {dismatch_pk_field} row count 0 is not equal to other fields row count" for state in states.values(): @@ -1616,7 +1614,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): assert failed_reason in state.infos.get("failed_reason", "") @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [8]) # 8 @pytest.mark.parametrize("entities", [100]) # 100 @@ -1652,7 +1650,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): task_ids, _ = self.utility_wrap.bulk_insert( collection_name=c_name, partition_name=None, - is_row_based=is_row_based, files=files, ) logging.info(f"bulk insert task ids:{task_ids}") @@ -1664,7 +1661,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): assert not success if is_row_based: - failed_reason = f"field {dismatch_vec_field} missed at the row 0" + failed_reason = f"the field '{df.vec_field}' is not defined in collection schema" else: if auto_id: failed_reason = f"JSON column consumer: row count is 0" @@ -1675,7 +1672,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): assert failed_reason in state.infos.get("failed_reason", "") @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [4]) @pytest.mark.parametrize("entities", [200]) @@ -1712,7 +1709,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): task_ids, _ = self.utility_wrap.bulk_insert( collection_name=c_name, partition_name="", - is_row_based=is_row_based, files=files, ) logging.info(f"bulk insert task ids:{task_ids}") @@ -1723,7 +1719,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): log.info(f"bulk insert state:{success} in {tt}") assert not success if is_row_based: - failed_reason = f"field {dismatch_scalar_field} missed at the row 0" + failed_reason = f"field '{df.int_field}' is not defined in collection schema" else: failed_reason = f"field {dismatch_scalar_field} row count 0 is not equal to other fields row count" for state in states.values(): @@ -1731,7 +1727,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): assert failed_reason in state.infos.get("failed_reason", "") @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [4]) @pytest.mark.parametrize("entities", [200]) @@ -1763,7 +1759,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): self.collection_wrap.init_collection(c_name, schema=schema) # import data task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -1771,13 +1767,13 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): ) log.info(f"bulk insert state:{success}") assert not success - failed_reason = f"array size {dim} doesn't equal to vector dimension {wrong_dim} of field vectors at the row " + failed_reason = f"array size {dim} doesn't equal to vector dimension {wrong_dim} of field vectors" for state in states.values(): assert state.state_name in ["Failed", "Failed and cleaned"] assert failed_reason in state.infos.get("failed_reason", "") @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("dim", [4]) @pytest.mark.parametrize("entities", [200]) def test_non_existing_collection(self, is_row_based, dim, entities): @@ -1801,14 +1797,13 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): err_msg = f"can't find collection: {c_name}" task_ids, _ = self.utility_wrap.bulk_insert( collection_name=c_name, - is_row_based=is_row_based, files=files, check_task=CheckTasks.err_res, check_items={"err_code": 1, "err_msg": err_msg}, ) @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("dim", [4]) @pytest.mark.parametrize("entities", [200]) @pytest.mark.xfail(reason="issue: https://github.com/milvus-io/milvus/issues/19553") @@ -1842,14 +1837,13 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): task_ids, _ = self.utility_wrap.bulk_insert( collection_name=c_name, partition_name=p_name, - is_row_based=is_row_based, files=files, check_task=CheckTasks.err_res, check_items={"err_code": 11, "err_msg": err_msg}, ) @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [4]) @pytest.mark.parametrize("entities", [1000]) @@ -1886,7 +1880,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): self.collection_wrap.init_collection(c_name, schema=schema) # import data task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -1895,7 +1889,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): log.info(f"bulk insert state:{success}") assert not success failed_reason = ( - f"doesn't equal to vector dimension {dim} of field vectors at the row" + f"doesn't equal to vector dimension {dim} of field vectors" ) for state in states.values(): assert state.state_name in ["Failed", "Failed and cleaned"] @@ -1903,7 +1897,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): assert self.collection_wrap.num_entities == 0 @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [16]) @pytest.mark.parametrize("entities", [300]) @@ -1952,6 +1946,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): cf.gen_int32_field(name=df.int_field), cf.gen_string_field(name=df.string_field), cf.gen_bool_field(name=df.bool_field), + cf.gen_float_field(name=df.float_field), ] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) @@ -1959,7 +1954,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): # import data t0 = time.time() task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -2011,7 +2006,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): # import data t0 = time.time() task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=False, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -2021,7 +2016,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): log.info(f"bulk insert state:{success} in {tt}") assert not success - failed_reason = f"illegal row width {dim}" + failed_reason = f"illegal dimension {dim} of numpy file" for state in states.values(): assert state.state_name in ["Failed", "Failed and cleaned"] assert failed_reason in state.infos.get("failed_reason", "") @@ -2031,7 +2026,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): @pytest.mark.parametrize("auto_id", [False]) @pytest.mark.parametrize("dim", [15]) @pytest.mark.parametrize("entities", [100]) - @pytest.mark.xfail(reason="https://github.com/milvus-io/milvus/issues/18992") def test_wrong_field_name_in_numpy(self, auto_id, dim, entities): """ collection schema 1: [pk, float_vector] @@ -2068,7 +2062,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): # import data t0 = time.time() task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=False, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -2078,7 +2072,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): log.info(f"bulk insert state:{success} in {tt}") assert not success - failed_reason = f"Numpy parse: the field {df.vec_field} doesn't exist" + failed_reason = f"file '{df.vec_field}.npy' has no corresponding field in collection" for state in states.values(): assert state.state_name in ["Failed", "Failed and cleaned"] assert failed_reason in state.infos.get("failed_reason", "") @@ -2119,7 +2113,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): # import data t0 = time.time() task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=False, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -2135,7 +2129,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): assert self.collection_wrap.num_entities == 0 @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("dim", [8]) @pytest.mark.parametrize("entities", [10]) def test_data_type_string_on_int_pk(self, is_row_based, dim, entities): @@ -2168,12 +2162,13 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): cf.gen_int32_field(name=df.int_field), cf.gen_string_field(name=df.string_field), cf.gen_bool_field(name=df.bool_field), + cf.gen_float_field(name=df.float_field), ] schema = cf.gen_collection_schema(fields=fields, auto_id=False) self.collection_wrap.init_collection(c_name, schema=schema) # import data task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -2188,7 +2183,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): assert self.collection_wrap.num_entities == 0 @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [8]) @pytest.mark.parametrize("entities", [10]) @@ -2211,7 +2206,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): auto_id=False, data_fields=default_multi_fields, err_type=DataErrorType.typo_on_bool, - scalars=default_multi_fields, + force=True, ) self._connect() c_name = cf.gen_unique_str("bulk_insert") @@ -2228,7 +2223,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): self.collection_wrap.init_collection(c_name, schema=schema) # import data task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -2271,7 +2266,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): 2. import data 3. fail to import data with errors """ - is_row_based = False # numpy files supports only column based data_fields = [df.vec_field] if not auto_id: data_fields.append(df.pk_field) @@ -2295,7 +2289,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): t0 = time.time() task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files + collection_name=c_name, files=files ) success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( task_ids=task_ids, timeout=90 @@ -2311,7 +2305,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): assert self.collection_wrap.num_entities == 0 @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [9]) @pytest.mark.parametrize("entities", [10]) @@ -2334,66 +2328,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): auto_id=auto_id, data_fields=default_multi_fields, err_type=DataErrorType.str_on_float_scalar, - scalars=default_multi_fields, - ) - self._connect() - c_name = cf.gen_unique_str("bulk_insert") - fields = [ - cf.gen_int64_field(name=df.pk_field, is_primary=True), - cf.gen_float_vec_field(name=df.vec_field, dim=dim), - cf.gen_int32_field(name=df.int_field), - cf.gen_float_field(name=df.float_field), - cf.gen_string_field(name=df.string_field), - cf.gen_bool_field(name=df.bool_field), - ] - schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) - self.collection_wrap.init_collection(c_name, schema=schema) - # import data - task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files - ) - logging.info(f"bulk insert task ids:{task_ids}") - success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( - task_ids=task_ids, timeout=90 - ) - log.info(f"bulk insert state:{success}") - assert not success - failed_reason = "illegal numeric value" - for state in states.values(): - assert state.state_name in ["Failed", "Failed and cleaned"] - assert failed_reason in state.infos.get("failed_reason", "") - assert self.collection_wrap.num_entities == 0 - - @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) - @pytest.mark.parametrize("auto_id", [True, False]) - @pytest.mark.parametrize("float_vector", [True, False]) - @pytest.mark.parametrize("dim", [8]) - @pytest.mark.parametrize("entities", [500]) - def test_data_type_str_on_vector_fields( - self, is_row_based, auto_id, float_vector, dim, entities - ): - """ - collection schema: [pk, float_vector, - float_scalar, int_scalar, string_scalar, bool_scalar] - data files: json file that entities has string data on vectors - Steps: - 1. create collection - 2. import data - 3. verify import failed with errors - """ - files = prepare_bulk_insert_json_files( - minio_endpoint=self.minio_endpoint, - bucket_name=self.bucket_name, - is_row_based=is_row_based, - rows=entities, - dim=dim, - auto_id=auto_id, - float_vector=float_vector, - data_fields=default_multi_fields, - err_type=DataErrorType.str_on_vector_field, - wrong_position=entities // 2, - scalars=default_multi_fields, force=True, ) self._connect() @@ -2410,7 +2344,69 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): self.collection_wrap.init_collection(c_name, schema=schema) # import data task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=is_row_based, files=files + collection_name=c_name, files=files + ) + logging.info(f"bulk insert task ids:{task_ids}") + success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( + task_ids=task_ids, timeout=90 + ) + log.info(f"bulk insert state:{success}") + assert not success + failed_reason = "illegal numeric value" + for state in states.values(): + assert state.state_name in ["Failed", "Failed and cleaned"] + assert failed_reason in state.infos.get("failed_reason", "") + assert self.collection_wrap.num_entities == 0 + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("is_row_based", [True]) + @pytest.mark.parametrize("auto_id", [True, False]) + @pytest.mark.parametrize("float_vector", [True, False]) + @pytest.mark.parametrize("dim", [8]) + @pytest.mark.parametrize("entities", [500]) + def test_data_type_str_on_vector_fields( + self, is_row_based, auto_id, float_vector, dim, entities + ): + """ + collection schema: [pk, float_vector, + float_scalar, int_scalar, string_scalar, bool_scalar] + data files: json file that entities has string data on vectors + Steps: + 1. create collection + 2. import data + 3. verify import failed with errors + """ + wrong_position = entities // 2 + files = prepare_bulk_insert_json_files( + minio_endpoint=self.minio_endpoint, + bucket_name=self.bucket_name, + is_row_based=is_row_based, + rows=entities, + dim=dim, + auto_id=auto_id, + float_vector=float_vector, + data_fields=default_multi_fields, + err_type=DataErrorType.str_on_vector_field, + wrong_position=wrong_position, + force=True, + ) + self._connect() + c_name = cf.gen_unique_str("bulk_insert") + fields = [cf.gen_float_vec_field(name=df.vec_field, dim=dim)] + if not float_vector: + fields = [cf.gen_binary_vec_field(name=df.vec_field, dim=dim)] + fields.extend([ + cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_int32_field(name=df.int_field), + cf.gen_float_field(name=df.float_field), + cf.gen_string_field(name=df.string_field), + cf.gen_bool_field(name=df.bool_field), + ]) + schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) + self.collection_wrap.init_collection(c_name, schema=schema) + # import data + task_ids, _ = self.utility_wrap.bulk_insert( + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( @@ -2420,7 +2416,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): assert not success failed_reason = "illegal numeric value" if not float_vector: - failed_reason = f"doesn't equal to vector dimension {dim} of field vectors" + failed_reason = f"the field '{df.vec_field}' value at the row {wrong_position} is invalid" for state in states.values(): assert state.state_name in ["Failed", "Failed and cleaned"] assert failed_reason in state.infos.get("failed_reason", "") @@ -2486,7 +2482,7 @@ class TestBulkInsertAdvanced(TestcaseBaseBulkInsert): break task_ids, _ = self.utility_wrap.bulk_insert( - collection_name=c_name, is_row_based=False, files=files + collection_name=c_name, files=files ) logging.info(f"bulk insert task ids:{task_ids}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( diff --git a/tests/python_client/bulk_insert/test_bulk_insert_task_clean.py b/tests/python_client/bulk_insert/test_bulk_insert_task_clean.py index 0f55a5ce78..006c3b8730 100644 --- a/tests/python_client/bulk_insert/test_bulk_insert_task_clean.py +++ b/tests/python_client/bulk_insert/test_bulk_insert_task_clean.py @@ -73,7 +73,7 @@ class TestcaseBaseBulkInsert(TestcaseBase): class TestBulkInsertTaskClean(TestcaseBaseBulkInsert): @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [8]) # 8, 128 @pytest.mark.parametrize("entities", [100]) # 100, 1000 @@ -114,7 +114,7 @@ class TestBulkInsertTaskClean(TestcaseBaseBulkInsert): task_ids, _ = self.utility_wrap.bulk_insert( collection_name=c_name, partition_name=None, - is_row_based=is_row_based, + # is_row_based=is_row_based, files=files, ) logging.info(f"bulk insert task ids:{task_ids}") @@ -175,7 +175,7 @@ class TestBulkInsertTaskClean(TestcaseBaseBulkInsert): assert len(results) == len(ids) @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True, False]) + @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [8]) # 8, 128 @pytest.mark.parametrize("entities", [100]) # 100, 1000