diff --git a/tests/python_client/common/bulk_insert_data.py b/tests/python_client/common/bulk_insert_data.py
index ce80d9cbf0..360aeedc42 100644
--- a/tests/python_client/common/bulk_insert_data.py
+++ b/tests/python_client/common/bulk_insert_data.py
@@ -7,6 +7,8 @@ import numpy as np
 from ml_dtypes import bfloat16
 import pandas as pd
 import random
+from pathlib import Path
+import uuid
 from faker import Faker
 from sklearn import preprocessing
 from common.common_func import gen_unique_str
@@ -670,20 +672,30 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d
 def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, file_size=None, err_type="", enable_dynamic_field=False, **kwargs):
+    schema = kwargs.get("schema", None)
+    dir_prefix = f"json-{uuid.uuid4()}"
+    data_source_new = f"{data_source}/{dir_prefix}"
+    schema_file = f"{data_source_new}/schema.json"
+    Path(schema_file).parent.mkdir(parents=True, exist_ok=True)
+    if schema is not None:
+        data = schema.to_dict()
+        with open(schema_file, "w") as f:
+            json.dump(data, f)
     files = []
     if file_size is not None:
         rows = 5000
     start_uid = 0
     for i in range(file_nums):
         file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{i}-{int(time.time())}.json"
-        file = f"{data_source}/{file_name}"
+        file = f"{data_source_new}/{file_name}"
+        Path(file).parent.mkdir(parents=True, exist_ok=True)
         data = gen_dict_data_by_data_field(data_fields=data_fields, rows=rows, start=start_uid,
                                            float_vector=float_vector, dim=dim, array_length=array_length,
                                            enable_dynamic_field=enable_dynamic_field, **kwargs)
         # log.info(f"data: {data}")
         with open(file, "w") as f:
             json.dump(data, f)
         # get the file size
         if file_size is not None:
-            batch_file_size = os.path.getsize(f"{data_source}/{file_name}")
+            batch_file_size = os.path.getsize(f"{data_source_new}/{file_name}")
             log.info(f"file_size with rows {rows} for {file_name}: {batch_file_size/1024/1024} MB")
             # calculate the rows to be generated
             total_batch = int(file_size*1024*1024*1024/batch_file_size)
@@ -693,17 +705,27 @@ def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_
             for _ in range(total_batch):
                 all_data += data
             file_name = f"data-fields-{len(data_fields)}-rows-{total_rows}-dim-{dim}-file-num-{i}-{int(time.time())}.json"
-            with open(f"{data_source}/{file_name}", "w") as f:
+            with open(f"{data_source_new}/{file_name}", "w") as f:
                 json.dump(all_data, f)
-            batch_file_size = os.path.getsize(f"{data_source}/{file_name}")
+            batch_file_size = os.path.getsize(f"{data_source_new}/{file_name}")
             log.info(f"file_size with rows {total_rows} for {file_name}: {batch_file_size/1024/1024/1024} GB")
         files.append(file_name)
         start_uid += rows
+    files = [f"{dir_prefix}/{f}" for f in files]
     return files
 
 
-def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_nums=1, err_type="", force=False, enable_dynamic_field=False, include_meta=True):
+def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_nums=1, err_type="", force=False, enable_dynamic_field=False, include_meta=True, **kwargs):
     # gen numpy files
+    schema = kwargs.get("schema", None)
+    u_id = f"numpy-{uuid.uuid4()}"
+    data_source_new = f"{data_source}/{u_id}"
+    schema_file = f"{data_source_new}/schema.json"
+    Path(schema_file).parent.mkdir(parents=True, exist_ok=True)
+    if schema is not None:
+        data = schema.to_dict()
+        with open(schema_file, "w") as f:
+            json.dump(data, f)
     files = []
     start_uid = 0
     if file_nums == 1:
@@ -723,47 +745,47 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
                 if "fp16" in data_field:
                     float_vector = True
                     vector_type = "fp16"
-                file_name = gen_vectors_in_numpy_file(dir=data_source, data_field=data_field, float_vector=float_vector,
+                file_name = gen_vectors_in_numpy_file(dir=data_source_new, data_field=data_field, float_vector=float_vector,
                                                       vector_type=vector_type, rows=rows, dim=dim, force=force)
             elif data_field == DataField.string_field:  # string field for numpy not supported yet at 2022-10-17
-                file_name = gen_string_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
+                file_name = gen_string_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
             elif data_field == DataField.bool_field:
-                file_name = gen_bool_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
+                file_name = gen_bool_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
             elif data_field == DataField.json_field:
-                file_name = gen_json_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
+                file_name = gen_json_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
             else:
-                file_name = gen_int_or_float_in_numpy_file(dir=data_source, data_field=data_field,
+                file_name = gen_int_or_float_in_numpy_file(dir=data_source_new, data_field=data_field,
                                                            rows=rows, force=force)
             files.append(file_name)
         if enable_dynamic_field and include_meta:
-            file_name = gen_dynamic_field_in_numpy_file(dir=data_source, rows=rows, force=force)
+            file_name = gen_dynamic_field_in_numpy_file(dir=data_source_new, rows=rows, force=force)
             files.append(file_name)
         if file_size is not None:
            batch_file_size = 0
            for file_name in files:
-                batch_file_size += os.path.getsize(f"{data_source}/{file_name}")
+                batch_file_size += os.path.getsize(f"{data_source_new}/{file_name}")
            log.info(f"file_size with rows {rows} for {files}: {batch_file_size/1024/1024} MB")
            # calculate the rows to be generated
            total_batch = int(file_size*1024*1024*1024/batch_file_size)
            total_rows = total_batch * rows
            new_files = []
            for f in files:
-                arr = np.load(f"{data_source}/{f}")
+                arr = np.load(f"{data_source_new}/{f}")
                all_arr = np.concatenate([arr for _ in range(total_batch)], axis=0)
                file_name = f
-                np.save(f"{data_source}/{file_name}", all_arr)
+                np.save(f"{data_source_new}/{file_name}", all_arr)
                log.info(f"file_name: {file_name} data type: {all_arr.dtype} data shape: {all_arr.shape}")
                new_files.append(file_name)
            files = new_files
            batch_file_size = 0
            for file_name in files:
-                batch_file_size += os.path.getsize(f"{data_source}/{file_name}")
+                batch_file_size += os.path.getsize(f"{data_source_new}/{file_name}")
            log.info(f"file_size with rows {total_rows} for {files}: {batch_file_size/1024/1024/1024} GB")
     else:
         for i in range(file_nums):
-            subfolder = gen_subfolder(root=data_source, dim=dim, rows=rows, file_num=i)
-            dir = f"{data_source}/{subfolder}"
+            subfolder = gen_subfolder(root=data_source_new, dim=dim, rows=rows, file_num=i)
+            dir = f"{data_source_new}/{subfolder}"
             for data_field in data_fields:
                 if DataField.vec_field in data_field:
                     file_name = gen_vectors_in_numpy_file(dir=dir, data_field=data_field, float_vector=float_vector,
                                                           rows=rows, dim=dim, force=force)
@@ -774,6 +796,7 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
                 file_name = gen_dynamic_field_in_numpy_file(dir=dir, rows=rows, start=start_uid, force=force)
                 files.append(f"{subfolder}/{file_name}")
             start_uid += rows
rows + files = [f"{u_id}/{f}" for f in files] return files @@ -784,7 +807,17 @@ def gen_dynamic_field_data_in_parquet_file(rows, start=0): return data -def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_group_size=None, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False, include_meta=True, sparse_format="doc"): +def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_group_size=None, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False, include_meta=True, sparse_format="doc", **kwargs): + schema = kwargs.get("schema", None) + u_id = f"parquet-{uuid.uuid4()}" + data_source_new = f"{data_source}/{u_id}" + schema_file = f"{data_source_new}/schema.json" + Path(schema_file).parent.mkdir(parents=True, exist_ok=True) + if schema is not None: + data = schema.to_dict() + with open(schema_file, "w") as f: + json.dump(data, f) + # gen numpy files if err_type == "": err_type = "none" @@ -805,12 +838,12 @@ def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_ log.info(f"df: \n{df}") file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{file_nums}-error-{err_type}-{int(time.time())}.parquet" if row_group_size is not None: - df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow', row_group_size=row_group_size) + df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow', row_group_size=row_group_size) else: - df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow') + df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow') # get the file size if file_size is not None: - batch_file_size = os.path.getsize(f"{data_source}/{file_name}") + batch_file_size = os.path.getsize(f"{data_source_new}/{file_name}") log.info(f"file_size with rows {rows} for {file_name}: {batch_file_size/1024/1024} MB") # calculate the rows to be generated total_batch = int(file_size*1024*1024*1024/batch_file_size) @@ -819,10 +852,10 @@ def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_ file_name = f"data-fields-{len(data_fields)}-rows-{total_rows}-dim-{dim}-file-num-{file_nums}-error-{err_type}-{int(time.time())}.parquet" log.info(f"all df: \n {all_df}") if row_group_size is not None: - all_df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow', row_group_size=row_group_size) + all_df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow', row_group_size=row_group_size) else: - all_df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow') - batch_file_size = os.path.getsize(f"{data_source}/{file_name}") + all_df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow') + batch_file_size = os.path.getsize(f"{data_source_new}/{file_name}") log.info(f"file_size with rows {total_rows} for {file_name}: {batch_file_size/1024/1024} MB") files.append(file_name) else: @@ -837,11 +870,12 @@ def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_ df = pd.DataFrame(all_field_data) file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{i}-error-{err_type}-{int(time.time())}.parquet" if row_group_size is not None: - df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow', row_group_size=row_group_size) + df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow', row_group_size=row_group_size) else: - df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow') + df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow') files.append(file_name) start_uid += rows + 
files = [f"{u_id}/{f}" for f in files] return files @@ -931,7 +965,7 @@ def prepare_bulk_insert_new_json_files(minio_endpoint="", bucket_name="milvus-bu def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, enable_dynamic_field=False, file_size=None, - data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True): + data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True, **kwargs): """ Generate column based files based on params in numpy format and copy them to the minio Note: each field in data_fields would be generated one numpy file. @@ -963,14 +997,14 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke """ files = gen_npy_files(rows=rows, dim=dim, float_vector=float_vector, file_size=file_size, data_fields=data_fields, enable_dynamic_field=enable_dynamic_field, - file_nums=file_nums, force=force, include_meta=include_meta) + file_nums=file_nums, force=force, include_meta=include_meta, **kwargs) copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force) return files def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, array_length=None, file_size=None, row_group_size=None, - enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True, sparse_format="doc"): + enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True, sparse_format="doc", **kwargs): """ Generate column based files based on params in parquet format and copy them to the minio Note: each field in data_fields would be generated one parquet file. 
diff --git a/tests/python_client/testcases/test_bulk_insert.py b/tests/python_client/testcases/test_bulk_insert.py
index 1270efb822..65b22e0df3 100644
--- a/tests/python_client/testcases/test_bulk_insert.py
+++ b/tests/python_client/testcases/test_bulk_insert.py
@@ -849,13 +849,15 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100),
             cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
             cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
-            # cf.gen_float_vec_field(name=df.image_float_vec_field, dim=dim),
-            # cf.gen_float_vec_field(name=df.text_float_vec_field, dim=dim),
             cf.gen_binary_vec_field(name=df.binary_vec_field, dim=dim),
             cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=dim),
             cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim)
         ]
         data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
+
         files = prepare_bulk_insert_new_json_files(
             minio_endpoint=self.minio_endpoint,
             bucket_name=self.bucket_name,
@@ -864,10 +866,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             data_fields=data_fields,
             enable_dynamic_field=enable_dynamic_field,
             force=True,
+            schema=schema
         )
-        self._connect()
-        c_name = cf.gen_unique_str("bulk_insert")
-        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         self.collection_wrap.init_collection(c_name, schema=schema)
 
         # import data
@@ -988,6 +988,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim)
         ]
         data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
+
         files = prepare_bulk_insert_numpy_files(
             minio_endpoint=self.minio_endpoint,
             bucket_name=self.bucket_name,
@@ -997,11 +1001,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             enable_dynamic_field=enable_dynamic_field,
             force=True,
             include_meta=include_meta,
-
+            schema=schema
         )
-        self._connect()
-        c_name = cf.gen_unique_str("bulk_insert")
-        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         self.collection_wrap.init_collection(c_name, schema=schema)
 
         # import data
@@ -1089,8 +1090,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         if enable_partition_key:
             assert len(self.collection_wrap.partitions) > 1
 
-
-
     @pytest.mark.tags(CaseLabel.L3)
     @pytest.mark.parametrize("auto_id", [True, False])
     @pytest.mark.parametrize("dim", [128])  # 128
@@ -1125,6 +1124,9 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim)
         ]
         data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         files = prepare_bulk_insert_parquet_files(
             minio_endpoint=self.minio_endpoint,
             bucket_name=self.bucket_name,
@@ -1134,12 +1136,9 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             enable_dynamic_field=enable_dynamic_field,
             force=True,
             include_meta=include_meta,
+            schema=schema,
         )
-        self._connect()
-        c_name = cf.gen_unique_str("bulk_insert")
-        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         self.collection_wrap.init_collection(c_name, schema=schema)
-
         # import data
         t0 = time.time()
         task_id, _ = self.utility_wrap.do_bulk_insert(
@@ -1224,7 +1223,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         assert len(res) == len(query_data)
         if enable_partition_key:
             assert len(self.collection_wrap.partitions) > 1
-    
+
     @pytest.mark.tags(CaseLabel.L3)
     @pytest.mark.parametrize("auto_id", [True, False])
     @pytest.mark.parametrize("dim", [128])  # 128
@@ -1257,6 +1256,9 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             cf.gen_sparse_vec_field(name=df.sparse_vec_field),
         ]
         data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         files = prepare_bulk_insert_parquet_files(
             minio_endpoint=self.minio_endpoint,
             bucket_name=self.bucket_name,
@@ -1266,11 +1268,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             enable_dynamic_field=enable_dynamic_field,
             force=True,
             include_meta=include_meta,
-            sparse_format=sparse_format
+            sparse_format=sparse_format,
+            schema=schema
         )
-        self._connect()
-        c_name = cf.gen_unique_str("bulk_insert")
-        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
+
         self.collection_wrap.init_collection(c_name, schema=schema)
 
         # import data
@@ -1378,6 +1379,9 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             cf.gen_sparse_vec_field(name=df.sparse_vec_field),
         ]
         data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         files = prepare_bulk_insert_new_json_files(
             minio_endpoint=self.minio_endpoint,
             bucket_name=self.bucket_name,
@@ -1387,11 +1391,9 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             enable_dynamic_field=enable_dynamic_field,
             force=True,
             include_meta=include_meta,
-            sparse_format=sparse_format
+            sparse_format=sparse_format,
+            schema=schema
         )
-        self._connect()
-        c_name = cf.gen_unique_str("bulk_insert")
-        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         self.collection_wrap.init_collection(c_name, schema=schema)
 
         # import data
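Two points about the patch as a whole. First, the test-side hunks are pure reordering: the schema has to exist before the prepare_* helpers run, because the generators now serialize it. Second, since copy_files_to_minio is still invoked with r_source=data_source, each generator prefixes its returned file names with the per-call directory so the upload path still resolves. A runnable sketch of that invariant (all paths and names below are made up for illustration; it assumes copy_files_to_minio reads each file at f"{r_source}/{file}", which this diff does not show):

import os
from pathlib import Path

data_source = "/tmp/bulk_data"                       # hypothetical root
dir_prefix = "json-123e4567-e89b-12d3-a456-426614174000"
file_name = "data-fields-3-rows-100-dim-128-file-num-0-1700000000.json"

staging = Path(data_source) / dir_prefix
staging.mkdir(parents=True, exist_ok=True)
(staging / file_name).write_text("[]")               # stand-in for real data
(staging / "schema.json").write_text("{}")           # dumped beside the data

# Generators return prefixed relative names ...
files = [f"{dir_prefix}/{file_name}"]
# ... so joining with the unchanged r_source still hits the right file.
assert all(os.path.exists(f"{data_source}/{f}") for f in files)

Note that schema.json itself is never appended to the returned list, so it is not among the files that this copy call picks up.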