test: add COO format for sparse vector import and a negative case (#41040)

/kind improvement

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
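
For context, below is a minimal sketch of the COO-style row encoding exercised by the new test, alongside a plain {index: value} mapping for comparison. The field name (book_intro) and dim follow the test schema; the variable names and the comparison row are illustrative only, not the exact bulk-import contract.

    import random

    dim = 128

    # dict-style sparse row: {index: value} mapping (illustrative comparison)
    kv_row = {"book_intro": {i: random.random() for i in range(0, dim, 7)}}

    # COO-style sparse row: parallel "indices" and "values" lists, as imported by the new test
    coo_row = {
        "book_intro": {
            "indices": [i ** 2 for i in range(dim)],
            "values": [random.random() for _ in range(dim)],
        }
    }

    assert len(coo_row["book_intro"]["indices"]) == len(coo_row["book_intro"]["values"])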
@@ -115,6 +115,174 @@ class TestCreateImportJob(TestBase):
rsp = self.vector_client.vector_query(payload)
assert rsp['code'] == 0
@pytest.mark.parametrize("insert_num", [3000])
@pytest.mark.parametrize("import_task_num", [2])
@pytest.mark.parametrize("auto_id", [True])
@pytest.mark.parametrize("is_partition_key", [True])
def test_import_job_with_coo_format(self, insert_num, import_task_num, auto_id, is_partition_key):
# create collection
name = gen_collection_name()
dim = 128
payload = {
"collectionName": name,
"schema": {
"autoId": auto_id,
"enableDynamicField": False,
"fields": [
{"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}},
{"fieldName": "word_count", "dataType": "Int64", "isPartitionKey": is_partition_key, "elementTypeParams": {}},
{"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}},
{"fieldName": "book_intro", "dataType": "SparseFloatVector"}
]
},
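# sparse index over the SparseFloatVector field; sparse indexes in Milvus use the IP metric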
"indexParams": [ {"fieldName": "book_intro", "indexName": "sparse_float_vector_index", "metricType": "IP",
"params": {"index_type": "SPARSE_INVERTED_INDEX", "drop_ratio_build": "0.2"}},]
}
self.collection_client.collection_create(payload)
self.wait_load_completed(name)
# upload file to storage
data = []
for i in range(insert_num):
tmp = {
"word_count": i,
"book_describe": f"book_{i}",
"book_intro": {"values": [random.random() for i in range(dim)], "indices": [i**2 for i in range(dim)]}
}
if not auto_id:
tmp["book_id"] = i
data.append(tmp)
# dump data to file
file_name = f"bulk_insert_data_{uuid4()}.parquet"
file_path = f"/tmp/{file_name}"
df = pd.DataFrame(data)
logger.info(df)
df.to_parquet(file_path, engine="pyarrow")
# upload file to minio storage
self.storage_client.upload_file(file_path, file_name)
# create import job
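# "files" is a list of file groups; each inner list is handled as one import unit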
payload = {
"collectionName": name,
"files": [[file_name]],
}
for i in range(import_task_num):
rsp = self.import_job_client.create_import_jobs(payload)
# list import job
payload = {
"collectionName": name,
}
rsp = self.import_job_client.list_import_jobs(payload)
# get import job progress
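# poll each job until it reports "Completed" or IMPORT_TIMEOUT elapses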
for task in rsp['data']["records"]:
task_id = task['jobId']
finished = False
t0 = time.time()
while not finished:
rsp = self.import_job_client.get_import_job_progress(task_id)
if rsp['data']['state'] == "Completed":
finished = True
time.sleep(5)
if time.time() - t0 > IMPORT_TIMEOUT:
assert False, "import job timeout"
c = Collection(name)
c.load(_refresh=True, timeout=120)
res = c.query(
expr="",
output_fields=["count(*)"],
)
assert res[0]["count(*)"] == insert_num * import_task_num
# query data
payload = {
"collectionName": name,
"filter": "book_id > 0",
"outputFields": ["*"],
}
rsp = self.vector_client.vector_query(payload)
assert rsp['code'] == 0
@pytest.mark.parametrize("insert_num", [3000])
@pytest.mark.parametrize("import_task_num", [2])
@pytest.mark.parametrize("auto_id", [True,])
@pytest.mark.parametrize("is_partition_key", [True])
@pytest.mark.parametrize("enable_dynamic_field", [True])
def test_import_with_longer_text_than_max_length(self, insert_num, import_task_num, auto_id, is_partition_key, enable_dynamic_field):
# create collection
name = gen_collection_name()
dim = 128
payload = {
"collectionName": name,
"schema": {
"autoId": auto_id,
"enableDynamicField": enable_dynamic_field,
"fields": [
{"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}},
{"fieldName": "word_count", "dataType": "Int64", "isPartitionKey": is_partition_key, "elementTypeParams": {}},
{"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}},
{"fieldName": "book_intro", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}}
]
},
"indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
}
self.collection_client.collection_create(payload)
self.wait_load_completed(name)
# upload file to storage
data = []
for i in range(insert_num):
tmp = {
"word_count": i,
"book_describe": f"book_{i}" * 256,
"book_intro": [np.float32(random.random()) for _ in range(dim)]
}
if not auto_id:
tmp["book_id"] = i
if enable_dynamic_field:
tmp.update({f"dynamic_field_{i}": i})
data.append(tmp)
# dump data to file
file_name = f"bulk_insert_data_{uuid4()}.json"
file_path = f"/tmp/{file_name}"
with open(file_path, "w") as f:
json.dump(data, f, cls=NumpyEncoder)
# upload file to minio storage
self.storage_client.upload_file(file_path, file_name)
# create import job
payload = {
"collectionName": name,
"files": [[file_name]],
}
for i in range(import_task_num):
rsp = self.import_job_client.create_import_jobs(payload)
# list import job
payload = {
"collectionName": name,
}
rsp = self.import_job_client.list_import_jobs(payload)
# get import job progress
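# poll each job; it should end up "Failed" rather than "Completed"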
for task in rsp['data']["records"]:
task_id = task['jobId']
finished = False
t0 = time.time()
while not finished:
rsp = self.import_job_client.get_import_job_progress(task_id)
if rsp['data']['state'] == "Completed":
assert False, "import job should not be completed"
if rsp['data']['state'] == "Failed":
finished = True
logger.debug(f"job progress: {rsp}")
time.sleep(5)
if time.time() - t0 > IMPORT_TIMEOUT:
assert False, "import job timeout"
@pytest.mark.parametrize("insert_num", [5000])
@pytest.mark.parametrize("import_task_num", [1])
@pytest.mark.parametrize("auto_id", [True])