test: add negative import testcase (#32163)

add negative import testcase

---------

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
zhuwenxing 2024-04-17 10:59:31 +08:00 committed by GitHub
parent d1ef0a81ee
commit 3e2314ebee

@@ -15,6 +15,7 @@ from uuid import uuid4
IMPORT_TIMEOUT = 360
@pytest.mark.BulkInsert
class TestCreateImportJob(TestBase):
@@ -1099,6 +1100,85 @@ class TestCreateImportJobAdvance(TestBase):
@pytest.mark.L1
class TestCreateImportJobNegative(TestBase):

    @pytest.mark.parametrize("insert_num", [2])
    @pytest.mark.parametrize("import_task_num", [1])
    @pytest.mark.parametrize("auto_id", [True])
    @pytest.mark.parametrize("is_partition_key", [True])
    @pytest.mark.parametrize("enable_dynamic_field", [True])
    @pytest.mark.BulkInsert
    def test_create_import_job_with_json_dup_dynamic_key(self, insert_num, import_task_num, auto_id, is_partition_key, enable_dynamic_field):
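        # Negative case: every row written below carries "dynamic_key" both as a
        # top-level field and inside "$meta", so the import job is expected to fail.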
        # create collection
        name = gen_collection_name()
        dim = 16
        payload = {
            "collectionName": name,
            "schema": {
                "autoId": auto_id,
                "enableDynamicField": enable_dynamic_field,
                "fields": [
                    {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}},
                    {"fieldName": "word_count", "dataType": "Int64", "isPartitionKey": is_partition_key, "elementTypeParams": {}},
                    {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}},
                    {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}}
                ]
            },
            "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
        }
        rsp = self.collection_client.collection_create(payload)
        # prepare data to upload to storage
        data = []
        for i in range(insert_num):
            tmp = {
                "word_count": i,
                "book_describe": f"book_{i}",
                "dynamic_key": i,
                "book_intro": [random.random() for _ in range(dim)]
            }
            if not auto_id:
                tmp["book_id"] = i
            if enable_dynamic_field:
                # duplicate: "dynamic_key" also appears inside "$meta"
                tmp.update({"$meta": {"dynamic_key": i + 1}})
            data.append(tmp)
        # dump data to file
        file_name = f"bulk_insert_data_{uuid4()}.json"
        file_path = f"/tmp/{file_name}"
        logger.info(f"data: {data}")
        with open(file_path, "w") as f:
            json.dump(data, f)
        # upload file to minio storage
        self.storage_client.upload_file(file_path, file_name)
        # create import job
        payload = {
            "collectionName": name,
            "files": [[file_name]],
        }
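        # note: "files" expects a list of file groups, hence the nested list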
        for i in range(import_task_num):
            rsp = self.import_job_client.create_import_jobs(payload)
        # list import jobs
        payload = {
            "collectionName": name,
        }
        rsp = self.import_job_client.list_import_jobs(payload)
        # poll import job progress until a terminal state is reached
        for task in rsp['data']["records"]:
            task_id = task['jobId']
            finished = False
            t0 = time.time()
            while not finished:
                rsp = self.import_job_client.get_import_job_progress(task_id)
                if rsp['data']['state'] == "Failed":
                    # the duplicated dynamic key should make the import fail
                    finished = True
                if rsp['data']['state'] == "Completed":
                    assert False, "import job should fail but completed"
                time.sleep(5)
                if time.time() - t0 > IMPORT_TIMEOUT:
                    assert False, "import job timeout"

    def test_import_job_with_empty_files(self):
        # create collection
        name = gen_collection_name()