Remove row_based parameter in bulk_insert tests (#20171)

Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>

yanliang567 authored 2022-10-28 17:21:33 +08:00 (committed by GitHub)
parent 4412cfcaaf
commit 8cb5122d4e
4 changed files with 163 additions and 162 deletions
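
The net effect for callers: a test no longer declares whether its files are row based or column based when kicking off an import; it passes only the collection and the file list. A before/after comparison of a typical call site, taken from the test changes below:

    # before this commit: the caller had to state the file layout
    task_ids, _ = self.utility_wrap.bulk_insert(
        collection_name=c_name, is_row_based=True, files=files
    )

    # after this commit: only the collection and the files are passed
    task_ids, _ = self.utility_wrap.bulk_insert(
        collection_name=c_name, files=files
    )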

@ -18,13 +18,13 @@ class ApiUtilityWrapper:
ut = utility
role = None
def bulk_insert(self, collection_name, is_row_based=True, files="", partition_name=None, timeout=None,
def bulk_insert(self, collection_name, files="", partition_name=None, timeout=None,
using="default", check_task=None, check_items=None, **kwargs):
working_tasks = self.get_bulk_insert_working_list()
log.info(f"before bulk load, there are {len(working_tasks)} working tasks")
log.info(f"files to load: {files}")
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.bulk_insert, collection_name, is_row_based,
res, is_succ = api_request([self.ut.bulk_insert, collection_name,
files, partition_name, timeout, using], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
collection_name=collection_name, using=using).run()
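
Note that the wrapper forwards its arguments positionally through api_request, so the list above has to stay aligned with the positional order of pymilvus' utility.bulk_insert now that is_row_based is gone. A rough sketch of how such a forwarding helper typically unpacks the list; this is an illustration only, not the project's actual api_request implementation (that one also logs the call and its result):

    def api_request(arg_list, **kwargs):
        # arg_list[0] is the callable, the remaining items are its positional args
        func, args = arg_list[0], arg_list[1:]
        try:
            return func(*args, **kwargs), True
        except Exception as e:
            return e, False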

@ -1,3 +1,4 @@
import copy
import time
import os
import pathlib
@ -340,7 +341,7 @@ def gen_json_files(is_row_based, rows, dim, auto_id, str_pk,
files = []
start_uid = 0
# make sure pk field exists when not auto_id
if not auto_id and DataField.pk_field not in data_fields:
if (not auto_id) and (DataField.pk_field not in data_fields):
data_fields.append(DataField.pk_field)
for i in range(file_nums):
file_name = gen_file_name(is_row_based=is_row_based, rows=rows, dim=dim,
@ -397,10 +398,11 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type=""
return files
def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket", is_row_based=True, rows=100, dim=128,
auto_id=True, str_pk=False, float_vector=True,
data_fields=[], file_nums=1, multi_folder=False,
file_type=".json", err_type="", force=False, **kwargs):
def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket",
is_row_based=True, rows=100, dim=128,
auto_id=True, str_pk=False, float_vector=True,
data_fields=[], file_nums=1, multi_folder=False,
file_type=".json", err_type="", force=False, **kwargs):
"""
Generate files based on the params in json format and copy them to minio
@ -456,9 +458,12 @@ def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket
:return list
file names list
"""
data_fields_c = copy.deepcopy(data_fields)
print(f"data_fields: {data_fields}")
print(f"data_fields_c: {data_fields_c}")
files = gen_json_files(is_row_based=is_row_based, rows=rows, dim=dim,
auto_id=auto_id, str_pk=str_pk, float_vector=float_vector,
data_fields=data_fields, file_nums=file_nums, multi_folder=multi_folder,
data_fields=data_fields_c, file_nums=file_nums, multi_folder=multi_folder,
file_type=file_type, err_type=err_type, force=force, **kwargs)
copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
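
The new deepcopy guards against Python's mutable default argument pitfall: data_fields defaults to [], gen_json_files appends DataField.pk_field to whatever list it is handed (see the hunk above), and without the copy later calls, including tests that pass a shared module-level list such as default_multi_fields, would see fields leaked from earlier calls. A minimal standalone illustration of the pitfall, not tied to the test suite:

    def gen(data_fields=[]):          # one shared list object for every call
        data_fields.append("pk")      # mutates the shared default in place
        return data_fields

    gen()   # ['pk']
    gen()   # ['pk', 'pk']  <- leaks state from the first call

    def gen_safe(data_fields=None):
        fields = list(data_fields or [])   # copy before mutating
        fields.append("pk")
        return fields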

@ -73,7 +73,7 @@ class TestcaseBaseBulkInsert(TestcaseBase):
class TestBulkInsert(TestcaseBaseBulkInsert):
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [8]) # 8, 128
@pytest.mark.parametrize("entities", [100]) # 100, 1000
@ -112,7 +112,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name,
partition_name=None,
is_row_based=is_row_based,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
@ -156,7 +155,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("dim", [8]) # 8
@pytest.mark.parametrize("entities", [100]) # 100
def test_str_pk_float_vector_only(self, is_row_based, dim, entities):
@ -193,7 +192,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
completed, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -239,7 +238,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [4])
@pytest.mark.parametrize("entities", [3000])
@ -291,7 +290,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name,
partition_name=p_name,
is_row_based=is_row_based,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
@ -332,7 +330,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [16])
@pytest.mark.parametrize("entities", [2000])
@ -381,9 +379,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.load()
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
)
task_ids, _ = self.utility_wrap.bulk_insert(collection_name=c_name,
files=files)
logging.info(f"bulk insert task ids:{task_ids}")
success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
@ -416,7 +413,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize(
"fields_num_in_file", ["equal", "more", "less"]
@ -456,6 +453,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_int32_field(name=df.int_field),
cf.gen_string_field(name=df.string_field),
cf.gen_bool_field(name=df.bool_field),
cf.gen_float_field(name=df.float_field),
]
if fields_num_in_file == "more":
fields.pop()
@ -474,7 +472,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -482,12 +480,13 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
if fields_num_in_file == "less":
if fields_num_in_file in ["less", "more"]:
assert not success
if is_row_based:
failed_reason = (
f"JSON row validator: field {additional_field} missed at the row 0"
)
if fields_num_in_file == "less":
failed_reason = f"field '{additional_field}' missed at the row 0"
else:
failed_reason = f"field '{df.float_field}' is not defined in collection schema"
else:
failed_reason = "is not equal to other fields"
for state in states.values():
@ -546,12 +545,13 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
"""
bulk_insert_row = 500
direct_insert_row = 3000
dim = 16
files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=True,
rows=bulk_insert_row,
dim=16,
dim=dim,
data_fields=[df.pk_field, df.float_field, df.vec_field],
force=True,
)
@ -560,12 +560,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_field(name=df.float_field),
cf.gen_float_vec_field(name=df.vec_field, dim=16),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
]
data = [
[i for i in range(direct_insert_row)],
[np.float32(i) for i in range(direct_insert_row)],
cf.gen_vectors(direct_insert_row, 16),
cf.gen_vectors(direct_insert_row, dim=dim),
]
schema = cf.gen_collection_schema(fields=fields)
@ -584,7 +584,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=True, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -610,7 +610,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
time.sleep(5)
nq = 3
topk = 10
search_data = cf.gen_vectors(nq, 16)
search_data = cf.gen_vectors(nq, dim=dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
@ -651,7 +651,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
rows=500,
dim=16,
auto_id=True,
data_fields=[df.pk_field, df.vec_field],
data_fields=[df.vec_field],
force=True,
)
self._connect()
@ -673,7 +673,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=True, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -716,7 +716,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize(
"fields_num_in_file", ["equal", "more", "less"]
) # "equal", "more", "less"
@ -757,6 +757,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_int32_field(name=df.int_field),
cf.gen_string_field(name=df.string_field),
cf.gen_bool_field(name=df.bool_field),
cf.gen_float_field(name=df.float_field),
]
if fields_num_in_file == "more":
fields.pop()
@ -774,7 +775,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -782,12 +783,13 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
if fields_num_in_file == "less":
if fields_num_in_file in ["less", "more"]:
assert not success # TODO: check error msg
if is_row_based:
failed_reason = (
f"JSON row validator: field {additional_field} missed at the row 0"
)
if fields_num_in_file == "less":
failed_reason = f"field '{additional_field}' missed at the row 0"
else:
failed_reason = f"field '{df.float_field}' is not defined in collection schema"
else:
failed_reason = "is not equal to other fields"
for state in states.values():
@ -821,7 +823,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [pytest.param(True, marks=pytest.mark.xfail(reason="issue: https://github.com/milvus-io/milvus/issues/19499")), False]) # True, False
@pytest.mark.parametrize("is_row_based", [pytest.param(True, marks=pytest.mark.xfail(reason="issue: https://github.com/milvus-io/milvus/issues/19499"))]) # True, False
@pytest.mark.parametrize("auto_id", [True, False]) # True, False
@pytest.mark.parametrize("dim", [16]) # 16
@pytest.mark.parametrize("entities", [100]) # 3000
@ -863,6 +865,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_int32_field(name=df.int_field),
cf.gen_string_field(name=df.string_field),
cf.gen_bool_field(name=df.bool_field),
cf.gen_float_field(name=df.float_field)
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
@ -876,7 +879,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -922,11 +925,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("multi_fields", [True, False])
@pytest.mark.parametrize("dim", [15])
@pytest.mark.parametrize("entities", [200])
@pytest.mark.skip(reason="stop support for numpy files")
def test_float_vector_from_numpy_file(
self, is_row_based, auto_id, multi_fields, dim, entities
):
@ -1006,7 +1010,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -1060,7 +1064,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("dim", [8])
@pytest.mark.parametrize("entities", [10])
def test_data_type_float_on_int_pk(self, is_row_based, dim, entities):
@ -1094,12 +1098,13 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_int32_field(name=df.int_field),
cf.gen_string_field(name=df.string_field),
cf.gen_bool_field(name=df.bool_field),
cf.gen_float_field(name=df.float_field),
]
schema = cf.gen_collection_schema(fields=fields, auto_id=False)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -1122,7 +1127,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert [{df.pk_field: 3}] == res
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [8])
@pytest.mark.parametrize("entities", [10])
@ -1163,7 +1168,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -1200,6 +1205,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
@pytest.mark.parametrize("auto_id", [True])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.skip(reason="stop support for numpy files")
def test_with_all_field_numpy(self, auto_id, dim, entities):
"""
collection schema 1: [pk, int64, float64, string float_vector]
@ -1233,7 +1239,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=False, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -1281,8 +1287,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
2. import data
3. verify that import numpy files in a loop
"""
is_row_based = False # numpy files supports only column based
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
fields = [
@ -1314,7 +1318,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
force=True,
)
task_id, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
collection_name=c_name, files=files
)
task_ids.append(task_id[0])
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -1348,8 +1352,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
def test_non_existing_file(self, is_row_based):
def test_non_existing_file(self):
"""
collection: either auto_id or not
collection schema: not existing file(s)
@ -1373,7 +1376,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name,
partition_name=None,
is_row_based=is_row_based,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
@ -1381,13 +1383,13 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
task_ids=task_ids, timeout=90
)
assert not success
failed_reason = f"failed to get file size of {files[0]}"
failed_reason = f"failed to get file size of '{files[0]}'"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
def test_empty_json_file(self, is_row_based, auto_id):
"""
@ -1423,7 +1425,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name,
partition_name=None,
is_row_based=is_row_based,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
@ -1431,13 +1432,13 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
task_ids=task_ids, timeout=90
)
assert not success
failed_reason = "JSON parse: row count is 0"
failed_reason = "JSON parser: row count is 0"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [8]) # 8
@pytest.mark.parametrize("entities", [100]) # 100
@ -1486,7 +1487,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name,
partition_name=None,
is_row_based=is_row_based,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
@ -1502,7 +1502,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
assert failed_reason in state.infos.get("failed_reason", "")
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [8])
@pytest.mark.parametrize("entities", [100])
@ -1539,7 +1539,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name,
partition_name=None,
is_row_based=is_row_based,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
@ -1549,7 +1548,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
assert not success
if is_row_based:
value = df.vec_field # if auto_id else df.pk_field
failed_reason = f"JSON parse: invalid row-based JSON format, the key {value} is not found"
failed_reason = f"JSON parser: invalid row-based JSON format, the key {value} is not found"
else:
failed_reason = "JSON parse: row count is 0"
for state in states.values():
@ -1557,7 +1556,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
assert failed_reason in state.infos.get("failed_reason", "")
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [8]) # 8
@pytest.mark.parametrize("entities", [100]) # 100
@ -1594,7 +1593,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name,
partition_name=None,
is_row_based=is_row_based,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
@ -1608,7 +1606,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
else:
assert not success
if is_row_based:
failed_reason = f"field {dismatch_pk_field} missed at the row 0"
failed_reason = f"the field '{df.pk_field}' is not defined in collection schema"
else:
failed_reason = f"field {dismatch_pk_field} row count 0 is not equal to other fields row count"
for state in states.values():
@ -1616,7 +1614,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
assert failed_reason in state.infos.get("failed_reason", "")
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [8]) # 8
@pytest.mark.parametrize("entities", [100]) # 100
@ -1652,7 +1650,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name,
partition_name=None,
is_row_based=is_row_based,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
@ -1664,7 +1661,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
assert not success
if is_row_based:
failed_reason = f"field {dismatch_vec_field} missed at the row 0"
failed_reason = f"the field '{df.vec_field}' is not defined in collection schema"
else:
if auto_id:
failed_reason = f"JSON column consumer: row count is 0"
@ -1675,7 +1672,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
assert failed_reason in state.infos.get("failed_reason", "")
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [4])
@pytest.mark.parametrize("entities", [200])
@ -1712,7 +1709,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name,
partition_name="",
is_row_based=is_row_based,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
@ -1723,7 +1719,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
log.info(f"bulk insert state:{success} in {tt}")
assert not success
if is_row_based:
failed_reason = f"field {dismatch_scalar_field} missed at the row 0"
failed_reason = f"field '{df.int_field}' is not defined in collection schema"
else:
failed_reason = f"field {dismatch_scalar_field} row count 0 is not equal to other fields row count"
for state in states.values():
@ -1731,7 +1727,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
assert failed_reason in state.infos.get("failed_reason", "")
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [4])
@pytest.mark.parametrize("entities", [200])
@ -1763,7 +1759,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -1771,13 +1767,13 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
)
log.info(f"bulk insert state:{success}")
assert not success
failed_reason = f"array size {dim} doesn't equal to vector dimension {wrong_dim} of field vectors at the row "
failed_reason = f"array size {dim} doesn't equal to vector dimension {wrong_dim} of field vectors"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("dim", [4])
@pytest.mark.parametrize("entities", [200])
def test_non_existing_collection(self, is_row_based, dim, entities):
@ -1801,14 +1797,13 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
err_msg = f"can't find collection: {c_name}"
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name,
is_row_based=is_row_based,
files=files,
check_task=CheckTasks.err_res,
check_items={"err_code": 1, "err_msg": err_msg},
)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("dim", [4])
@pytest.mark.parametrize("entities", [200])
@pytest.mark.xfail(reason="issue: https://github.com/milvus-io/milvus/issues/19553")
@ -1842,14 +1837,13 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name,
partition_name=p_name,
is_row_based=is_row_based,
files=files,
check_task=CheckTasks.err_res,
check_items={"err_code": 11, "err_msg": err_msg},
)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [4])
@pytest.mark.parametrize("entities", [1000])
@ -1886,7 +1880,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -1895,7 +1889,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
log.info(f"bulk insert state:{success}")
assert not success
failed_reason = (
f"doesn't equal to vector dimension {dim} of field vectors at the row"
f"doesn't equal to vector dimension {dim} of field vectors"
)
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
@ -1903,7 +1897,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
assert self.collection_wrap.num_entities == 0
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [16])
@pytest.mark.parametrize("entities", [300])
@ -1952,6 +1946,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
cf.gen_int32_field(name=df.int_field),
cf.gen_string_field(name=df.string_field),
cf.gen_bool_field(name=df.bool_field),
cf.gen_float_field(name=df.float_field),
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
@ -1959,7 +1954,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -2011,7 +2006,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=False, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -2021,7 +2016,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
log.info(f"bulk insert state:{success} in {tt}")
assert not success
failed_reason = f"illegal row width {dim}"
failed_reason = f"illegal dimension {dim} of numpy file"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
@ -2031,7 +2026,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
@pytest.mark.parametrize("auto_id", [False])
@pytest.mark.parametrize("dim", [15])
@pytest.mark.parametrize("entities", [100])
@pytest.mark.xfail(reason="https://github.com/milvus-io/milvus/issues/18992")
def test_wrong_field_name_in_numpy(self, auto_id, dim, entities):
"""
collection schema 1: [pk, float_vector]
@ -2068,7 +2062,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=False, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -2078,7 +2072,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
log.info(f"bulk insert state:{success} in {tt}")
assert not success
failed_reason = f"Numpy parse: the field {df.vec_field} doesn't exist"
failed_reason = f"file '{df.vec_field}.npy' has no corresponding field in collection"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
@ -2119,7 +2113,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=False, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -2135,7 +2129,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
assert self.collection_wrap.num_entities == 0
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("dim", [8])
@pytest.mark.parametrize("entities", [10])
def test_data_type_string_on_int_pk(self, is_row_based, dim, entities):
@ -2168,12 +2162,13 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
cf.gen_int32_field(name=df.int_field),
cf.gen_string_field(name=df.string_field),
cf.gen_bool_field(name=df.bool_field),
cf.gen_float_field(name=df.float_field),
]
schema = cf.gen_collection_schema(fields=fields, auto_id=False)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -2188,7 +2183,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
assert self.collection_wrap.num_entities == 0
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [8])
@pytest.mark.parametrize("entities", [10])
@ -2211,7 +2206,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
auto_id=False,
data_fields=default_multi_fields,
err_type=DataErrorType.typo_on_bool,
scalars=default_multi_fields,
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
@ -2228,7 +2223,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -2271,7 +2266,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
2. import data
3. fail to import data with errors
"""
is_row_based = False # numpy files supports only column based
data_fields = [df.vec_field]
if not auto_id:
data_fields.append(df.pk_field)
@ -2295,7 +2289,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
collection_name=c_name, files=files
)
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
@ -2311,7 +2305,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
assert self.collection_wrap.num_entities == 0
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [9])
@pytest.mark.parametrize("entities", [10])
@ -2334,66 +2328,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
auto_id=auto_id,
data_fields=default_multi_fields,
err_type=DataErrorType.str_on_float_scalar,
scalars=default_multi_fields,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
cf.gen_int32_field(name=df.int_field),
cf.gen_float_field(name=df.float_field),
cf.gen_string_field(name=df.string_field),
cf.gen_bool_field(name=df.bool_field),
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
)
log.info(f"bulk insert state:{success}")
assert not success
failed_reason = "illegal numeric value"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
assert self.collection_wrap.num_entities == 0
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("float_vector", [True, False])
@pytest.mark.parametrize("dim", [8])
@pytest.mark.parametrize("entities", [500])
def test_data_type_str_on_vector_fields(
self, is_row_based, auto_id, float_vector, dim, entities
):
"""
collection schema: [pk, float_vector,
float_scalar, int_scalar, string_scalar, bool_scalar]
data files: json file that entities has string data on vectors
Steps:
1. create collection
2. import data
3. verify import failed with errors
"""
files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=is_row_based,
rows=entities,
dim=dim,
auto_id=auto_id,
float_vector=float_vector,
data_fields=default_multi_fields,
err_type=DataErrorType.str_on_vector_field,
wrong_position=entities // 2,
scalars=default_multi_fields,
force=True,
)
self._connect()
@ -2410,7 +2344,69 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=is_row_based, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
)
log.info(f"bulk insert state:{success}")
assert not success
failed_reason = "illegal numeric value"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
assert self.collection_wrap.num_entities == 0
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("float_vector", [True, False])
@pytest.mark.parametrize("dim", [8])
@pytest.mark.parametrize("entities", [500])
def test_data_type_str_on_vector_fields(
self, is_row_based, auto_id, float_vector, dim, entities
):
"""
collection schema: [pk, float_vector,
float_scalar, int_scalar, string_scalar, bool_scalar]
data files: json file that entities has string data on vectors
Steps:
1. create collection
2. import data
3. verify import failed with errors
"""
wrong_position = entities // 2
files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=is_row_based,
rows=entities,
dim=dim,
auto_id=auto_id,
float_vector=float_vector,
data_fields=default_multi_fields,
err_type=DataErrorType.str_on_vector_field,
wrong_position=wrong_position,
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
fields = [cf.gen_float_vec_field(name=df.vec_field, dim=dim)]
if not float_vector:
fields = [cf.gen_binary_vec_field(name=df.vec_field, dim=dim)]
fields.extend([
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_int32_field(name=df.int_field),
cf.gen_float_field(name=df.float_field),
cf.gen_string_field(name=df.string_field),
cf.gen_bool_field(name=df.bool_field),
])
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
@ -2420,7 +2416,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
assert not success
failed_reason = "illegal numeric value"
if not float_vector:
failed_reason = f"doesn't equal to vector dimension {dim} of field vectors"
failed_reason = f"the field '{df.vec_field}' value at the row {wrong_position} is invalid"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
@ -2486,7 +2482,7 @@ class TestBulkInsertAdvanced(TestcaseBaseBulkInsert):
break
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=False, files=files
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
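
Pinning the is_row_based parametrize lists to [True], rather than deleting the fixture outright, keeps the remaining test signatures and their if is_row_based: branches working unchanged while only the row-based path is exercised. A minimal standalone sketch of the pattern, assuming plain pytest:

    import pytest

    @pytest.mark.parametrize("is_row_based", [True])   # was [True, False]
    @pytest.mark.parametrize("auto_id", [True, False])
    def test_example(is_row_based, auto_id):
        # the argument still exists, so branches on it keep working
        if is_row_based:
            assert isinstance(auto_id, bool)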

@ -73,7 +73,7 @@ class TestcaseBaseBulkInsert(TestcaseBase):
class TestBulkInsertTaskClean(TestcaseBaseBulkInsert):
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [8]) # 8, 128
@pytest.mark.parametrize("entities", [100]) # 100, 1000
@ -114,7 +114,7 @@ class TestBulkInsertTaskClean(TestcaseBaseBulkInsert):
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name,
partition_name=None,
is_row_based=is_row_based,
# is_row_based=is_row_based,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
@ -175,7 +175,7 @@ class TestBulkInsertTaskClean(TestcaseBaseBulkInsert):
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [8]) # 8, 128
@pytest.mark.parametrize("entities", [100]) # 100, 1000