diff --git a/tests/python_client/common/bulk_insert_data.py b/tests/python_client/common/bulk_insert_data.py index 208bff3227..d9d1756029 100644 --- a/tests/python_client/common/bulk_insert_data.py +++ b/tests/python_client/common/bulk_insert_data.py @@ -21,6 +21,10 @@ FLOAT = "float" class DataField: pk_field = "uid" vec_field = "vectors" + float_vec_field = "float_vectors" + image_float_vec_field = "image_float_vec_field" + text_float_vec_field = "text_float_vec_field" + binary_vec_field = "binary_vec_field" int_field = "int_scalar" string_field = "string_scalar" bool_field = "bool_scalar" @@ -317,12 +321,12 @@ def gen_vectors_in_numpy_file(dir, data_field, float_vector, rows, dim, force=Fa if rows > 0: if float_vector: vectors = gen_float_vectors(rows, dim) + arr = np.array(vectors) else: vectors = gen_binary_vectors(rows, (dim // 8)) - arr = np.array(vectors) - # print(f"file_name: {file_name} data type: {arr.dtype}") - log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}") - np.save(file, arr) + arr = np.array(vectors, dtype=np.dtype("uint8")) + log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}") + np.save(file, arr) return file_name @@ -421,8 +425,15 @@ def gen_data_by_data_field(data_field, rows, start=0, float_vector=True, dim=128 data = [] if rows > 0: - if data_field == DataField.vec_field: - data = gen_vectors(float_vector=float_vector, rows=rows, dim=dim) + if "vec" in data_field: + if "float" in data_field: + data = gen_vectors(float_vector=True, rows=rows, dim=dim) + data = pd.Series([np.array(x, dtype=np.dtype("float32")) for x in data]) + elif "binary" in data_field: + data = gen_vectors(float_vector=False, rows=rows, dim=dim) + data = pd.Series([np.array(x, dtype=np.dtype("uint8")) for x in data]) + else: + data = gen_vectors(float_vector=float_vector, rows=rows, dim=dim) elif data_field == DataField.float_field: data = [np.float32(random.random()) for _ in range(rows)] elif data_field == DataField.double_field: @@ -530,8 +541,11 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d for r in range(rows): d = {} for data_field in data_fields: - if data_field == DataField.vec_field: - # vector columns + if "vec" in data_field: + if "float" in data_field: + float_vector = True + if "binary" in data_field: + float_vector = False d[data_field] = gen_vectors(float_vector=float_vector, rows=1, dim=dim)[0] elif data_field == DataField.float_field: d[data_field] = random.random() @@ -608,7 +622,11 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num if file_nums == 1: # gen the numpy file without subfolders if only one set of files for data_field in data_fields: - if data_field == DataField.vec_field: + if "vec" in data_field: + if "float" in data_field: + float_vector = True + if "binary" in data_field: + float_vector = False file_name = gen_vectors_in_numpy_file(dir=data_source, data_field=data_field, float_vector=float_vector, rows=rows, dim=dim, force=force) elif data_field == DataField.string_field: # string field for numpy not supported yet at 2022-10-17 diff --git a/tests/python_client/testcases/test_bulk_insert.py b/tests/python_client/testcases/test_bulk_insert.py index 1948c1adaf..fdc8c9a581 100644 --- a/tests/python_client/testcases/test_bulk_insert.py +++ b/tests/python_client/testcases/test_bulk_insert.py @@ -337,7 +337,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [128]) @pytest.mark.parametrize("entities", [2000]) - def test_binary_vector_only(self, is_row_based, auto_id, dim, entities): + def test_binary_vector_json(self, is_row_based, auto_id, dim, entities): """ collection schema: [pk, binary_vector] Steps: @@ -692,8 +692,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert): @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("auto_id", [True]) - @pytest.mark.parametrize("dim", [2]) # 128 - @pytest.mark.parametrize("entities", [2]) # 1000 + @pytest.mark.parametrize("dim", [128]) # 128 + @pytest.mark.parametrize("entities", [1000]) # 1000 @pytest.mark.parametrize("enable_dynamic_field", [True]) def test_bulk_insert_all_field_with_new_json_format(self, auto_id, dim, entities, enable_dynamic_field): """ @@ -714,7 +714,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert): cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT), cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100), cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL), - cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_float_vec_field(name=df.float_vec_field, dim=dim), + cf.gen_float_vec_field(name=df.image_float_vec_field, dim=dim), + cf.gen_float_vec_field(name=df.text_float_vec_field, dim=dim), + cf.gen_binary_vec_field(name=df.binary_vec_field, dim=dim) ] data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)] files = prepare_bulk_insert_new_json_files( @@ -748,32 +751,63 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert num_entities == entities # verify imported data is available for search index_params = ct.default_index - self.collection_wrap.create_index( - field_name=df.vec_field, index_params=index_params - ) + float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name] + binary_vec_fields = [f.name for f in fields if "vec" in f.name and "binary" in f.name] + for f in float_vec_fields: + self.collection_wrap.create_index( + field_name=f, index_params=index_params + ) + for f in binary_vec_fields: + self.collection_wrap.create_index( + field_name=f, index_params=ct.default_binary_index + ) self.collection_wrap.load() log.info(f"wait for load finished and be ready for search") time.sleep(2) # log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") search_data = cf.gen_vectors(1, dim) search_params = ct.default_search_params - res, _ = self.collection_wrap.search( - search_data, - df.vec_field, - param=search_params, - limit=1, - output_fields=["*"], - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, "limit": 1}, - ) - for hit in res: - for r in hit: - fields_from_search = r.fields.keys() - for f in fields: - assert f.name in fields_from_search - if enable_dynamic_field: - assert "name" in fields_from_search - assert "address" in fields_from_search + for field_name in float_vec_fields: + res, _ = self.collection_wrap.search( + search_data, + field_name, + param=search_params, + limit=1, + output_fields=["*"], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": 1}, + ) + for hit in res: + for r in hit: + fields_from_search = r.fields.keys() + for f in fields: + assert f.name in fields_from_search + if enable_dynamic_field: + assert "name" in fields_from_search + assert "address" in fields_from_search + + _, search_data = cf.gen_binary_vectors(1, dim) + search_params = ct.default_search_binary_params + for field_name in binary_vec_fields: + res, _ = self.collection_wrap.search( + search_data, + field_name, + param=search_params, + limit=1, + output_fields=["*"], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": 1}, + ) + for hit in res: + for r in hit: + fields_from_search = r.fields.keys() + for f in fields: + assert f.name in fields_from_search + if enable_dynamic_field: + assert "name" in fields_from_search + assert "address" in fields_from_search + + @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("auto_id", [True, False]) @@ -796,7 +830,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert): cf.gen_float_field(name=df.float_field), cf.gen_double_field(name=df.double_field), cf.gen_json_field(name=df.json_field), - cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_float_vec_field(name=df.float_vec_field, dim=dim), + cf.gen_float_vec_field(name=df.image_float_vec_field, dim=dim), + cf.gen_float_vec_field(name=df.text_float_vec_field, dim=dim), + cf.gen_binary_vec_field(name=df.binary_vec_field, dim=dim) ] data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)] files = prepare_bulk_insert_numpy_files( @@ -805,8 +842,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert): rows=entities, dim=dim, data_fields=data_fields, - force=True, enable_dynamic_field=enable_dynamic_field, + force=True, ) self._connect() c_name = cf.gen_unique_str("bulk_insert") @@ -830,32 +867,61 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert num_entities == entities # verify imported data is available for search index_params = ct.default_index - self.collection_wrap.create_index( - field_name=df.vec_field, index_params=index_params - ) + float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name] + binary_vec_fields = [f.name for f in fields if "vec" in f.name and "binary" in f.name] + for f in float_vec_fields: + self.collection_wrap.create_index( + field_name=f, index_params=index_params + ) + for f in binary_vec_fields: + self.collection_wrap.create_index( + field_name=f, index_params=ct.default_binary_index + ) self.collection_wrap.load() log.info(f"wait for load finished and be ready for search") time.sleep(2) # log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") search_data = cf.gen_vectors(1, dim) search_params = ct.default_search_params - res, _ = self.collection_wrap.search( - search_data, - df.vec_field, - param=search_params, - limit=1, - output_fields=["*"], - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, "limit": 1}, - ) - for hit in res: - for r in hit: - fields_from_search = r.fields.keys() - for f in fields: - assert f.name in fields_from_search - if enable_dynamic_field: - assert "name" in fields_from_search - assert "address" in fields_from_search + for field_name in float_vec_fields: + res, _ = self.collection_wrap.search( + search_data, + field_name, + param=search_params, + limit=1, + output_fields=["*"], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": 1}, + ) + for hit in res: + for r in hit: + fields_from_search = r.fields.keys() + for f in fields: + assert f.name in fields_from_search + if enable_dynamic_field: + assert "name" in fields_from_search + assert "address" in fields_from_search + + _, search_data = cf.gen_binary_vectors(1, dim) + search_params = ct.default_search_binary_params + for field_name in binary_vec_fields: + res, _ = self.collection_wrap.search( + search_data, + field_name, + param=search_params, + limit=1, + output_fields=["*"], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": 1}, + ) + for hit in res: + for r in hit: + fields_from_search = r.fields.keys() + for f in fields: + assert f.name in fields_from_search + if enable_dynamic_field: + assert "name" in fields_from_search + assert "address" in fields_from_search @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("auto_id", [True, False]) @@ -883,17 +949,18 @@ class TestBulkInsert(TestcaseBaseBulkInsert): cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT), cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100), cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL), - cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_float_vec_field(name=df.float_vec_field, dim=dim), + cf.gen_float_vec_field(name=df.image_float_vec_field, dim=dim), + cf.gen_float_vec_field(name=df.text_float_vec_field, dim=dim), + cf.gen_binary_vec_field(name=df.binary_vec_field, dim=dim) ] - data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)] + data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)] files = prepare_bulk_insert_parquet_files( minio_endpoint=self.minio_endpoint, bucket_name=self.bucket_name, rows=entities, dim=dim, data_fields=data_fields, - file_nums=file_nums, - array_length=array_len, enable_dynamic_field=enable_dynamic_field, force=True, ) @@ -919,32 +986,61 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert num_entities == entities # verify imported data is available for search index_params = ct.default_index - self.collection_wrap.create_index( - field_name=df.vec_field, index_params=index_params - ) + float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name] + binary_vec_fields = [f.name for f in fields if "vec" in f.name and "binary" in f.name] + for f in float_vec_fields: + self.collection_wrap.create_index( + field_name=f, index_params=index_params + ) + for f in binary_vec_fields: + self.collection_wrap.create_index( + field_name=f, index_params=ct.default_binary_index + ) self.collection_wrap.load() log.info(f"wait for load finished and be ready for search") time.sleep(2) # log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") search_data = cf.gen_vectors(1, dim) search_params = ct.default_search_params - res, _ = self.collection_wrap.search( - search_data, - df.vec_field, - param=search_params, - limit=1, - output_fields=["*"], - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, "limit": 1}, - ) - for hit in res: - for r in hit: - fields_from_search = r.fields.keys() - for f in fields: - assert f.name in fields_from_search - if enable_dynamic_field: - assert "name" in fields_from_search - assert "address" in fields_from_search + for field_name in float_vec_fields: + res, _ = self.collection_wrap.search( + search_data, + field_name, + param=search_params, + limit=1, + output_fields=["*"], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": 1}, + ) + for hit in res: + for r in hit: + fields_from_search = r.fields.keys() + for f in fields: + assert f.name in fields_from_search + if enable_dynamic_field: + assert "name" in fields_from_search + assert "address" in fields_from_search + + _, search_data = cf.gen_binary_vectors(1, dim) + search_params = ct.default_search_binary_params + for field_name in binary_vec_fields: + res, _ = self.collection_wrap.search( + search_data, + field_name, + param=search_params, + limit=1, + output_fields=["*"], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": 1}, + ) + for hit in res: + for r in hit: + fields_from_search = r.fields.keys() + for f in fields: + assert f.name in fields_from_search + if enable_dynamic_field: + assert "name" in fields_from_search + assert "address" in fields_from_search @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("auto_id", [True])