[test]Add method to stream write to generate bulk insert file (#22494)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
zhuwenxing 2023-03-01 20:43:48 +08:00 committed by GitHub
parent 6fa8d62879
commit f5b8bcafb5
3 changed files with 36 additions and 15 deletions


@@ -113,9 +113,9 @@ class TestChaos(TestChaosBase):
         Path(data_dir).mkdir(parents=True, exist_ok=True)
         files = []
         if file_type == "json":
-            files = cf.gen_json_files_for_bulk_insert(data, schema, data_dir)
+            files = cf.gen_json_files_for_bulk_insert(data, schema, data_dir, nb=nb, dim=dim)
         if file_type == "npy":
-            files = cf.gen_npy_files_for_bulk_insert(data, schema, data_dir)
+            files = cf.gen_npy_files_for_bulk_insert(data, schema, data_dir, nb=nb, dim=dim)
         log.info("upload file to minio")
         client = Minio(minio_endpoint, access_key="minioadmin", secret_key="minioadmin", secure=False)
         for file_name in files:
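For context, the two helpers are now driven with explicit nb and dim, and the files they return are uploaded to object storage. Below is a minimal, self-contained sketch of that flow using the MinIO Python SDK; the endpoint, bucket name, and local paths are assumptions for illustration, not values taken from this diff.

import os
from minio import Minio

# Assumed values; the chaos test derives these from its own configuration.
minio_endpoint = "127.0.0.1:9000"
bucket_name = "milvus-bucket"
data_dir = "/tmp/bulk_insert_data"
files = ["bulk_insert_data_source.json"]  # as returned by the gen_* helpers

client = Minio(minio_endpoint, access_key="minioadmin", secret_key="minioadmin", secure=False)
if not client.bucket_exists(bucket_name):
    client.make_bucket(bucket_name)
for file_name in files:
    # Upload each generated file so the Milvus bulk insert job can read it from MinIO.
    client.fput_object(bucket_name, file_name, os.path.join(data_dir, file_name))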


@@ -7,7 +7,7 @@ from functools import singledispatch
 import numpy as np
 import pandas as pd
 from sklearn import preprocessing
+from npy_append_array import NpyAppendArray
 from pymilvus import DataType
 from base.schema_wrapper import ApiCollectionSchemaWrapper, ApiFieldSchemaWrapper
 from common import common_type as ct
@@ -336,36 +336,56 @@ def gen_default_list_data_for_bulk_insert(nb=ct.default_nb, dim=ct.default_dim):
     int_values = [i for i in range(nb)]
     float_values = [np.float32(i) for i in range(nb)]
     string_values = [str(i) for i in range(nb)]
-    float_vec_values = gen_vectors(nb, dim)
+    float_vec_values = []  # placeholder for float_vec
     data = [int_values, float_values, string_values, float_vec_values]
     return data

-def gen_json_files_for_bulk_insert(data, schema, data_dir):
-    nb = len(data[0])
+def gen_json_files_for_bulk_insert(data, schema, data_dir, **kwargs):
+    nb = kwargs.get("nb", ct.default_nb)
+    dim = kwargs.get("dim", ct.default_dim)
     fields_name = [field.name for field in schema.fields]
-    entities = []
-    for i in range(nb):
-        entity_value = [field_values[i] for field_values in data]
-        entity = dict(zip(fields_name, entity_value))
-        entities.append(entity)
-    data_dict = {"rows": entities}
     file_name = "bulk_insert_data_source.json"
     files = ["bulk_insert_data_source.json"]
     data_source = os.path.join(data_dir, file_name)
     with open(data_source, "w") as f:
-        f.write(json.dumps(data_dict, indent=4, default=to_serializable))
+        f.write("{")
+        f.write("\n")
+        f.write('"rows":[')
+        f.write("\n")
+        for i in range(nb):
+            entity_value = [field_values[i] for field_values in data[:-1]]
+            vector = [random.random() for _ in range(dim)]
+            entity_value.append(vector)
+            entity = dict(zip(fields_name, entity_value))
+            f.write(json.dumps(entity, indent=4, default=to_serializable))
+            if i != nb - 1:
+                f.write(",")
+            f.write("\n")
+        f.write("]")
+        f.write("\n")
+        f.write("}")
     return files
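The rewritten gen_json_files_for_bulk_insert streams the document row by row: it writes the opening brace and "rows" key, emits one serialized entity per iteration (generating the vector on the fly), and closes the array, so peak memory stays at one entity instead of nb entities. A self-contained sketch of the same pattern, with hypothetical field names and deliberately tiny nb and dim:

import json
import random

nb, dim = 3, 4  # tiny values for illustration
fields_name = ["int64", "float", "varchar", "float_vector"]  # hypothetical schema
data_source = "/tmp/bulk_insert_data_source.json"

with open(data_source, "w") as f:
    f.write('{\n"rows":[\n')
    for i in range(nb):
        # Scalars are generated inline; the vector is created per row, never all at once.
        entity = dict(zip(fields_name, [i, float(i), str(i),
                                        [random.random() for _ in range(dim)]]))
        f.write(json.dumps(entity))
        if i != nb - 1:
            f.write(",")
        f.write("\n")
    f.write("]\n}")

# The streamed output is still one valid JSON document.
with open(data_source) as f:
    assert len(json.load(f)["rows"]) == nb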
-def gen_npy_files_for_bulk_insert(data, schema, data_dir):
+def gen_npy_files_for_bulk_insert(data, schema, data_dir, **kwargs):
+    nb = kwargs.get("nb", ct.default_nb)
+    dim = kwargs.get("dim", ct.default_dim)
     fields_name = [field.name for field in schema.fields]
     files = []
     for field in fields_name:
         files.append(f"{field}.npy")
     for i, file in enumerate(files):
         data_source = os.path.join(data_dir, file)
-        np.save(data_source, np.array(data[i]))
+        if "vector" in file:
+            log.info(f"generate {nb} vectors with dim {dim} for {data_source}")
+            with NpyAppendArray(data_source, "wb") as npaa:
+                for j in range(nb):
+                    vector = np.array([[random.random() for _ in range(dim)]])
+                    npaa.append(vector)
+        else:
+            np.save(data_source, np.array(data[i]))
     return files
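gen_npy_files_for_bulk_insert gets the same treatment for vector columns: rather than materializing an (nb, dim) array and calling np.save, it appends one (1, dim) chunk at a time with npy-append-array (pinned in requirements below). A minimal sketch of the documented NpyAppendArray usage; the file path and shapes are illustrative, and the constructor is used in its single-argument form here:

import numpy as np
from pathlib import Path
from npy_append_array import NpyAppendArray

filename = "/tmp/float_vector.npy"
nb, dim = 1000, 128

Path(filename).unlink(missing_ok=True)  # start from a fresh file
with NpyAppendArray(filename) as npaa:
    for _ in range(nb):
        # Only a single (1, dim) chunk is held in memory at any time.
        npaa.append(np.random.rand(1, dim))

# Read back lazily with a memory map; the on-disk shape is (nb, dim).
arr = np.load(filename, mmap_mode="r")
assert arr.shape == (nb, dim)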


@@ -33,6 +33,7 @@ timeout-decorator==0.5.0
 # for bulk insert test
 minio==7.1.5
+npy-append-array==0.9.15
 # for benchmark
 h5py==3.7.0