diff --git a/tests/restful_client_v2/testcases/test_jobs_operation.py b/tests/restful_client_v2/testcases/test_jobs_operation.py
index f51f07ab26..052ed83ba1 100644
--- a/tests/restful_client_v2/testcases/test_jobs_operation.py
+++ b/tests/restful_client_v2/testcases/test_jobs_operation.py
@@ -115,6 +115,174 @@ class TestCreateImportJob(TestBase):
         rsp = self.vector_client.vector_query(payload)
         assert rsp['code'] == 0
 
+    @pytest.mark.parametrize("insert_num", [3000])
+    @pytest.mark.parametrize("import_task_num", [2])
+    @pytest.mark.parametrize("auto_id", [True])
+    @pytest.mark.parametrize("is_partition_key", [True])
+    def test_import_job_with_coo_format(self, insert_num, import_task_num, auto_id, is_partition_key):
+        # create collection
+        name = gen_collection_name()
+        dim = 128
+        payload = {
+            "collectionName": name,
+            "schema": {
+                "autoId": auto_id,
+                "enableDynamicField": False,
+                "fields": [
+                    {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}},
+                    {"fieldName": "word_count", "dataType": "Int64", "isPartitionKey": is_partition_key, "elementTypeParams": {}},
+                    {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}},
+                    {"fieldName": "book_intro", "dataType": "SparseFloatVector"}
+                ]
+            },
+            "indexParams": [
+                {"fieldName": "book_intro", "indexName": "sparse_float_vector_index", "metricType": "IP",
+                 "params": {"index_type": "SPARSE_INVERTED_INDEX", "drop_ratio_build": "0.2"}}
+            ]
+        }
+        self.collection_client.collection_create(payload)
+        self.wait_load_completed(name)
+        # generate data: sparse vectors in COO format ({"indices": [...], "values": [...]})
+        data = []
+        for i in range(insert_num):
+            tmp = {
+                "word_count": i,
+                "book_describe": f"book_{i}",
+                "book_intro": {"values": [random.random() for _ in range(dim)], "indices": [j ** 2 for j in range(dim)]}
+            }
+            if not auto_id:
+                tmp["book_id"] = i
+            data.append(tmp)
+        # dump data to file
+        file_name = f"bulk_insert_data_{uuid4()}.parquet"
+        file_path = f"/tmp/{file_name}"
+        df = pd.DataFrame(data)
+        logger.info(df)
+        df.to_parquet(file_path, engine="pyarrow")
+        # upload file to minio storage
+        self.storage_client.upload_file(file_path, file_name)
+
+        # create import jobs
+        payload = {
+            "collectionName": name,
+            "files": [[file_name]],
+        }
+        for _ in range(import_task_num):
+            rsp = self.import_job_client.create_import_jobs(payload)
+        # list import jobs
+        payload = {
+            "collectionName": name,
+        }
+        rsp = self.import_job_client.list_import_jobs(payload)
+
+        # wait for every import job to complete
+        for task in rsp['data']["records"]:
+            task_id = task['jobId']
+            finished = False
+            t0 = time.time()
+
+            while not finished:
+                rsp = self.import_job_client.get_import_job_progress(task_id)
+                if rsp['data']['state'] == "Completed":
+                    finished = True
+                time.sleep(5)
+                if time.time() - t0 > IMPORT_TIMEOUT:
+                    assert False, "import job timeout"
+        c = Collection(name)
+        c.load(_refresh=True, timeout=120)
+        res = c.query(
+            expr="",
+            output_fields=["count(*)"],
+        )
+        assert res[0]["count(*)"] == insert_num * import_task_num
+        # query data
+        payload = {
+            "collectionName": name,
+            "filter": "book_id > 0",
+            "outputFields": ["*"],
+        }
+        rsp = self.vector_client.vector_query(payload)
+        assert rsp['code'] == 0
+
+    @pytest.mark.parametrize("insert_num", [3000])
+    @pytest.mark.parametrize("import_task_num", [2])
+    @pytest.mark.parametrize("auto_id", [True])
+    @pytest.mark.parametrize("is_partition_key", [True])
+    @pytest.mark.parametrize("enable_dynamic_field", [True])
+    def test_import_with_longer_text_than_max_length(self, insert_num, import_task_num,
+                                                     auto_id, is_partition_key, enable_dynamic_field):
+        # create collection
+        name = gen_collection_name()
+        dim = 128
+        payload = {
+            "collectionName": name,
+            "schema": {
+                "autoId": auto_id,
+                "enableDynamicField": enable_dynamic_field,
+                "fields": [
+                    {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}},
+                    {"fieldName": "word_count", "dataType": "Int64", "isPartitionKey": is_partition_key, "elementTypeParams": {}},
+                    {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}},
+                    {"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}}
+                ]
+            },
+            "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
+        }
+        self.collection_client.collection_create(payload)
+        self.wait_load_completed(name)
+        # generate data whose book_describe exceeds max_length (256)
+        data = []
+        for i in range(insert_num):
+            tmp = {
+                "word_count": i,
+                "book_describe": f"book_{i}" * 256,
+                "book_intro": [np.float32(random.random()) for _ in range(dim)]
+            }
+            if not auto_id:
+                tmp["book_id"] = i
+            if enable_dynamic_field:
+                tmp.update({f"dynamic_field_{i}": i})
+            data.append(tmp)
+        # dump data to file
+        file_name = f"bulk_insert_data_{uuid4()}.json"
+        file_path = f"/tmp/{file_name}"
+        with open(file_path, "w") as f:
+            json.dump(data, f, cls=NumpyEncoder)
+        # upload file to minio storage
+        self.storage_client.upload_file(file_path, file_name)
+
+        # create import jobs
+        payload = {
+            "collectionName": name,
+            "files": [[file_name]],
+        }
+        for _ in range(import_task_num):
+            rsp = self.import_job_client.create_import_jobs(payload)
+        # list import jobs
+        payload = {
+            "collectionName": name,
+        }
+        rsp = self.import_job_client.list_import_jobs(payload)
+
+        # every import job is expected to fail because book_describe exceeds max_length
+        for task in rsp['data']["records"]:
+            task_id = task['jobId']
+            finished = False
+            t0 = time.time()
+
+            while not finished:
+                rsp = self.import_job_client.get_import_job_progress(task_id)
+                if rsp['data']['state'] == "Completed":
+                    assert False, "import job should not be completed"
+                if rsp['data']['state'] == "Failed":
+                    finished = True
+                logger.debug(f"job progress: {rsp}")
+                time.sleep(5)
+                if time.time() - t0 > IMPORT_TIMEOUT:
+                    assert False, "import job timeout"
+
     @pytest.mark.parametrize("insert_num", [5000])
     @pytest.mark.parametrize("import_task_num", [1])
     @pytest.mark.parametrize("auto_id", [True])