mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 09:38:39 +08:00
test: enable import job test in ci (#31530)
Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
parent
78fbb87b3a
commit
c42492c0fd
@ -12,4 +12,5 @@ markers =
|
|||||||
L0 : 'L0 case, high priority'
|
L0 : 'L0 case, high priority'
|
||||||
L1 : 'L1 case, second priority'
|
L1 : 'L1 case, second priority'
|
||||||
L2 : 'L2 case, system level case'
|
L2 : 'L2 case, system level case'
|
||||||
|
BulkInsert : 'Bulk Insert case'
|
||||||
|
|
||||||
|
|||||||
@ -14,7 +14,7 @@ from base.testbase import TestBase
|
|||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.L1
|
@pytest.mark.BulkInsert
|
||||||
class TestCreateImportJob(TestBase):
|
class TestCreateImportJob(TestBase):
|
||||||
|
|
||||||
@pytest.mark.parametrize("insert_num", [5000])
|
@pytest.mark.parametrize("insert_num", [5000])
|
||||||
@ -89,8 +89,9 @@ class TestCreateImportJob(TestBase):
|
|||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
if time.time() - t0 > 360:
|
if time.time() - t0 > 360:
|
||||||
assert False, "import job timeout"
|
assert False, "import job timeout"
|
||||||
time.sleep(10)
|
|
||||||
c = Collection(name)
|
c = Collection(name)
|
||||||
|
c.load(_refresh=True)
|
||||||
|
time.sleep(10)
|
||||||
res = c.query(
|
res = c.query(
|
||||||
expr="",
|
expr="",
|
||||||
output_fields=["count(*)"],
|
output_fields=["count(*)"],
|
||||||
@ -146,7 +147,7 @@ class TestCreateImportJob(TestBase):
|
|||||||
tmp.update({f"dynamic_field_{i}": i})
|
tmp.update({f"dynamic_field_{i}": i})
|
||||||
data.append(tmp)
|
data.append(tmp)
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_name = f"bulk_insert_data_{int(time.time())}.json"
|
file_name = f"bulk_insert_data_{uuid4()}.json"
|
||||||
file_path = f"/tmp/{file_name}"
|
file_path = f"/tmp/{file_name}"
|
||||||
with open(file_path, "w") as f:
|
with open(file_path, "w") as f:
|
||||||
json.dump(data, f)
|
json.dump(data, f)
|
||||||
@ -179,8 +180,9 @@ class TestCreateImportJob(TestBase):
|
|||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
if time.time() - t0 > 120:
|
if time.time() - t0 > 120:
|
||||||
assert False, "import job timeout"
|
assert False, "import job timeout"
|
||||||
time.sleep(10)
|
|
||||||
c = Collection(name)
|
c = Collection(name)
|
||||||
|
c.load(_refresh=True)
|
||||||
|
time.sleep(10)
|
||||||
res = c.query(
|
res = c.query(
|
||||||
expr="",
|
expr="",
|
||||||
output_fields=["count(*)"],
|
output_fields=["count(*)"],
|
||||||
@ -235,7 +237,7 @@ class TestCreateImportJob(TestBase):
|
|||||||
tmp.update({f"dynamic_field_{i}": i})
|
tmp.update({f"dynamic_field_{i}": i})
|
||||||
data.append(tmp)
|
data.append(tmp)
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_name = f"bulk_insert_data_{int(time.time())}.json"
|
file_name = f"bulk_insert_data_{uuid4()}.json"
|
||||||
file_path = f"/tmp/{file_name}"
|
file_path = f"/tmp/{file_name}"
|
||||||
with open(file_path, "w") as f:
|
with open(file_path, "w") as f:
|
||||||
json.dump(data, f)
|
json.dump(data, f)
|
||||||
@ -271,8 +273,9 @@ class TestCreateImportJob(TestBase):
|
|||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
if time.time() - t0 > 120:
|
if time.time() - t0 > 120:
|
||||||
assert False, "import job timeout"
|
assert False, "import job timeout"
|
||||||
time.sleep(10)
|
|
||||||
c = Collection(name)
|
c = Collection(name)
|
||||||
|
c.load(_refresh=True)
|
||||||
|
time.sleep(10)
|
||||||
res = c.query(
|
res = c.query(
|
||||||
expr="",
|
expr="",
|
||||||
output_fields=["count(*)"],
|
output_fields=["count(*)"],
|
||||||
@ -295,8 +298,6 @@ class TestCreateImportJob(TestBase):
|
|||||||
rsp = self.vector_client.vector_query(payload)
|
rsp = self.vector_client.vector_query(payload)
|
||||||
assert rsp["code"] == 200
|
assert rsp["code"] == 200
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def test_job_import_multi_json_file(self):
|
def test_job_import_multi_json_file(self):
|
||||||
# create collection
|
# create collection
|
||||||
name = gen_collection_name()
|
name = gen_collection_name()
|
||||||
@ -327,7 +328,7 @@ class TestCreateImportJob(TestBase):
|
|||||||
for i in range(1000*file_num, 1000*(file_num+1))]
|
for i in range(1000*file_num, 1000*(file_num+1))]
|
||||||
|
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_name = f"bulk_insert_data_{file_num}.json"
|
file_name = f"bulk_insert_data_{file_num}_{uuid4()}.json"
|
||||||
file_path = f"/tmp/{file_name}"
|
file_path = f"/tmp/{file_name}"
|
||||||
# create dir for file path
|
# create dir for file path
|
||||||
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
|
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
|
||||||
@ -407,7 +408,7 @@ class TestCreateImportJob(TestBase):
|
|||||||
for i in range(1000*file_num, 1000*(file_num+1))]
|
for i in range(1000*file_num, 1000*(file_num+1))]
|
||||||
|
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_name = f"bulk_insert_data_{file_num}.parquet"
|
file_name = f"bulk_insert_data_{file_num}_{uuid4()}.parquet"
|
||||||
file_path = f"/tmp/{file_name}"
|
file_path = f"/tmp/{file_name}"
|
||||||
# create dir for file path
|
# create dir for file path
|
||||||
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
|
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
|
||||||
@ -488,7 +489,7 @@ class TestCreateImportJob(TestBase):
|
|||||||
|
|
||||||
file_list = []
|
file_list = []
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_dir = f"bulk_insert_data_{file_num}"
|
file_dir = f"bulk_insert_data_{file_num}_{uuid4()}"
|
||||||
base_file_path = f"/tmp/{file_dir}"
|
base_file_path = f"/tmp/{file_dir}"
|
||||||
df = pd.DataFrame(data)
|
df = pd.DataFrame(data)
|
||||||
# each column is a list and convert to a npy file
|
# each column is a list and convert to a npy file
|
||||||
@ -575,7 +576,7 @@ class TestCreateImportJob(TestBase):
|
|||||||
|
|
||||||
file_list = []
|
file_list = []
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_dir = f"bulk_insert_data_{file_num}"
|
file_dir = f"bulk_insert_data_{file_num}_{uuid4()}"
|
||||||
base_file_path = f"/tmp/{file_dir}"
|
base_file_path = f"/tmp/{file_dir}"
|
||||||
df = pd.DataFrame(data)
|
df = pd.DataFrame(data)
|
||||||
# each column is a list and convert to a npy file
|
# each column is a list and convert to a npy file
|
||||||
@ -599,7 +600,7 @@ class TestCreateImportJob(TestBase):
|
|||||||
for i in range(1000*file_num, 1000*(file_num+1))]
|
for i in range(1000*file_num, 1000*(file_num+1))]
|
||||||
|
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_name = f"bulk_insert_data_{file_num}.parquet"
|
file_name = f"bulk_insert_data_{file_num}_{uuid4()}.parquet"
|
||||||
file_path = f"/tmp/{file_name}"
|
file_path = f"/tmp/{file_name}"
|
||||||
# create dir for file path
|
# create dir for file path
|
||||||
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
|
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
|
||||||
@ -618,7 +619,7 @@ class TestCreateImportJob(TestBase):
|
|||||||
for i in range(1000*file_num, 1000*(file_num+1))]
|
for i in range(1000*file_num, 1000*(file_num+1))]
|
||||||
|
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_name = f"bulk_insert_data_{file_num}.json"
|
file_name = f"bulk_insert_data_{file_num}_{uuid4()}.json"
|
||||||
file_path = f"/tmp/{file_name}"
|
file_path = f"/tmp/{file_name}"
|
||||||
with open(file_path, "w") as f:
|
with open(file_path, "w") as f:
|
||||||
json.dump(data, f)
|
json.dump(data, f)
|
||||||
@ -668,7 +669,7 @@ class TestCreateImportJob(TestBase):
|
|||||||
|
|
||||||
@pytest.mark.parametrize("insert_round", [2])
|
@pytest.mark.parametrize("insert_round", [2])
|
||||||
@pytest.mark.parametrize("auto_id", [True])
|
@pytest.mark.parametrize("auto_id", [True])
|
||||||
@pytest.mark.parametrize("is_partition_key", [False])
|
@pytest.mark.parametrize("is_partition_key", [True])
|
||||||
@pytest.mark.parametrize("enable_dynamic_schema", [True])
|
@pytest.mark.parametrize("enable_dynamic_schema", [True])
|
||||||
@pytest.mark.parametrize("nb", [3000])
|
@pytest.mark.parametrize("nb", [3000])
|
||||||
@pytest.mark.parametrize("dim", [128])
|
@pytest.mark.parametrize("dim", [128])
|
||||||
@ -847,7 +848,7 @@ class TestImportJobAdvance(TestBase):
|
|||||||
for i in range(batch_size*file_num, batch_size*(file_num+1))]
|
for i in range(batch_size*file_num, batch_size*(file_num+1))]
|
||||||
|
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_name = f"bulk_insert_data_{file_num}.json"
|
file_name = f"bulk_insert_data_{file_num}_{uuid4()}.json"
|
||||||
file_path = f"/tmp/{file_name}"
|
file_path = f"/tmp/{file_name}"
|
||||||
# create dir for file path
|
# create dir for file path
|
||||||
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
|
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
|
||||||
@ -947,7 +948,7 @@ class TestCreateImportJobAdvance(TestBase):
|
|||||||
for i in range(batch_size*file_num, batch_size*(file_num+1))]
|
for i in range(batch_size*file_num, batch_size*(file_num+1))]
|
||||||
|
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_name = f"bulk_insert_data_{file_num}.json"
|
file_name = f"bulk_insert_data_{file_num}_{uuid4()}.json"
|
||||||
file_path = f"/tmp/{file_name}"
|
file_path = f"/tmp/{file_name}"
|
||||||
# create dir for file path
|
# create dir for file path
|
||||||
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
|
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
|
||||||
@ -1036,7 +1037,7 @@ class TestCreateImportJobAdvance(TestBase):
|
|||||||
for i in range(batch_size*file_num, batch_size*(file_num+1))]
|
for i in range(batch_size*file_num, batch_size*(file_num+1))]
|
||||||
|
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_name = f"bulk_insert_data_{file_num}.json"
|
file_name = f"bulk_insert_data_{file_num}_{uuid4()}.json"
|
||||||
file_path = f"/tmp/{file_name}"
|
file_path = f"/tmp/{file_name}"
|
||||||
# create dir for file path
|
# create dir for file path
|
||||||
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
|
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
|
||||||
@ -1209,7 +1210,7 @@ class TestCreateImportJobNegative(TestBase):
|
|||||||
for i in range(10000)]
|
for i in range(10000)]
|
||||||
|
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_name = "bulk_insert_data.txt"
|
file_name = f"bulk_insert_data_{uuid4()}.txt"
|
||||||
file_path = f"/tmp/{file_name}"
|
file_path = f"/tmp/{file_name}"
|
||||||
|
|
||||||
json_data = json.dumps(data)
|
json_data = json.dumps(data)
|
||||||
@ -1331,7 +1332,7 @@ class TestCreateImportJobNegative(TestBase):
|
|||||||
data.append(tmp)
|
data.append(tmp)
|
||||||
|
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_name = f"bulk_insert_data_{int(time.time())}.json"
|
file_name = f"bulk_insert_data_{uuid4()}.json"
|
||||||
file_path = f"/tmp/{file_name}"
|
file_path = f"/tmp/{file_name}"
|
||||||
with open(file_path, "w") as f:
|
with open(file_path, "w") as f:
|
||||||
json.dump(data, f)
|
json.dump(data, f)
|
||||||
@ -1388,7 +1389,7 @@ class TestCreateImportJobNegative(TestBase):
|
|||||||
tmp.update({f"dynamic_field_{i}": i})
|
tmp.update({f"dynamic_field_{i}": i})
|
||||||
data.append(tmp)
|
data.append(tmp)
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_name = f"bulk_insert_data_{int(time.time())}.json"
|
file_name = f"bulk_insert_data_{uuid4()}.json"
|
||||||
file_path = f"/tmp/{file_name}"
|
file_path = f"/tmp/{file_name}"
|
||||||
with open(file_path, "w") as f:
|
with open(file_path, "w") as f:
|
||||||
json.dump(data, f)
|
json.dump(data, f)
|
||||||
@ -1421,8 +1422,9 @@ class TestCreateImportJobNegative(TestBase):
|
|||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
if time.time() - t0 > 120:
|
if time.time() - t0 > 120:
|
||||||
assert False, "import job timeout"
|
assert False, "import job timeout"
|
||||||
time.sleep(10)
|
|
||||||
c = Collection(name)
|
c = Collection(name)
|
||||||
|
c.load(_refresh=True)
|
||||||
|
time.sleep(10)
|
||||||
res = c.query(
|
res = c.query(
|
||||||
expr="",
|
expr="",
|
||||||
output_fields=["count(*)"],
|
output_fields=["count(*)"],
|
||||||
@ -1485,7 +1487,7 @@ class TestListImportJob(TestBase):
|
|||||||
tmp.update({f"dynamic_field_{i}": i})
|
tmp.update({f"dynamic_field_{i}": i})
|
||||||
data.append(tmp)
|
data.append(tmp)
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_name = f"bulk_insert_data_{int(time.time())}.json"
|
file_name = f"bulk_insert_data_{uuid4()}.json"
|
||||||
file_path = f"/tmp/{file_name}"
|
file_path = f"/tmp/{file_name}"
|
||||||
with open(file_path, "w") as f:
|
with open(file_path, "w") as f:
|
||||||
json.dump(data, f)
|
json.dump(data, f)
|
||||||
@ -1558,7 +1560,7 @@ class TestGetImportJobProgress(TestBase):
|
|||||||
tmp.update({f"dynamic_field_{i}": i})
|
tmp.update({f"dynamic_field_{i}": i})
|
||||||
data.append(tmp)
|
data.append(tmp)
|
||||||
# dump data to file
|
# dump data to file
|
||||||
file_name = f"bulk_insert_data_{int(time.time())}.json"
|
file_name = f"bulk_insert_data_{uuid4()}.json"
|
||||||
file_path = f"/tmp/{file_name}"
|
file_path = f"/tmp/{file_name}"
|
||||||
with open(file_path, "w") as f:
|
with open(file_path, "w") as f:
|
||||||
json.dump(data, f)
|
json.dump(data, f)
|
||||||
|
|||||||
@ -84,13 +84,26 @@ cd ${ROOT}/tests/restful_client_v2
|
|||||||
|
|
||||||
if [[ -n "${TEST_TIMEOUT:-}" ]]; then
|
if [[ -n "${TEST_TIMEOUT:-}" ]]; then
|
||||||
|
|
||||||
timeout "${TEST_TIMEOUT}" pytest testcases --endpoint http://${MILVUS_SERVICE_NAME}:${MILVUS_SERVICE_PORT} -v -x -m L0 -n 6 --timeout 180\
|
timeout "${TEST_TIMEOUT}" pytest testcases --endpoint http://${MILVUS_SERVICE_NAME}:${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME} -v -x -m L0 -n 6 --timeout 180\
|
||||||
--html=${CI_LOG_PATH}/report_restful.html --self-contained-html
|
--html=${CI_LOG_PATH}/report_restful.html --self-contained-html
|
||||||
else
|
else
|
||||||
pytest testcases --endpoint http://${MILVUS_SERVICE_NAME}:${MILVUS_SERVICE_PORT} -v -x -m L0 -n 6 --timeout 180\
|
pytest testcases --endpoint http://${MILVUS_SERVICE_NAME}:${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME} -v -x -m L0 -n 6 --timeout 180\
|
||||||
--html=${CI_LOG_PATH}/report_restful.html --self-contained-html
|
--html=${CI_LOG_PATH}/report_restful.html --self-contained-html
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
||||||
|
if [[ "${MILVUS_HELM_RELEASE_NAME}" != *"msop"* ]]; then
|
||||||
|
if [[ -n "${TEST_TIMEOUT:-}" ]]; then
|
||||||
|
|
||||||
|
timeout "${TEST_TIMEOUT}" pytest testcases --endpoint http://${MILVUS_SERVICE_NAME}:${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME} -v -x -m BulkInsert -n 6 --timeout 180\
|
||||||
|
--html=${CI_LOG_PATH}/report_restful.html --self-contained-html
|
||||||
|
else
|
||||||
|
pytest testcases --endpoint http://${MILVUS_SERVICE_NAME}:${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME} -v -x -m BulkInsert -n 6 --timeout 180\
|
||||||
|
--html=${CI_LOG_PATH}/report_restful.html --self-contained-html
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
||||||
|
|
||||||
cd ${ROOT}/tests/python_client
|
cd ${ROOT}/tests/python_client
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user