[test]Refine bulk insert test (#22031)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
zhuwenxing 2023-02-08 09:06:32 +08:00 committed by GitHub
parent a8a7186d8a
commit 734d0f072b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 239 additions and 250 deletions

View File

@ -92,7 +92,7 @@ class ApiUtilityWrapper:
unknown, "unknown")
def wait_for_bulk_insert_tasks_completed(self, task_ids, target_state=BulkInsertState.ImportCompleted,
timeout=None, using="default", **kwargs):
timeout=None, using="default", **kwargs):
start = time.time()
tasks_state_distribution = {
"success": set(),
@ -182,6 +182,24 @@ class ApiUtilityWrapper:
if task.task_id in pending_task_ids:
log.info(f"task {task.task_id} state transfer from pending to {task.state_name}")
def wait_index_build_completed(self, collection_name, timeout=None):
start = time.time()
if timeout is not None:
task_timeout = timeout
else:
task_timeout = TIMEOUT
end = time.time()
while end-start <= task_timeout:
time.sleep(0.5)
index_states, _ = self.index_building_progress(collection_name)
log.debug(f"index states: {index_states}")
if index_states["total_rows"] == index_states["indexed_rows"]:
log.info(f"index build completed")
return True
end = time.time()
log.info(f"index build timeout")
return False
def get_query_segment_info(self, collection_name, timeout=None, using="default", check_task=None, check_items=None):
timeout = TIMEOUT if timeout is None else timeout
func_name = sys._getframe().f_code.co_name

View File

@ -5,7 +5,7 @@ import pathlib
import numpy as np
import random
from sklearn import preprocessing
from common.common_func import gen_unique_str
from common.common_func import gen_unique_unicode_str
from minio_comm import copy_files_to_minio
from utils.util_log import test_log as log
@ -65,7 +65,7 @@ def gen_float_vectors(nb, dim):
def gen_str_invalid_vectors(nb, dim):
vectors = [[str(gen_unique_str()) for _ in range(dim)] for _ in range(nb)]
vectors = [[str(gen_unique_unicode_str()) for _ in range(dim)] for _ in range(nb)]
return vectors
@ -101,7 +101,7 @@ def gen_row_based_json_file(row_file, str_pk, data_fields, float_vect,
data_field = data_fields[j]
if data_field == DataField.pk_field:
if str_pk:
f.write('"uid":"' + str(gen_unique_str()) + '"')
f.write('"uid":"' + str(gen_unique_unicode_str()) + '"')
else:
if err_type == DataErrorType.float_on_int_pk:
f.write('"uid":' + str(i + start_uid + random.random()) + '')
@ -117,11 +117,11 @@ def gen_row_based_json_file(row_file, str_pk, data_fields, float_vect,
if err_type == DataErrorType.int_on_float_scalar:
f.write('"float_scalar":' + str(random.randint(-999999, 9999999)) + '')
elif err_type == DataErrorType.str_on_float_scalar:
f.write('"float_scalar":"' + str(gen_unique_str()) + '"')
f.write('"float_scalar":"' + str(gen_unique_unicode_str()) + '"')
else:
f.write('"float_scalar":' + str(random.random()) + '')
if data_field == DataField.string_field:
f.write('"string_scalar":"' + str(gen_unique_str()) + '"')
f.write('"string_scalar":"' + str(gen_unique_unicode_str()) + '"')
if data_field == DataField.bool_field:
if err_type == DataErrorType.typo_on_bool:
f.write('"bool_scalar":' + str(random.choice(["True", "False", "TRUE", "FALSE", "0", "1"])) + '')
@ -161,7 +161,7 @@ def gen_column_base_json_file(col_file, str_pk, data_fields, float_vect,
data_field = data_fields[j]
if data_field == DataField.pk_field:
if str_pk:
f.write('"uid":["' + ',"'.join(str(gen_unique_str()) + '"' for i in range(rows)) + ']')
f.write('"uid":["' + ',"'.join(str(gen_unique_unicode_str()) + '"' for i in range(rows)) + ']')
f.write("\n")
else:
if err_type == DataErrorType.float_on_int_pk:
@ -184,14 +184,14 @@ def gen_column_base_json_file(col_file, str_pk, data_fields, float_vect,
str(random.randint(-999999, 9999999)) for i in range(rows)) + "]")
elif err_type == DataErrorType.str_on_float_scalar:
f.write('"float_scalar":["' + ',"'.join(str(
gen_unique_str()) + '"' for i in range(rows)) + ']')
gen_unique_unicode_str()) + '"' for i in range(rows)) + ']')
else:
f.write('"float_scalar":[' + ",".join(
str(random.random()) for i in range(rows)) + "]")
f.write("\n")
if data_field == DataField.string_field:
f.write('"string_scalar":["' + ',"'.join(str(
gen_unique_str()) + '"' for i in range(rows)) + ']')
gen_unique_unicode_str()) + '"' for i in range(rows)) + ']')
f.write("\n")
if data_field == DataField.bool_field:
if err_type == DataErrorType.typo_on_bool:
@ -270,7 +270,22 @@ def gen_string_in_numpy_file(dir, data_field, rows, start=0, force=False):
# non vector columns
data = []
if rows > 0:
data = [gen_unique_str(str(i)) for i in range(start, rows+start)]
data = [gen_unique_unicode_str(str(i)) for i in range(start, rows+start)]
arr = np.array(data)
# print(f"file_name: {file_name} data type: {arr.dtype}")
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
np.save(file, arr)
return file_name
def gen_bool_in_numpy_file(dir, data_field, rows, start=0, force=False):
file_name = f"{data_field}.npy"
file = f"{dir}/{file_name}"
if not os.path.exists(file) or force:
# non vector columns
data = []
if rows > 0:
data = [random.choice([True, False]) for i in range(start, rows+start)]
arr = np.array(data)
# print(f"file_name: {file_name} data type: {arr.dtype}")
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
@ -380,6 +395,8 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type=""
rows=rows, dim=dim, force=force)
elif data_field == DataField.string_field: # string field for numpy not supported yet at 2022-10-17
file_name = gen_string_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
elif data_field == DataField.bool_field:
file_name = gen_bool_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
else:
file_name = gen_int_or_float_in_numpy_file(dir=data_source, data_field=data_field,
rows=rows, force=force)

View File

@ -114,6 +114,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
@ -138,9 +139,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
self.collection_wrap.load()
success = self.utility_wrap.wait_index_build_completed(c_name)
assert success
log.info(f"wait for load finished and be ready for search")
time.sleep(20)
self.collection_wrap.load()
self.collection_wrap.load(_refresh=True)
log.info(
f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
)
@ -187,6 +190,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
auto_id=auto_id,
str_pk=string_pk,
data_fields=default_vec_only_fields,
force=True
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
@ -218,9 +222,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
self.collection_wrap.load()
success = self.utility_wrap.wait_index_build_completed(c_name)
assert success
log.info(f"wait for load finished and be ready for search")
time.sleep(20)
self.collection_wrap.load()
self.collection_wrap.load(_reshard=True)
log.info(
f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
)
@ -310,12 +316,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert m_partition.num_entities == entities
assert self.collection_wrap.num_entities == entities
log.debug(state)
time.sleep(20)
res, _ = self.utility_wrap.index_building_progress(c_name)
exp_res = {"total_rows": entities, "indexed_rows": entities}
assert res == exp_res
success = self.utility_wrap.wait_index_build_completed(c_name)
assert success
log.info(f"wait for load finished and be ready for search")
time.sleep(20)
self.collection_wrap.load(partition_names=[p_name], _refresh=True)
log.info(
f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
)
@ -396,15 +400,13 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
assert success
res, _ = self.utility_wrap.index_building_progress(c_name)
exp_res = {'total_rows': entities, 'indexed_rows': entities}
assert res == exp_res
success = self.utility_wrap.wait_index_build_completed(c_name)
assert success
# verify num entities
assert self.collection_wrap.num_entities == entities
# verify search and query
log.info(f"wait for load finished and be ready for search")
time.sleep(20)
self.collection_wrap.load(_refresh=True)
search_data = cf.gen_binary_vectors(1, dim)[1]
search_params = {"metric_type": "JACCARD", "params": {"nprobe": 10}}
res, _ = self.collection_wrap.search(
@ -510,12 +512,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# verify index status
res, _ = self.collection_wrap.has_index()
assert res is True
res, _ = self.utility_wrap.index_building_progress(c_name)
exp_res = {'total_rows': entities, 'indexed_rows': entities}
assert res == exp_res
success = self.utility_wrap.wait_index_build_completed(c_name)
assert success
# verify search and query
log.info(f"wait for load finished and be ready for search")
time.sleep(20)
self.collection_wrap.load(_refresh=True)
nq = 3
topk = 10
search_data = cf.gen_vectors(nq, dim)
@ -613,13 +614,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
log.info(f"collection entities: {num_entities}")
assert num_entities == bulk_insert_row + direct_insert_row
# verify index status
time.sleep(20)
res, _ = self.utility_wrap.index_building_progress(c_name)
exp_res = {'total_rows': num_entities, 'indexed_rows': num_entities}
assert res == exp_res
success = self.utility_wrap.wait_index_build_completed(c_name)
assert success
# verify search and query
log.info(f"wait for load finished and be ready for search")
time.sleep(20)
self.collection_wrap.load(_refresh=True)
nq = 3
topk = 10
search_data = cf.gen_vectors(nq, dim=dim)
@ -675,13 +674,15 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
schema = cf.gen_collection_schema(fields=fields, auto_id=True)
self.collection_wrap.init_collection(c_name, schema=schema)
# build index
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
if create_index_before_bulk_insert:
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
if loaded_before_bulk_insert:
# load collection
self.collection_wrap.load()
# import data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
@ -694,6 +695,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
assert success
if not create_index_before_bulk_insert:
# build index
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
if not loaded_before_bulk_insert:
# load collection
self.collection_wrap.load()
@ -702,12 +709,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
log.info(f"collection entities: {num_entities}")
assert num_entities == 500
# verify no index
res, _ = self.utility_wrap.index_building_progress(c_name)
exp_res = {'total_rows': num_entities, 'indexed_rows': num_entities}
assert res == exp_res
success = self.utility_wrap.wait_index_build_completed(c_name)
assert success
# verify search and query
log.info(f"wait for load finished and be ready for search")
time.sleep(20)
self.collection_wrap.load(_refresh=True)
nq = 3
topk = 10
search_data = cf.gen_vectors(nq, 16)
@ -799,7 +805,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert not success # TODO: check error msg
if is_row_based:
if fields_num_in_file == "less":
failed_reason = f"field '{additional_field}' missed at the row 0"
failed_reason = f"value of field '{additional_field}' is missed"
else:
failed_reason = f"field '{df.float_field}' is not defined in collection schema"
else:
@ -811,12 +817,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert success
log.info(f" collection entities: {self.collection_wrap.num_entities}")
assert self.collection_wrap.num_entities == entities
# verify no index
res, _ = self.collection_wrap.has_index()
assert res is True
# verify index
success = self.utility_wrap.wait_index_build_completed(c_name)
assert success
# verify search and query
log.info(f"wait for load finished and be ready for search")
time.sleep(20)
self.collection_wrap.load(_refresh=True)
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
@ -836,14 +842,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [False])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("multi_fields", [True, False])
@pytest.mark.parametrize("dim", [15])
@pytest.mark.parametrize("entities", [200])
# @pytest.mark.skip(reason="stop support for numpy files")
def test_float_vector_from_numpy_file(
self, is_row_based, auto_id, multi_fields, dim, entities
self, auto_id, dim, entities
):
"""
collection schema 1: [pk, float_vector]
@ -857,7 +860,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
4.1 verify the data entities equal the import data
4.2 verify search and query successfully
"""
data_fields = [df.vec_field]
if auto_id:
data_fields = [df.vec_field, df.int_field, df.string_field, df.float_field, df.bool_field]
else:
data_fields = [df.pk_field, df.vec_field, df.int_field, df.string_field, df.float_field, df.bool_field]
np_files = prepare_bulk_insert_numpy_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
@ -866,53 +872,15 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
data_fields=data_fields,
force=True,
)
if not multi_fields:
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
]
if not auto_id:
scalar_fields = [df.pk_field]
else:
scalar_fields = None
else:
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
cf.gen_int32_field(name=df.int_field),
cf.gen_string_field(name=df.string_field),
cf.gen_bool_field(name=df.bool_field),
]
if not auto_id:
scalar_fields = [
df.pk_field,
df.float_field,
df.int_field,
df.string_field,
df.bool_field,
]
else:
scalar_fields = [
df.int_field,
df.string_field,
df.bool_field,
df.float_field,
]
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
cf.gen_int64_field(name=df.int_field),
cf.gen_string_field(name=df.string_field),
cf.gen_float_field(name=df.float_field),
cf.gen_bool_field(name=df.bool_field),
]
files = np_files
if scalar_fields is not None:
json_files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=is_row_based,
dim=dim,
auto_id=auto_id,
rows=entities,
data_fields=scalar_fields,
force=True,
)
files = np_files + json_files
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
@ -930,112 +898,39 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
if is_row_based:
assert not success
failed_reason1 = "unsupported file type for row-based mode"
failed_reason2 = (
f"JSON row validator: field {df.vec_field} missed at the row 0"
)
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason1 in state.infos.get(
"failed_reason", ""
) or failed_reason2 in state.infos.get("failed_reason", "")
else:
assert success
log.info(f" collection entities: {self.collection_wrap.num_entities}")
assert self.collection_wrap.num_entities == entities
# create index and load
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(20)
log.info(
f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
)
# verify imported data is available for search
nq = 2
topk = 5
search_data = cf.gen_vectors(nq, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=topk,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq, "limit": topk},
)
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("dim", [8])
@pytest.mark.parametrize("entities", [10])
def test_data_type_float_on_int_pk(self, is_row_based, dim, entities):
"""
collection schema: [pk, float_vector,
float_scalar, int_scalar, string_scalar, bool_scalar]
data files: json file that one of entities has float on int pk
Steps:
1. create collection
2. import data
3. verify the data entities
4. verify query successfully
"""
files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=is_row_based,
rows=entities,
dim=dim,
auto_id=False,
data_fields=default_multi_fields,
err_type=DataErrorType.float_on_int_pk,
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
# TODO: add string pk
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
cf.gen_int32_field(name=df.int_field),
cf.gen_string_field(name=df.string_field),
cf.gen_bool_field(name=df.bool_field),
cf.gen_float_field(name=df.float_field),
]
schema = cf.gen_collection_schema(fields=fields, auto_id=False)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=90
)
log.info(f"bulk insert state:{success}")
assert success
log.info(f" collection entities: {self.collection_wrap.num_entities}")
assert self.collection_wrap.num_entities == entities
# create index and load
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
result = self.utility_wrap.wait_index_build_completed(c_name)
assert result is True
self.collection_wrap.load()
self.collection_wrap.load(_refresh=True)
log.info(f"wait for load finished and be ready for search")
time.sleep(20)
# the pk value was automatically convert to int from float
res, _ = self.collection_wrap.query(
expr=f"{df.pk_field} in [3]", output_fields=[df.pk_field]
log.info(
f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
)
assert [{df.pk_field: 3}] == res
# verify imported data is available for search
nq = 2
topk = 5
search_data = cf.gen_vectors(nq, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=topk,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq, "limit": topk},
)
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True])
@ -1093,9 +988,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
self.collection_wrap.load()
success = self.utility_wrap.wait_index_build_completed(c_name)
assert success
# verify imported data is available for search
log.info(f"wait for load finished and be ready for search")
time.sleep(20)
self.collection_wrap.load()
self.collection_wrap.load(_refresh=True)
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
@ -1110,13 +1008,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
res, _ = self.collection_wrap.query(
expr=f"{df.pk_field} in {uids}", output_fields=[df.float_field]
)
assert isinstance(res[0].get(df.float_field, 1), float)
assert isinstance(res[0].get(df.float_field, 1), np.float32)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.skip(reason="stop support for numpy files")
def test_with_all_field_numpy(self, auto_id, dim, entities):
"""
collection schema 1: [pk, int64, float64, string float_vector]
@ -1126,7 +1023,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
2. import data
3. verify
"""
data_fields = [df.pk_field, df.int_field, df.float_field, df.double_field, df.vec_field]
data_fields = [df.int_field, df.float_field, df.double_field, df.vec_field]
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_int64_field(name=df.int_field),
@ -1167,10 +1064,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
self.collection_wrap.load()
success = self.utility_wrap.wait_index_build_completed(c_name)
log.info(f"wait for load finished and be ready for search")
time.sleep(20)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
self.collection_wrap.load()
self.collection_wrap.load(_refresh=True)
log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
@ -1181,6 +1079,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@ -1240,10 +1142,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert success
log.info(f" collection entities: {self.collection_wrap.num_entities}")
assert self.collection_wrap.num_entities == entities * file_nums
# verify imported data is indexed
success = self.utility_wrap.wait_index_build_completed(c_name)
assert success
# verify search and query
log.info(f"wait for load finished and be ready for search")
time.sleep(20)
self.collection_wrap.load(_refresh=True)
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
@ -1255,10 +1159,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
check_items={"nq": 1, "limit": 1},
)
# TODO: not supported yet
def test_from_customize_bucket(self):
pass
class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
@ -1524,7 +1424,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
assert not success
if is_row_based:
value = df.vec_field # if auto_id else df.pk_field
failed_reason = f"invalid JSON format, the root key should be 'rows', but get {value}"
failed_reason = f"invalid JSON format, the root key should be 'rows', but get '{value}'"
else:
failed_reason = "JSON parse: row count is 0"
for state in states.values():
@ -1878,17 +1778,16 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
@pytest.mark.parametrize("dim", [16])
@pytest.mark.parametrize("entities", [300])
@pytest.mark.parametrize("file_nums", [10]) # max task nums 32? need improve
@pytest.mark.skip(reason="not support multiple files now")
def test_float_vector_one_of_files_fail(
def test_float_vector_with_multi_json_files(
self, is_row_based, auto_id, dim, entities, file_nums
):
"""
collection schema: [pk, float_vectors, int_scalar], one of entities has wrong dim data
data files: multi files, and there are errors in one of files
1. import data 11 files(10 correct and 1 with errors) into the collection
data files: multi files,
1. import data 10 files
2. verify that import fails with errors and no data imported
"""
correct_files = prepare_bulk_insert_json_files(
multi_files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=is_row_based,
@ -1899,20 +1798,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
file_nums=file_nums,
force=True,
)
# append a file that has errors
dismatch_dim = dim + 1
err_files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=is_row_based,
rows=entities,
dim=dismatch_dim,
auto_id=auto_id,
data_fields=default_multi_fields,
file_nums=1,
)
files = correct_files + err_files
files = multi_files
random.shuffle(files) # mix up the file order
self._connect()
@ -1929,22 +1815,14 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
collection_name=c_name, files=files,
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": "row-based import, only allow one JSON file each time"}
)
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
assert not success
if is_row_based:
# all correct files shall be imported successfully
assert self.collection_wrap.num_entities == entities * file_nums
else:
assert self.collection_wrap.num_entities == 0
assert self.collection_wrap.num_entities == 0
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@ -2109,6 +1987,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("dim", [8])
@pytest.mark.parametrize("entities", [10])
# @pytest.mark.xfail(reason="https://github.com/milvus-io/milvus/issues/21818")
def test_data_type_string_on_int_pk(self, is_row_based, dim, entities):
"""
collection schema: default multi scalars
@ -2153,7 +2032,61 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
)
log.info(f"bulk insert state:{success}")
assert not success
failed_reason = f"illegal numeric value"
failed_reason = f"illegal value"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
assert self.collection_wrap.num_entities == 0
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("dim", [8])
@pytest.mark.parametrize("entities", [10])
def test_data_type_float_on_int_pk(self, is_row_based, dim, entities):
"""
collection schema: [pk, float_vector,
float_scalar, int_scalar, string_scalar, bool_scalar]
data files: json file that one of entities has float on int pk
Steps:
1. create collection
2. import data with wrong data type
3. verify import failed
"""
files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=is_row_based,
rows=entities,
dim=dim,
auto_id=False,
data_fields=default_multi_fields,
err_type=DataErrorType.float_on_int_pk,
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
# TODO: add string pk
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
cf.gen_int32_field(name=df.int_field),
cf.gen_string_field(name=df.string_field),
cf.gen_bool_field(name=df.bool_field),
cf.gen_float_field(name=df.float_field),
]
schema = cf.gen_collection_schema(fields=fields, auto_id=False)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=90
)
log.info(f"bulk insert state:{success}")
assert not success
failed_reason = f"failed to convert row value to entity"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
@ -2174,6 +2107,16 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
2. import data
3. verify import failed with errors
"""
multi_fields = [
df.vec_field,
df.int_field,
df.string_field,
df.bool_field,
df.float_field,
]
if not auto_id:
multi_fields.insert(0, df.pk_field)
files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
@ -2187,7 +2130,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
# TODO: add string pk
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
@ -2330,7 +2272,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
)
log.info(f"bulk insert state:{success}")
assert not success
failed_reason = "illegal numeric value"
failed_reason = "failed to convert row value to entity"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
@ -2392,12 +2334,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
)
log.info(f"bulk insert state:{success}")
assert not success
failed_reason = "illegal numeric value"
if not float_vector:
failed_reason = f"the field '{df.vec_field}' value at the row {wrong_position} is invalid"
failed_reason1 = "failed to parse row value"
failed_reason2 = "failed to convert row value to entity"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
assert failed_reason1 in state.infos.get("failed_reason", "") or \
failed_reason2 in state.infos.get("failed_reason", "")
assert self.collection_wrap.num_entities == 0

View File

@ -43,6 +43,18 @@ def gen_unique_str(str_value=None):
return "test_" + prefix if str_value is None else str_value + "_" + prefix
def gen_unique_unicode_str(str_value=None):
s = "test_" if str_value is None else str_value
for i in range(8):
while True:
c = chr(random.randint(0x0000, 0xffff))
if c.isprintable():
break
s += c
s += "".join(random.choice(string.ascii_letters + string.digits) for _ in range(8))
return s
def gen_str_by_length(length=8):
return "".join(random.choice(string.ascii_letters + string.digits) for _ in range(length))

View File

@ -12,7 +12,7 @@ allure-pytest==2.7.0
pytest-print==0.2.1
pytest-level==0.1.1
pytest-xdist==2.5.0
pymilvus==2.2.2.dev3
pymilvus==2.2.2.dev8
pytest-rerunfailures==9.1.1
git+https://github.com/Projectplace/pytest-tags
ndg-httpsclient