[test]Fix bulk insert chaos test (#20341)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

zhuwenxing 2022-11-08 16:37:05 +08:00 committed by GitHub
parent 57c9e5b0bc
commit 1a4c0fa2e8
2 changed files with 8 additions and 7 deletions


@@ -580,7 +580,7 @@ class BulkInsertChecker(Checker):
     def __init__(self, collection_name=None, files=[]):
         if collection_name is None:
-            collection_name = cf.gen_unique_str("BulkLoadChecker_")
+            collection_name = cf.gen_unique_str("BulkInsertChecker_")
         super().__init__(collection_name=collection_name)
         self.utility_wrap = ApiUtilityWrapper()
         self.schema = cf.gen_default_collection_schema()
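Note: the hunk above only fixes the stale "BulkLoadChecker_" prefix; cf.gen_unique_str derives a fresh collection name from a prefix so that each checker run works against its own collection. A hypothetical sketch of such a helper (the real implementation lives in the test suite's common functions):

import random
import string

def gen_unique_str(prefix="test_"):
    # Prefix plus an 8-character random suffix,
    # e.g. "BulkInsertChecker_x3F9kQ2a".
    suffix = "".join(random.choices(string.ascii_letters + string.digits, k=8))
    return prefix + suffix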
@@ -597,9 +597,9 @@ class BulkInsertChecker(Checker):
     @trace()
     def bulk_insert(self):
-        task_ids, result = self.utility_wrap.bulk_insert(collection_name=self.c_name,
-                                                         files=self.files)
-        completed, result = self.utility_wrap.wait_for_bulk_insert_tasks_completed(task_ids=task_ids, timeout=60)
+        task_ids, result = self.utility_wrap.do_bulk_insert(collection_name=self.c_name,
+                                                            files=self.files)
+        completed, result = self.utility_wrap.wait_for_bulk_insert_tasks_completed(task_ids=[task_ids], timeout=60)
         return task_ids, completed

    @exception_handler()
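Note: the checker now calls do_bulk_insert, matching the pymilvus 2.2-style utility API, which returns a single task id rather than a list; that is why the id is wrapped as [task_ids] before being handed to the wait helper. A minimal sketch of the underlying flow, assuming pymilvus 2.2 APIs and an already-established connection; wait_for_tasks is a hypothetical stand-in for the wrapper's wait_for_bulk_insert_tasks_completed:

import time

from pymilvus import BulkInsertState, utility

def wait_for_tasks(task_ids, timeout=60):
    # Poll each bulk insert task until it completes, fails,
    # or the overall timeout expires.
    deadline = time.time() + timeout
    pending = list(task_ids)
    while pending and time.time() < deadline:
        for task_id in list(pending):
            state = utility.get_bulk_insert_state(task_id=task_id)
            if state.state == BulkInsertState.ImportCompleted:
                pending.remove(task_id)
            elif state.state in (BulkInsertState.ImportFailed,
                                 BulkInsertState.ImportFailedAndCleaned):
                return False
        time.sleep(2)
    return not pending

# Assumes connections.connect(...) has been called beforehand.
# do_bulk_insert returns one task id, so the caller wraps it in a list.
task_id = utility.do_bulk_insert(collection_name="demo_collection",
                                 files=["bulk_insert_data_source.json"])
completed = wait_for_tasks([task_id], timeout=60)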


@@ -119,15 +119,16 @@ class TestChaos(TestChaosBase):
             entity = dict(zip(fields_name, entity_value))
             entities.append(entity)
         data_dict = {"rows": entities}
-        file_name = "/tmp/ci_logs/bulk_insert_data_source.json"
-        files = [file_name]
+        data_source = "/tmp/ci_logs/bulk_insert_data_source.json"
+        file_name = "bulk_insert_data_source.json"
+        files = ["bulk_insert_data_source.json"]
         #TODO: npy file type is not supported so far
         log.info("generate bulk load file")
-        with open(file_name, "w") as f:
+        with open(data_source, "w") as f:
             f.write(json.dumps(data_dict, indent=4))
         log.info("upload file to minio")
         client = Minio(minio_endpoint, access_key="minioadmin", secret_key="minioadmin", secure=False)
-        client.fput_object(bucket_name, file_name, file_name)
+        client.fput_object(bucket_name, file_name, data_source)
         self.health_checkers[Op.bulk_insert].update(schema=schema, files=files)
         log.info("prepare data for bulk load done")