diff --git a/tests/python_client/base/utility_wrapper.py b/tests/python_client/base/utility_wrapper.py index e0c9665c1c..0c585192e6 100644 --- a/tests/python_client/base/utility_wrapper.py +++ b/tests/python_client/base/utility_wrapper.py @@ -92,7 +92,7 @@ class ApiUtilityWrapper: unknown, "unknown") def wait_for_bulk_insert_tasks_completed(self, task_ids, target_state=BulkInsertState.ImportCompleted, - timeout=None, using="default", **kwargs): + timeout=None, using="default", **kwargs): start = time.time() tasks_state_distribution = { "success": set(), @@ -182,6 +182,24 @@ class ApiUtilityWrapper: if task.task_id in pending_task_ids: log.info(f"task {task.task_id} state transfer from pending to {task.state_name}") + def wait_index_build_completed(self, collection_name, timeout=None): + start = time.time() + if timeout is not None: + task_timeout = timeout + else: + task_timeout = TIMEOUT + end = time.time() + while end-start <= task_timeout: + time.sleep(0.5) + index_states, _ = self.index_building_progress(collection_name) + log.debug(f"index states: {index_states}") + if index_states["total_rows"] == index_states["indexed_rows"]: + log.info(f"index build completed") + return True + end = time.time() + log.info(f"index build timeout") + return False + def get_query_segment_info(self, collection_name, timeout=None, using="default", check_task=None, check_items=None): timeout = TIMEOUT if timeout is None else timeout func_name = sys._getframe().f_code.co_name diff --git a/tests/python_client/bulk_insert/bulk_insert_data.py b/tests/python_client/bulk_insert/bulk_insert_data.py index f0e9cd7f22..51d4e418cf 100644 --- a/tests/python_client/bulk_insert/bulk_insert_data.py +++ b/tests/python_client/bulk_insert/bulk_insert_data.py @@ -5,7 +5,7 @@ import pathlib import numpy as np import random from sklearn import preprocessing -from common.common_func import gen_unique_str +from common.common_func import gen_unique_unicode_str from minio_comm import copy_files_to_minio from utils.util_log import test_log as log @@ -65,7 +65,7 @@ def gen_float_vectors(nb, dim): def gen_str_invalid_vectors(nb, dim): - vectors = [[str(gen_unique_str()) for _ in range(dim)] for _ in range(nb)] + vectors = [[str(gen_unique_unicode_str()) for _ in range(dim)] for _ in range(nb)] return vectors @@ -101,7 +101,7 @@ def gen_row_based_json_file(row_file, str_pk, data_fields, float_vect, data_field = data_fields[j] if data_field == DataField.pk_field: if str_pk: - f.write('"uid":"' + str(gen_unique_str()) + '"') + f.write('"uid":"' + str(gen_unique_unicode_str()) + '"') else: if err_type == DataErrorType.float_on_int_pk: f.write('"uid":' + str(i + start_uid + random.random()) + '') @@ -117,11 +117,11 @@ def gen_row_based_json_file(row_file, str_pk, data_fields, float_vect, if err_type == DataErrorType.int_on_float_scalar: f.write('"float_scalar":' + str(random.randint(-999999, 9999999)) + '') elif err_type == DataErrorType.str_on_float_scalar: - f.write('"float_scalar":"' + str(gen_unique_str()) + '"') + f.write('"float_scalar":"' + str(gen_unique_unicode_str()) + '"') else: f.write('"float_scalar":' + str(random.random()) + '') if data_field == DataField.string_field: - f.write('"string_scalar":"' + str(gen_unique_str()) + '"') + f.write('"string_scalar":"' + str(gen_unique_unicode_str()) + '"') if data_field == DataField.bool_field: if err_type == DataErrorType.typo_on_bool: f.write('"bool_scalar":' + str(random.choice(["True", "False", "TRUE", "FALSE", "0", "1"])) + '') @@ -161,7 +161,7 @@ def 
gen_column_base_json_file(col_file, str_pk, data_fields, float_vect, data_field = data_fields[j] if data_field == DataField.pk_field: if str_pk: - f.write('"uid":["' + ',"'.join(str(gen_unique_str()) + '"' for i in range(rows)) + ']') + f.write('"uid":["' + ',"'.join(str(gen_unique_unicode_str()) + '"' for i in range(rows)) + ']') f.write("\n") else: if err_type == DataErrorType.float_on_int_pk: @@ -184,14 +184,14 @@ def gen_column_base_json_file(col_file, str_pk, data_fields, float_vect, str(random.randint(-999999, 9999999)) for i in range(rows)) + "]") elif err_type == DataErrorType.str_on_float_scalar: f.write('"float_scalar":["' + ',"'.join(str( - gen_unique_str()) + '"' for i in range(rows)) + ']') + gen_unique_unicode_str()) + '"' for i in range(rows)) + ']') else: f.write('"float_scalar":[' + ",".join( str(random.random()) for i in range(rows)) + "]") f.write("\n") if data_field == DataField.string_field: f.write('"string_scalar":["' + ',"'.join(str( - gen_unique_str()) + '"' for i in range(rows)) + ']') + gen_unique_unicode_str()) + '"' for i in range(rows)) + ']') f.write("\n") if data_field == DataField.bool_field: if err_type == DataErrorType.typo_on_bool: @@ -270,7 +270,22 @@ def gen_string_in_numpy_file(dir, data_field, rows, start=0, force=False): # non vector columns data = [] if rows > 0: - data = [gen_unique_str(str(i)) for i in range(start, rows+start)] + data = [gen_unique_unicode_str(str(i)) for i in range(start, rows+start)] + arr = np.array(data) + # print(f"file_name: {file_name} data type: {arr.dtype}") + log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}") + np.save(file, arr) + return file_name + + +def gen_bool_in_numpy_file(dir, data_field, rows, start=0, force=False): + file_name = f"{data_field}.npy" + file = f"{dir}/{file_name}" + if not os.path.exists(file) or force: + # non vector columns + data = [] + if rows > 0: + data = [random.choice([True, False]) for i in range(start, rows+start)] arr = np.array(data) # print(f"file_name: {file_name} data type: {arr.dtype}") log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}") @@ -380,6 +395,8 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type="" rows=rows, dim=dim, force=force) elif data_field == DataField.string_field: # string field for numpy not supported yet at 2022-10-17 file_name = gen_string_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force) + elif data_field == DataField.bool_field: + file_name = gen_bool_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force) else: file_name = gen_int_or_float_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force) diff --git a/tests/python_client/bulk_insert/test_bulk_insert.py b/tests/python_client/bulk_insert/test_bulk_insert.py index 681a215e2f..ff704039ad 100644 --- a/tests/python_client/bulk_insert/test_bulk_insert.py +++ b/tests/python_client/bulk_insert/test_bulk_insert.py @@ -114,6 +114,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): ] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) + # import data t0 = time.time() task_id, _ = self.utility_wrap.do_bulk_insert( @@ -138,9 +139,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert): self.collection_wrap.create_index( field_name=df.vec_field, index_params=index_params ) - self.collection_wrap.load() + success = self.utility_wrap.wait_index_build_completed(c_name) + assert success 
log.info(f"wait for load finished and be ready for search") - time.sleep(20) + self.collection_wrap.load() + self.collection_wrap.load(_refresh=True) log.info( f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}" ) @@ -187,6 +190,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): auto_id=auto_id, str_pk=string_pk, data_fields=default_vec_only_fields, + force=True ) self._connect() c_name = cf.gen_unique_str("bulk_insert") @@ -218,9 +222,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert): self.collection_wrap.create_index( field_name=df.vec_field, index_params=index_params ) - self.collection_wrap.load() + success = self.utility_wrap.wait_index_build_completed(c_name) + assert success log.info(f"wait for load finished and be ready for search") - time.sleep(20) + self.collection_wrap.load() + self.collection_wrap.load(_reshard=True) log.info( f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}" ) @@ -310,12 +316,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert m_partition.num_entities == entities assert self.collection_wrap.num_entities == entities log.debug(state) - time.sleep(20) - res, _ = self.utility_wrap.index_building_progress(c_name) - exp_res = {"total_rows": entities, "indexed_rows": entities} - assert res == exp_res + success = self.utility_wrap.wait_index_build_completed(c_name) + assert success log.info(f"wait for load finished and be ready for search") - time.sleep(20) + self.collection_wrap.load(partition_names=[p_name], _refresh=True) log.info( f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}" ) @@ -396,15 +400,13 @@ class TestBulkInsert(TestcaseBaseBulkInsert): tt = time.time() - t0 log.info(f"bulk insert state:{success} in {tt}") assert success - res, _ = self.utility_wrap.index_building_progress(c_name) - exp_res = {'total_rows': entities, 'indexed_rows': entities} - assert res == exp_res - + success = self.utility_wrap.wait_index_build_completed(c_name) + assert success # verify num entities assert self.collection_wrap.num_entities == entities # verify search and query log.info(f"wait for load finished and be ready for search") - time.sleep(20) + self.collection_wrap.load(_refresh=True) search_data = cf.gen_binary_vectors(1, dim)[1] search_params = {"metric_type": "JACCARD", "params": {"nprobe": 10}} res, _ = self.collection_wrap.search( @@ -510,12 +512,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # verify index status res, _ = self.collection_wrap.has_index() assert res is True - res, _ = self.utility_wrap.index_building_progress(c_name) - exp_res = {'total_rows': entities, 'indexed_rows': entities} - assert res == exp_res + success = self.utility_wrap.wait_index_build_completed(c_name) + assert success # verify search and query log.info(f"wait for load finished and be ready for search") - time.sleep(20) + self.collection_wrap.load(_refresh=True) nq = 3 topk = 10 search_data = cf.gen_vectors(nq, dim) @@ -613,13 +614,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert): log.info(f"collection entities: {num_entities}") assert num_entities == bulk_insert_row + direct_insert_row # verify index status - time.sleep(20) - res, _ = self.utility_wrap.index_building_progress(c_name) - exp_res = {'total_rows': num_entities, 'indexed_rows': num_entities} - assert res == exp_res + success = self.utility_wrap.wait_index_build_completed(c_name) + assert success # verify search and query log.info(f"wait for load finished and be ready for search") - time.sleep(20) + self.collection_wrap.load(_refresh=True) nq = 
3 topk = 10 search_data = cf.gen_vectors(nq, dim=dim) @@ -675,13 +674,15 @@ class TestBulkInsert(TestcaseBaseBulkInsert): schema = cf.gen_collection_schema(fields=fields, auto_id=True) self.collection_wrap.init_collection(c_name, schema=schema) # build index - index_params = ct.default_index - self.collection_wrap.create_index( - field_name=df.vec_field, index_params=index_params - ) + if create_index_before_bulk_insert: + index_params = ct.default_index + self.collection_wrap.create_index( + field_name=df.vec_field, index_params=index_params + ) if loaded_before_bulk_insert: # load collection self.collection_wrap.load() + # import data t0 = time.time() task_id, _ = self.utility_wrap.do_bulk_insert( @@ -694,6 +695,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert): tt = time.time() - t0 log.info(f"bulk insert state:{success} in {tt}") assert success + if not create_index_before_bulk_insert: + # build index + index_params = ct.default_index + self.collection_wrap.create_index( + field_name=df.vec_field, index_params=index_params + ) if not loaded_before_bulk_insert: # load collection self.collection_wrap.load() @@ -702,12 +709,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert): log.info(f"collection entities: {num_entities}") assert num_entities == 500 # verify no index - res, _ = self.utility_wrap.index_building_progress(c_name) - exp_res = {'total_rows': num_entities, 'indexed_rows': num_entities} - assert res == exp_res + success = self.utility_wrap.wait_index_build_completed(c_name) + assert success # verify search and query log.info(f"wait for load finished and be ready for search") - time.sleep(20) + self.collection_wrap.load(_refresh=True) nq = 3 topk = 10 search_data = cf.gen_vectors(nq, 16) @@ -799,7 +805,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert not success # TODO: check error msg if is_row_based: if fields_num_in_file == "less": - failed_reason = f"field '{additional_field}' missed at the row 0" + failed_reason = f"value of field '{additional_field}' is missed" else: failed_reason = f"field '{df.float_field}' is not defined in collection schema" else: @@ -811,12 +817,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert success log.info(f" collection entities: {self.collection_wrap.num_entities}") assert self.collection_wrap.num_entities == entities - # verify no index - res, _ = self.collection_wrap.has_index() - assert res is True + # verify index + success = self.utility_wrap.wait_index_build_completed(c_name) + assert success # verify search and query log.info(f"wait for load finished and be ready for search") - time.sleep(20) + self.collection_wrap.load(_refresh=True) search_data = cf.gen_vectors(1, dim) search_params = ct.default_search_params res, _ = self.collection_wrap.search( @@ -836,14 +842,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert): @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [False]) @pytest.mark.parametrize("auto_id", [True, False]) - @pytest.mark.parametrize("multi_fields", [True, False]) @pytest.mark.parametrize("dim", [15]) @pytest.mark.parametrize("entities", [200]) - # @pytest.mark.skip(reason="stop support for numpy files") def test_float_vector_from_numpy_file( - self, is_row_based, auto_id, multi_fields, dim, entities + self, auto_id, dim, entities ): """ collection schema 1: [pk, float_vector] @@ -857,7 +860,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert): 4.1 verify the data entities equal the import data 4.2 verify search and query successfully """ - data_fields = [df.vec_field] + if auto_id: 
+ data_fields = [df.vec_field, df.int_field, df.string_field, df.float_field, df.bool_field] + else: + data_fields = [df.pk_field, df.vec_field, df.int_field, df.string_field, df.float_field, df.bool_field] np_files = prepare_bulk_insert_numpy_files( minio_endpoint=self.minio_endpoint, bucket_name=self.bucket_name, @@ -866,53 +872,15 @@ class TestBulkInsert(TestcaseBaseBulkInsert): data_fields=data_fields, force=True, ) - if not multi_fields: - fields = [ - cf.gen_int64_field(name=df.pk_field, is_primary=True), - cf.gen_float_vec_field(name=df.vec_field, dim=dim), - ] - if not auto_id: - scalar_fields = [df.pk_field] - else: - scalar_fields = None - else: - fields = [ - cf.gen_int64_field(name=df.pk_field, is_primary=True), - cf.gen_float_vec_field(name=df.vec_field, dim=dim), - cf.gen_int32_field(name=df.int_field), - cf.gen_string_field(name=df.string_field), - cf.gen_bool_field(name=df.bool_field), - ] - if not auto_id: - scalar_fields = [ - df.pk_field, - df.float_field, - df.int_field, - df.string_field, - df.bool_field, - ] - else: - scalar_fields = [ - df.int_field, - df.string_field, - df.bool_field, - df.float_field, - ] - + fields = [ + cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int64_field(name=df.int_field), + cf.gen_string_field(name=df.string_field), + cf.gen_float_field(name=df.float_field), + cf.gen_bool_field(name=df.bool_field), + ] files = np_files - if scalar_fields is not None: - json_files = prepare_bulk_insert_json_files( - minio_endpoint=self.minio_endpoint, - bucket_name=self.bucket_name, - is_row_based=is_row_based, - dim=dim, - auto_id=auto_id, - rows=entities, - data_fields=scalar_fields, - force=True, - ) - files = np_files + json_files - self._connect() c_name = cf.gen_unique_str("bulk_insert") schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) @@ -930,112 +898,39 @@ class TestBulkInsert(TestcaseBaseBulkInsert): tt = time.time() - t0 log.info(f"bulk insert state:{success} in {tt}") - if is_row_based: - assert not success - failed_reason1 = "unsupported file type for row-based mode" - failed_reason2 = ( - f"JSON row validator: field {df.vec_field} missed at the row 0" - ) - for state in states.values(): - assert state.state_name in ["Failed", "Failed and cleaned"] - assert failed_reason1 in state.infos.get( - "failed_reason", "" - ) or failed_reason2 in state.infos.get("failed_reason", "") - else: - assert success - log.info(f" collection entities: {self.collection_wrap.num_entities}") - assert self.collection_wrap.num_entities == entities - # create index and load - index_params = ct.default_index - self.collection_wrap.create_index( - field_name=df.vec_field, index_params=index_params - ) - self.collection_wrap.load() - log.info(f"wait for load finished and be ready for search") - time.sleep(20) - log.info( - f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}" - ) - # verify imported data is available for search - nq = 2 - topk = 5 - search_data = cf.gen_vectors(nq, dim) - search_params = ct.default_search_params - res, _ = self.collection_wrap.search( - search_data, - df.vec_field, - param=search_params, - limit=topk, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, "limit": topk}, - ) - for hits in res: - ids = hits.ids - results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}") - assert len(results) == len(ids) - - @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True]) - 
@pytest.mark.parametrize("dim", [8]) - @pytest.mark.parametrize("entities", [10]) - def test_data_type_float_on_int_pk(self, is_row_based, dim, entities): - """ - collection schema: [pk, float_vector, - float_scalar, int_scalar, string_scalar, bool_scalar] - data files: json file that one of entities has float on int pk - Steps: - 1. create collection - 2. import data - 3. verify the data entities - 4. verify query successfully - """ - files = prepare_bulk_insert_json_files( - minio_endpoint=self.minio_endpoint, - bucket_name=self.bucket_name, - is_row_based=is_row_based, - rows=entities, - dim=dim, - auto_id=False, - data_fields=default_multi_fields, - err_type=DataErrorType.float_on_int_pk, - force=True, - ) - self._connect() - c_name = cf.gen_unique_str("bulk_insert") - # TODO: add string pk - fields = [ - cf.gen_int64_field(name=df.pk_field, is_primary=True), - cf.gen_float_vec_field(name=df.vec_field, dim=dim), - cf.gen_int32_field(name=df.int_field), - cf.gen_string_field(name=df.string_field), - cf.gen_bool_field(name=df.bool_field), - cf.gen_float_field(name=df.float_field), - ] - schema = cf.gen_collection_schema(fields=fields, auto_id=False) - self.collection_wrap.init_collection(c_name, schema=schema) - # import data - task_id, _ = self.utility_wrap.do_bulk_insert( - collection_name=c_name, files=files - ) - logging.info(f"bulk insert task ids:{task_id}") - success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( - task_ids=[task_id], timeout=90 - ) - log.info(f"bulk insert state:{success}") assert success + log.info(f" collection entities: {self.collection_wrap.num_entities}") assert self.collection_wrap.num_entities == entities + # create index and load index_params = ct.default_index self.collection_wrap.create_index( field_name=df.vec_field, index_params=index_params ) + result = self.utility_wrap.wait_index_build_completed(c_name) + assert result is True self.collection_wrap.load() + self.collection_wrap.load(_refresh=True) log.info(f"wait for load finished and be ready for search") - time.sleep(20) - # the pk value was automatically convert to int from float - res, _ = self.collection_wrap.query( - expr=f"{df.pk_field} in [3]", output_fields=[df.pk_field] + log.info( + f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}" ) - assert [{df.pk_field: 3}] == res + # verify imported data is available for search + nq = 2 + topk = 5 + search_data = cf.gen_vectors(nq, dim) + search_params = ct.default_search_params + res, _ = self.collection_wrap.search( + search_data, + df.vec_field, + param=search_params, + limit=topk, + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, "limit": topk}, + ) + for hits in res: + ids = hits.ids + results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}") + assert len(results) == len(ids) @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("is_row_based", [True]) @@ -1093,9 +988,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert): self.collection_wrap.create_index( field_name=df.vec_field, index_params=index_params ) - self.collection_wrap.load() + success = self.utility_wrap.wait_index_build_completed(c_name) + assert success + # verify imported data is available for search log.info(f"wait for load finished and be ready for search") - time.sleep(20) + self.collection_wrap.load() + self.collection_wrap.load(_refresh=True) search_data = cf.gen_vectors(1, dim) search_params = ct.default_search_params res, _ = self.collection_wrap.search( @@ -1110,13 +1008,12 @@ class 
TestBulkInsert(TestcaseBaseBulkInsert): res, _ = self.collection_wrap.query( expr=f"{df.pk_field} in {uids}", output_fields=[df.float_field] ) - assert isinstance(res[0].get(df.float_field, 1), float) + assert isinstance(res[0].get(df.float_field, 1), np.float32) @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("auto_id", [True]) @pytest.mark.parametrize("dim", [128]) # 128 @pytest.mark.parametrize("entities", [1000]) # 1000 - @pytest.mark.skip(reason="stop support for numpy files") def test_with_all_field_numpy(self, auto_id, dim, entities): """ collection schema 1: [pk, int64, float64, string float_vector] @@ -1126,7 +1023,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): 2. import data 3. verify """ - data_fields = [df.pk_field, df.int_field, df.float_field, df.double_field, df.vec_field] + data_fields = [df.int_field, df.float_field, df.double_field, df.vec_field] fields = [ cf.gen_int64_field(name=df.pk_field, is_primary=True), cf.gen_int64_field(name=df.int_field), @@ -1167,10 +1064,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert): self.collection_wrap.create_index( field_name=df.vec_field, index_params=index_params ) - self.collection_wrap.load() + success = self.utility_wrap.wait_index_build_completed(c_name) log.info(f"wait for load finished and be ready for search") - time.sleep(20) - # log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") + self.collection_wrap.load() + self.collection_wrap.load(_refresh=True) + log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") search_data = cf.gen_vectors(1, dim) search_params = ct.default_search_params res, _ = self.collection_wrap.search( @@ -1181,6 +1079,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert): check_task=CheckTasks.check_search_results, check_items={"nq": 1, "limit": 1}, ) + for hits in res: + ids = hits.ids + results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}") + assert len(results) == len(ids) @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("auto_id", [True, False]) @@ -1240,10 +1142,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert success log.info(f" collection entities: {self.collection_wrap.num_entities}") assert self.collection_wrap.num_entities == entities * file_nums - + # verify imported data is indexed + success = self.utility_wrap.wait_index_build_completed(c_name) + assert success # verify search and query log.info(f"wait for load finished and be ready for search") - time.sleep(20) + self.collection_wrap.load(_refresh=True) search_data = cf.gen_vectors(1, dim) search_params = ct.default_search_params res, _ = self.collection_wrap.search( @@ -1255,10 +1159,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert): check_items={"nq": 1, "limit": 1}, ) - # TODO: not supported yet - def test_from_customize_bucket(self): - pass - class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): @@ -1524,7 +1424,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): assert not success if is_row_based: value = df.vec_field # if auto_id else df.pk_field - failed_reason = f"invalid JSON format, the root key should be 'rows', but get {value}" + failed_reason = f"invalid JSON format, the root key should be 'rows', but get '{value}'" else: failed_reason = "JSON parse: row count is 0" for state in states.values(): @@ -1878,17 +1778,16 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): @pytest.mark.parametrize("dim", [16]) @pytest.mark.parametrize("entities", [300]) @pytest.mark.parametrize("file_nums", [10]) # max 
task nums 32? need improve - @pytest.mark.skip(reason="not support multiple files now") - def test_float_vector_one_of_files_fail( + def test_float_vector_with_multi_json_files( self, is_row_based, auto_id, dim, entities, file_nums ): """ collection schema: [pk, float_vectors, int_scalar], one of entities has wrong dim data - data files: multi files, and there are errors in one of files - 1. import data 11 files(10 correct and 1 with errors) into the collection + data files: multi files, + 1. import data 10 files 2. verify that import fails with errors and no data imported """ - correct_files = prepare_bulk_insert_json_files( + multi_files = prepare_bulk_insert_json_files( minio_endpoint=self.minio_endpoint, bucket_name=self.bucket_name, is_row_based=is_row_based, @@ -1899,20 +1798,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): file_nums=file_nums, force=True, ) - - # append a file that has errors - dismatch_dim = dim + 1 - err_files = prepare_bulk_insert_json_files( - minio_endpoint=self.minio_endpoint, - bucket_name=self.bucket_name, - is_row_based=is_row_based, - rows=entities, - dim=dismatch_dim, - auto_id=auto_id, - data_fields=default_multi_fields, - file_nums=1, - ) - files = correct_files + err_files + files = multi_files random.shuffle(files) # mix up the file order self._connect() @@ -1929,22 +1815,14 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): self.collection_wrap.init_collection(c_name, schema=schema) # import data - t0 = time.time() task_id, _ = self.utility_wrap.do_bulk_insert( - collection_name=c_name, files=files + collection_name=c_name, files=files, + check_task=CheckTasks.err_res, + check_items={"err_code": 1, + "err_msg": "row-based import, only allow one JSON file each time"} ) - logging.info(f"bulk insert task ids:{task_id}") - success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( - task_ids=[task_id], timeout=90 - ) - tt = time.time() - t0 - log.info(f"bulk insert state:{success} in {tt}") - assert not success - if is_row_based: - # all correct files shall be imported successfully - assert self.collection_wrap.num_entities == entities * file_nums - else: - assert self.collection_wrap.num_entities == 0 + assert self.collection_wrap.num_entities == 0 + @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("auto_id", [True, False]) @@ -2109,6 +1987,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("dim", [8]) @pytest.mark.parametrize("entities", [10]) + # @pytest.mark.xfail(reason="https://github.com/milvus-io/milvus/issues/21818") def test_data_type_string_on_int_pk(self, is_row_based, dim, entities): """ collection schema: default multi scalars @@ -2153,7 +2032,61 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): ) log.info(f"bulk insert state:{success}") assert not success - failed_reason = f"illegal numeric value" + failed_reason = f"illegal value" + for state in states.values(): + assert state.state_name in ["Failed", "Failed and cleaned"] + assert failed_reason in state.infos.get("failed_reason", "") + assert self.collection_wrap.num_entities == 0 + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("is_row_based", [True]) + @pytest.mark.parametrize("dim", [8]) + @pytest.mark.parametrize("entities", [10]) + def test_data_type_float_on_int_pk(self, is_row_based, dim, entities): + """ + collection schema: [pk, float_vector, + float_scalar, int_scalar, string_scalar, bool_scalar] + data files: json 
file that one of entities has float on int pk
+        Steps:
+        1. create collection
+        2. import data with wrong data type
+        3. verify import failed
+        """
+        files = prepare_bulk_insert_json_files(
+            minio_endpoint=self.minio_endpoint,
+            bucket_name=self.bucket_name,
+            is_row_based=is_row_based,
+            rows=entities,
+            dim=dim,
+            auto_id=False,
+            data_fields=default_multi_fields,
+            err_type=DataErrorType.float_on_int_pk,
+            force=True,
+        )
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        # TODO: add string pk
+        fields = [
+            cf.gen_int64_field(name=df.pk_field, is_primary=True),
+            cf.gen_float_vec_field(name=df.vec_field, dim=dim),
+            cf.gen_int32_field(name=df.int_field),
+            cf.gen_string_field(name=df.string_field),
+            cf.gen_bool_field(name=df.bool_field),
+            cf.gen_float_field(name=df.float_field),
+        ]
+        schema = cf.gen_collection_schema(fields=fields, auto_id=False)
+        self.collection_wrap.init_collection(c_name, schema=schema)
+        # import data
+        task_id, _ = self.utility_wrap.do_bulk_insert(
+            collection_name=c_name, files=files
+        )
+        logging.info(f"bulk insert task ids:{task_id}")
+        success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
+            task_ids=[task_id], timeout=90
+        )
+        log.info(f"bulk insert state:{success}")
+        assert not success
+        failed_reason = f"failed to convert row value to entity"
         for state in states.values():
             assert state.state_name in ["Failed", "Failed and cleaned"]
             assert failed_reason in state.infos.get("failed_reason", "")
@@ -2174,6 +2107,16 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         2. import data
         3. verify import failed with errors
         """
+
+        multi_fields = [
+            df.vec_field,
+            df.int_field,
+            df.string_field,
+            df.bool_field,
+            df.float_field,
+        ]
+        if not auto_id:
+            multi_fields.insert(0, df.pk_field)
         files = prepare_bulk_insert_json_files(
             minio_endpoint=self.minio_endpoint,
             bucket_name=self.bucket_name,
@@ -2187,7 +2130,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         self._connect()
         c_name = cf.gen_unique_str("bulk_insert")
-        # TODO: add string pk
         fields = [
             cf.gen_int64_field(name=df.pk_field, is_primary=True),
             cf.gen_float_vec_field(name=df.vec_field, dim=dim),
@@ -2330,7 +2272,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         )
         log.info(f"bulk insert state:{success}")
         assert not success
-        failed_reason = "illegal numeric value"
+        failed_reason = "failed to convert row value to entity"
         for state in states.values():
             assert state.state_name in ["Failed", "Failed and cleaned"]
             assert failed_reason in state.infos.get("failed_reason", "")
@@ -2392,12 +2334,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         )
         log.info(f"bulk insert state:{success}")
         assert not success
-        failed_reason = "illegal numeric value"
-        if not float_vector:
-            failed_reason = f"the field '{df.vec_field}' value at the row {wrong_position} is invalid"
+        failed_reason1 = "failed to parse row value"
+        failed_reason2 = "failed to convert row value to entity"
         for state in states.values():
             assert state.state_name in ["Failed", "Failed and cleaned"]
-            assert failed_reason in state.infos.get("failed_reason", "")
+            assert failed_reason1 in state.infos.get("failed_reason", "") or \
+                failed_reason2 in state.infos.get("failed_reason", "")
         assert self.collection_wrap.num_entities == 0
diff --git a/tests/python_client/common/common_func.py b/tests/python_client/common/common_func.py
index 70fa79191b..2f35ca635f 100644
--- a/tests/python_client/common/common_func.py
+++ b/tests/python_client/common/common_func.py
@@ -43,6 +43,18 @@ def gen_unique_str(str_value=None):
     return "test_" + prefix if str_value is None else str_value + "_" + prefix


+def gen_unique_unicode_str(str_value=None):
+    s = "test_" if str_value is None else str_value
+    for i in range(8):
+        while True:
+            c = chr(random.randint(0x0000, 0xffff))
+            if c.isprintable():
+                break
+        s += c
+    s += "".join(random.choice(string.ascii_letters + string.digits) for _ in range(8))
+    return s
+
+
 def gen_str_by_length(length=8):
     return "".join(random.choice(string.ascii_letters + string.digits) for _ in range(length))
diff --git a/tests/python_client/requirements.txt b/tests/python_client/requirements.txt
index 4735f7979e..5c2a475c02 100644
--- a/tests/python_client/requirements.txt
+++ b/tests/python_client/requirements.txt
@@ -12,7 +12,7 @@ allure-pytest==2.7.0
 pytest-print==0.2.1
 pytest-level==0.1.1
 pytest-xdist==2.5.0
-pymilvus==2.2.2.dev3
+pymilvus==2.2.2.dev8
 pytest-rerunfailures==9.1.1
 git+https://github.com/Projectplace/pytest-tags
 ndg-httpsclient
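
Note on the waiting pattern used throughout this patch: wait_index_build_completed (added in utility_wrapper.py) polls index_building_progress until indexed_rows equals total_rows, and the tests then call load(_refresh=True) so segments produced by the bulk-insert task become searchable, instead of relying on fixed time.sleep(20) calls. A minimal usage sketch, assuming it runs inside a TestcaseBaseBulkInsert test method; the index params and the 120s timeout are illustrative placeholders, not taken from the diff:

    # Sketch only: index params and timeout are placeholders.
    index_params = {"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}}
    self.collection_wrap.create_index(field_name=df.vec_field, index_params=index_params)
    # Poll index_building_progress instead of sleeping a fixed 20 seconds.
    assert self.utility_wrap.wait_index_build_completed(c_name, timeout=120)
    # Load, then refresh the load so segments flushed by bulk insert are picked up before searching.
    self.collection_wrap.load()
    self.collection_wrap.load(_refresh=True)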
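
For the new gen_bool_in_numpy_file helper in bulk_insert_data.py, saving a plain list of Python booleans with numpy yields a .npy array of dtype bool, matching the layout of the other scalar files the tests generate. A quick standalone check of that behaviour; the output path below is a placeholder, the tests write into their data_source directory:

    import random
    import numpy as np

    rows = 5
    data = [random.choice([True, False]) for _ in range(rows)]
    arr = np.array(data)
    print(arr.dtype, arr.shape)  # -> bool (5,)
    np.save("/tmp/bool_scalar.npy", arr)  # placeholder path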