test: refine file dir in import test (#33600)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
Author: zhuwenxing
Date: 2024-06-05 10:29:51 +08:00 (committed by GitHub)
parent 25080bb455
commit 05a80f4def
2 changed files with 90 additions and 54 deletions
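The change below gives every file-generation run its own unique working directory under the shared data_source root and, when a schema is passed in, serializes it to schema.json alongside the generated files. A minimal sketch of that pattern, assuming only a writable local root (the helper name and sample root path here are illustrative, not part of the diff):

import json
import uuid
from pathlib import Path

data_source = "/tmp/bulk_insert_data"  # assumption: any writable local root works

def make_run_dir(prefix, schema=None):
    """Create a unique per-run sub-directory and optionally persist the schema next to the data files."""
    run_dir = Path(data_source) / f"{prefix}-{uuid.uuid4()}"
    run_dir.mkdir(parents=True, exist_ok=True)
    if schema is not None:
        # the test schemas expose to_dict(); a plain dict also works for this sketch
        data = schema.to_dict() if hasattr(schema, "to_dict") else schema
        with open(run_dir / "schema.json", "w") as f:
            json.dump(data, f)
    return str(run_dir)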

File 1 of 2

@@ -7,6 +7,8 @@ import numpy as np
 from ml_dtypes import bfloat16
 import pandas as pd
 import random
+from pathlib import Path
+import uuid
 from faker import Faker
 from sklearn import preprocessing
 from common.common_func import gen_unique_str
@@ -670,20 +672,30 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d
 def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, file_size=None, err_type="", enable_dynamic_field=False, **kwargs):
+    schema = kwargs.get("schema", None)
+    dir_prefix = f"json-{uuid.uuid4()}"
+    data_source_new = f"{data_source}/{dir_prefix}"
+    schema_file = f"{data_source_new}/schema.json"
+    Path(schema_file).parent.mkdir(parents=True, exist_ok=True)
+    if schema is not None:
+        data = schema.to_dict()
+        with open(schema_file, "w") as f:
+            json.dump(data, f)
     files = []
     if file_size is not None:
         rows = 5000
     start_uid = 0
     for i in range(file_nums):
         file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{i}-{int(time.time())}.json"
-        file = f"{data_source}/{file_name}"
+        file = f"{data_source_new}/{file_name}"
+        Path(file).parent.mkdir(parents=True, exist_ok=True)
         data = gen_dict_data_by_data_field(data_fields=data_fields, rows=rows, start=start_uid, float_vector=float_vector, dim=dim, array_length=array_length, enable_dynamic_field=enable_dynamic_field, **kwargs)
         # log.info(f"data: {data}")
         with open(file, "w") as f:
             json.dump(data, f)
         # get the file size
         if file_size is not None:
-            batch_file_size = os.path.getsize(f"{data_source}/{file_name}")
+            batch_file_size = os.path.getsize(f"{data_source_new}/{file_name}")
             log.info(f"file_size with rows {rows} for {file_name}: {batch_file_size/1024/1024} MB")
             # calculate the rows to be generated
             total_batch = int(file_size*1024*1024*1024/batch_file_size)
@@ -693,17 +705,27 @@ def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_
             for _ in range(total_batch):
                 all_data += data
             file_name = f"data-fields-{len(data_fields)}-rows-{total_rows}-dim-{dim}-file-num-{i}-{int(time.time())}.json"
-            with open(f"{data_source}/{file_name}", "w") as f:
+            with open(f"{data_source_new}/{file_name}", "w") as f:
                 json.dump(all_data, f)
-            batch_file_size = os.path.getsize(f"{data_source}/{file_name}")
+            batch_file_size = os.path.getsize(f"{data_source_new}/{file_name}")
             log.info(f"file_size with rows {total_rows} for {file_name}: {batch_file_size/1024/1024/1024} GB")
         files.append(file_name)
         start_uid += rows
+    files = [f"{dir_prefix}/{f}" for f in files]
     return files


-def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_nums=1, err_type="", force=False, enable_dynamic_field=False, include_meta=True):
+def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_nums=1, err_type="", force=False, enable_dynamic_field=False, include_meta=True, **kwargs):
     # gen numpy files
+    schema = kwargs.get("schema", None)
+    u_id = f"numpy-{uuid.uuid4()}"
+    data_source_new = f"{data_source}/{u_id}"
+    schema_file = f"{data_source_new}/schema.json"
+    Path(schema_file).parent.mkdir(parents=True, exist_ok=True)
+    if schema is not None:
+        data = schema.to_dict()
+        with open(schema_file, "w") as f:
+            json.dump(data, f)
     files = []
     start_uid = 0
     if file_nums == 1:
@@ -723,47 +745,47 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
                 if "fp16" in data_field:
                     float_vector = True
                     vector_type = "fp16"
-                file_name = gen_vectors_in_numpy_file(dir=data_source, data_field=data_field, float_vector=float_vector,
+                file_name = gen_vectors_in_numpy_file(dir=data_source_new, data_field=data_field, float_vector=float_vector,
                                                       vector_type=vector_type, rows=rows, dim=dim, force=force)
             elif data_field == DataField.string_field:  # string field for numpy not supported yet at 2022-10-17
-                file_name = gen_string_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
+                file_name = gen_string_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
             elif data_field == DataField.bool_field:
-                file_name = gen_bool_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
+                file_name = gen_bool_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
             elif data_field == DataField.json_field:
-                file_name = gen_json_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
+                file_name = gen_json_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
             else:
-                file_name = gen_int_or_float_in_numpy_file(dir=data_source, data_field=data_field,
+                file_name = gen_int_or_float_in_numpy_file(dir=data_source_new, data_field=data_field,
                                                            rows=rows, force=force)
             files.append(file_name)
         if enable_dynamic_field and include_meta:
-            file_name = gen_dynamic_field_in_numpy_file(dir=data_source, rows=rows, force=force)
+            file_name = gen_dynamic_field_in_numpy_file(dir=data_source_new, rows=rows, force=force)
             files.append(file_name)
         if file_size is not None:
            batch_file_size = 0
            for file_name in files:
-                batch_file_size += os.path.getsize(f"{data_source}/{file_name}")
+                batch_file_size += os.path.getsize(f"{data_source_new}/{file_name}")
             log.info(f"file_size with rows {rows} for {files}: {batch_file_size/1024/1024} MB")
             # calculate the rows to be generated
             total_batch = int(file_size*1024*1024*1024/batch_file_size)
             total_rows = total_batch * rows
             new_files = []
             for f in files:
-                arr = np.load(f"{data_source}/{f}")
+                arr = np.load(f"{data_source_new}/{f}")
                 all_arr = np.concatenate([arr for _ in range(total_batch)], axis=0)
                 file_name = f
-                np.save(f"{data_source}/{file_name}", all_arr)
+                np.save(f"{data_source_new}/{file_name}", all_arr)
                 log.info(f"file_name: {file_name} data type: {all_arr.dtype} data shape: {all_arr.shape}")
                 new_files.append(file_name)
             files = new_files
             batch_file_size = 0
             for file_name in files:
-                batch_file_size += os.path.getsize(f"{data_source}/{file_name}")
+                batch_file_size += os.path.getsize(f"{data_source_new}/{file_name}")
             log.info(f"file_size with rows {total_rows} for {files}: {batch_file_size/1024/1024/1024} GB")
     else:
         for i in range(file_nums):
-            subfolder = gen_subfolder(root=data_source, dim=dim, rows=rows, file_num=i)
-            dir = f"{data_source}/{subfolder}"
+            subfolder = gen_subfolder(root=data_source_new, dim=dim, rows=rows, file_num=i)
+            dir = f"{data_source_new}/{subfolder}"
             for data_field in data_fields:
                 if DataField.vec_field in data_field:
                     file_name = gen_vectors_in_numpy_file(dir=dir, data_field=data_field, float_vector=float_vector, rows=rows, dim=dim, force=force)
@@ -774,6 +796,7 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
                 file_name = gen_dynamic_field_in_numpy_file(dir=dir, rows=rows, start=start_uid, force=force)
                 files.append(f"{subfolder}/{file_name}")
             start_uid += rows
+    files = [f"{u_id}/{f}" for f in files]
     return files
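For reference, the file-size scaling used by these generators (unchanged by this commit apart from the directory it reads from) writes one sample batch, measures it on disk, and then concatenates enough copies to reach the requested size, which is expressed in GB. A standalone illustration of that arithmetic, with a made-up sample size:

sample_rows = 5000                           # rows in the measured sample batch
batch_file_size = 12 * 1024 * 1024           # hypothetical: the sample file is 12 MB on disk
file_size = 1                                # requested total size, in GB
total_batch = int(file_size * 1024 * 1024 * 1024 / batch_file_size)
total_rows = total_batch * sample_rows
print(total_batch, total_rows)               # 85 copies of the sample -> 425000 rows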
@@ -784,7 +807,17 @@ def gen_dynamic_field_data_in_parquet_file(rows, start=0):
     return data


-def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_group_size=None, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False, include_meta=True, sparse_format="doc"):
+def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_group_size=None, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False, include_meta=True, sparse_format="doc", **kwargs):
+    schema = kwargs.get("schema", None)
+    u_id = f"parquet-{uuid.uuid4()}"
+    data_source_new = f"{data_source}/{u_id}"
+    schema_file = f"{data_source_new}/schema.json"
+    Path(schema_file).parent.mkdir(parents=True, exist_ok=True)
+    if schema is not None:
+        data = schema.to_dict()
+        with open(schema_file, "w") as f:
+            json.dump(data, f)
     # gen numpy files
     if err_type == "":
         err_type = "none"
@@ -805,12 +838,12 @@ def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_
         log.info(f"df: \n{df}")
         file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{file_nums}-error-{err_type}-{int(time.time())}.parquet"
         if row_group_size is not None:
-            df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow', row_group_size=row_group_size)
+            df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow', row_group_size=row_group_size)
         else:
-            df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow')
+            df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow')
         # get the file size
         if file_size is not None:
-            batch_file_size = os.path.getsize(f"{data_source}/{file_name}")
+            batch_file_size = os.path.getsize(f"{data_source_new}/{file_name}")
             log.info(f"file_size with rows {rows} for {file_name}: {batch_file_size/1024/1024} MB")
             # calculate the rows to be generated
             total_batch = int(file_size*1024*1024*1024/batch_file_size)
@@ -819,10 +852,10 @@ def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_
             file_name = f"data-fields-{len(data_fields)}-rows-{total_rows}-dim-{dim}-file-num-{file_nums}-error-{err_type}-{int(time.time())}.parquet"
             log.info(f"all df: \n {all_df}")
             if row_group_size is not None:
-                all_df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow', row_group_size=row_group_size)
+                all_df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow', row_group_size=row_group_size)
             else:
-                all_df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow')
-            batch_file_size = os.path.getsize(f"{data_source}/{file_name}")
+                all_df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow')
+            batch_file_size = os.path.getsize(f"{data_source_new}/{file_name}")
             log.info(f"file_size with rows {total_rows} for {file_name}: {batch_file_size/1024/1024} MB")
         files.append(file_name)
     else:
@@ -837,11 +870,12 @@ def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_
             df = pd.DataFrame(all_field_data)
             file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{i}-error-{err_type}-{int(time.time())}.parquet"
             if row_group_size is not None:
-                df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow', row_group_size=row_group_size)
+                df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow', row_group_size=row_group_size)
             else:
-                df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow')
+                df.to_parquet(f"{data_source_new}/{file_name}", engine='pyarrow')
             files.append(file_name)
             start_uid += rows
+    files = [f"{u_id}/{f}" for f in files]
     return files
@@ -931,7 +965,7 @@ def prepare_bulk_insert_new_json_files(minio_endpoint="", bucket_name="milvus-bu
 def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, enable_dynamic_field=False, file_size=None,
-                                    data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True):
+                                    data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True, **kwargs):
     """
     Generate column based files based on params in numpy format and copy them to the minio
     Note: each field in data_fields would be generated one numpy file.
@@ -963,14 +997,14 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke
     """
     files = gen_npy_files(rows=rows, dim=dim, float_vector=float_vector, file_size=file_size,
                           data_fields=data_fields, enable_dynamic_field=enable_dynamic_field,
-                          file_nums=file_nums, force=force, include_meta=include_meta)
+                          file_nums=file_nums, force=force, include_meta=include_meta, **kwargs)
     copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
     return files


 def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, array_length=None, file_size=None, row_group_size=None,
-                                      enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True, sparse_format="doc"):
+                                      enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True, sparse_format="doc", **kwargs):
     """
     Generate column based files based on params in parquet format and copy them to the minio
     Note: each field in data_fields would be generated one parquet file.
@@ -1002,7 +1036,7 @@ def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-buc
     """
     files = gen_parquet_files(rows=rows, dim=dim, float_vector=float_vector, enable_dynamic_field=enable_dynamic_field,
                               data_fields=data_fields, array_length=array_length, file_size=file_size, row_group_size=row_group_size,
-                              file_nums=file_nums, include_meta=include_meta, sparse_format=sparse_format)
+                              file_nums=file_nums, include_meta=include_meta, sparse_format=sparse_format, **kwargs)
     copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
     return files
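Because the generators now write into a per-run sub-directory, the file names they return are prefixed with that directory, so callers that join them with the shared data_source root (for example copy_files_to_minio with r_source=data_source, as in the prepare_* helpers above) still resolve to the right local paths. A small illustration with made-up names; the file name below is hypothetical, not taken from the diff:

import uuid

data_source = "/tmp/bulk_insert_data"                          # assumed local data root
dir_prefix = f"json-{uuid.uuid4()}"                            # same naming scheme as the diff
file_name = "data-fields-3-rows-5000-dim-128-file-num-0.json"  # hypothetical generated file
relative_name = f"{dir_prefix}/{file_name}"                    # what gen_new_json_files now returns
local_path = f"{data_source}/{relative_name}"                  # what the uploader reads from disk
print(local_path)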

File 2 of 2

@@ -849,13 +849,15 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100),
             cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
             cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
-            # cf.gen_float_vec_field(name=df.image_float_vec_field, dim=dim),
-            # cf.gen_float_vec_field(name=df.text_float_vec_field, dim=dim),
             cf.gen_binary_vec_field(name=df.binary_vec_field, dim=dim),
             cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=dim),
             cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim)
         ]
         data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         files = prepare_bulk_insert_new_json_files(
             minio_endpoint=self.minio_endpoint,
             bucket_name=self.bucket_name,
@@ -864,10 +866,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             data_fields=data_fields,
             enable_dynamic_field=enable_dynamic_field,
             force=True,
+            schema=schema
         )
-        self._connect()
-        c_name = cf.gen_unique_str("bulk_insert")
-        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
@@ -988,6 +988,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim)
         ]
         data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         files = prepare_bulk_insert_numpy_files(
             minio_endpoint=self.minio_endpoint,
             bucket_name=self.bucket_name,
@@ -997,11 +1001,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             enable_dynamic_field=enable_dynamic_field,
             force=True,
             include_meta=include_meta,
+            schema=schema
         )
-        self._connect()
-        c_name = cf.gen_unique_str("bulk_insert")
-        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
@@ -1089,8 +1090,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         if enable_partition_key:
             assert len(self.collection_wrap.partitions) > 1
-
-
     @pytest.mark.tags(CaseLabel.L3)
     @pytest.mark.parametrize("auto_id", [True, False])
     @pytest.mark.parametrize("dim", [128])  # 128
@@ -1125,6 +1124,9 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim)
         ]
         data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         files = prepare_bulk_insert_parquet_files(
             minio_endpoint=self.minio_endpoint,
             bucket_name=self.bucket_name,
@@ -1134,12 +1136,9 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             enable_dynamic_field=enable_dynamic_field,
             force=True,
             include_meta=include_meta,
+            schema=schema,
         )
-        self._connect()
-        c_name = cf.gen_unique_str("bulk_insert")
-        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
         t0 = time.time()
         task_id, _ = self.utility_wrap.do_bulk_insert(
@@ -1257,6 +1256,9 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             cf.gen_sparse_vec_field(name=df.sparse_vec_field),
         ]
         data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         files = prepare_bulk_insert_parquet_files(
             minio_endpoint=self.minio_endpoint,
             bucket_name=self.bucket_name,
@@ -1266,11 +1268,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             enable_dynamic_field=enable_dynamic_field,
             force=True,
             include_meta=include_meta,
-            sparse_format=sparse_format
+            sparse_format=sparse_format,
+            schema=schema
         )
-        self._connect()
-        c_name = cf.gen_unique_str("bulk_insert")
-        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
@@ -1378,6 +1379,9 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             cf.gen_sparse_vec_field(name=df.sparse_vec_field),
         ]
         data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         files = prepare_bulk_insert_new_json_files(
             minio_endpoint=self.minio_endpoint,
             bucket_name=self.bucket_name,
@@ -1387,11 +1391,9 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             enable_dynamic_field=enable_dynamic_field,
             force=True,
             include_meta=include_meta,
-            sparse_format=sparse_format
+            sparse_format=sparse_format,
+            schema=schema
         )
-        self._connect()
-        c_name = cf.gen_unique_str("bulk_insert")
-        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data