Add query for bulk load verification (#17259)

Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>
This commit is contained in:
yanliang567 2022-05-30 10:58:02 +08:00 committed by GitHub
parent 7f7e710b55
commit d3fbbe9b70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -28,31 +28,8 @@ def entity_suffix(entities):
return suffix
def gen_file_prefix(row_based: bool = True, auto_id: bool = True, prefix: str = "") -> str:
    """Build the file-name prefix used for a bulk-load data file.

    The result is ``prefix`` followed by two tags joined with an underscore:
    ``row`` or ``col`` (chosen by ``row_based``) and ``auto`` or ``cust``
    (chosen by ``auto_id``).

    Args:
        row_based: Truthy selects the ``row`` tag, falsy the ``col`` tag.
        auto_id: Truthy selects the ``auto`` tag, falsy the ``cust`` tag.
        prefix: Text placed in front of the generated tags.

    Returns:
        One of ``"{prefix}row_auto"``, ``"{prefix}row_cust"``,
        ``"{prefix}col_auto"`` or ``"{prefix}col_cust"``.
    """
    # The two flags are independent, so pick each tag on its own instead of
    # enumerating all four combinations in nested branches.
    layout_tag = "row" if row_based else "col"
    id_tag = "auto" if auto_id else "cust"
    return f"{prefix}{layout_tag}_{id_tag}"
class TestBulkLoad(TestcaseBase):
def setup_class(self):
    """Log the start of class-level setup.

    Nothing is copied yet: the copy step is still a TODO, and only its
    log message is emitted.
    """
    startup_messages = (
        "[setup_import] Start setup class...",
        # TODO: copy data files to minio
        "copy data files to minio",
    )
    for message in startup_messages:
        log.info(message)
def teardown_class(self):
    """Log the start of class-level teardown.

    No cleanup is performed yet: only the log messages are emitted.
    """
    shutdown_messages = (
        "[teardown_import] Start teardown class...",
        # TODO: decide whether the data files should be cleaned up at all
        "clean up data files in minio",
    )
    for message in shutdown_messages:
        log.info(message)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("row_based", [True, False])
@pytest.mark.parametrize("auto_id", [True, False])
@ -100,14 +77,19 @@ class TestBulkLoad(TestcaseBase):
# verify imported data is available for search
self.collection_wrap.load()
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
nq = 2
topk = 2
search_data = cf.gen_vectors(nq, dim)
search_params = {"metric_type": "L2", "params": {"nprobe": 2}}
res, _ = self.collection_wrap.search(search_data, df.vec_field,
param=search_params, limit=1,
param=search_params, limit=topk,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"limit": 1})
# self.collection_wrap.query(expr=f"id in {ids}")
check_items={"nq": nq,
"limit": topk})
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("row_based", [True, False])
@ -154,14 +136,21 @@ class TestBulkLoad(TestcaseBase):
# verify imported data is available for search
self.collection_wrap.load()
log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
nq = 3
topk = 2
search_data = cf.gen_vectors(nq, dim)
search_params = {"metric_type": "L2", "params": {"nprobe": 2}}
res, _ = self.collection_wrap.search(search_data, df.vec_field,
param=search_params, limit=1,
param=search_params, limit=topk,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"limit": 1})
# self.collection_wrap.query(expr=f"id in {ids}")
check_items={"nq": nq,
"limit": topk})
for hits in res:
ids = hits.ids
expr = f"{df.pk_field} in {ids}"
expr = expr.replace("'", "\"")
results, _ = self.collection_wrap.query(expr=expr)
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("row_based", [True, False])
@ -223,14 +212,19 @@ class TestBulkLoad(TestcaseBase):
assert res == exp_res
log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
nq = 10
topk = 5
search_data = cf.gen_vectors(nq, dim)
search_params = {"metric_type": "L2", "params": {"nprobe": 16}}
res, _ = self.collection_wrap.search(search_data, df.vec_field,
param=search_params, limit=1,
param=search_params, limit=topk,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"limit": 1})
check_items={"nq": nq,
"limit": topk})
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("row_based", [True, False])
@ -302,6 +296,10 @@ class TestBulkLoad(TestcaseBase):
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"limit": 1})
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("row_based", [True, False])
@ -384,8 +382,10 @@ class TestBulkLoad(TestcaseBase):
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"limit": 1})
# self.collection_wrap.query(expr=f"id in {ids}")
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
# build index
index_params = {"index_type": "HNSW", "params": {"M": 8, "efConstruction": 100}, "metric_type": "IP"}
@ -406,6 +406,10 @@ class TestBulkLoad(TestcaseBase):
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"limit": 1})
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("row_based", [True, False])
@ -488,8 +492,12 @@ class TestBulkLoad(TestcaseBase):
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"limit": 1})
# self.collection_wrap.query(expr=f"id in {ids}")
for hits in res:
ids = hits.ids
expr = f"{df.pk_field} in {ids}"
expr = expr.replace("'", "\"")
results, _ = self.collection_wrap.query(expr=expr)
assert len(results) == len(ids)
# build index
index_params = {"index_type": "HNSW", "params": {"M": 8, "efConstruction": 100}, "metric_type": "IP"}
@ -510,6 +518,12 @@ class TestBulkLoad(TestcaseBase):
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"limit": 1})
for hits in res:
ids = hits.ids
expr = f"{df.pk_field} in {ids}"
expr = expr.replace("'", "\"")
results, _ = self.collection_wrap.query(expr=expr)
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("row_based", [True, False]) # True, False
@ -583,15 +597,19 @@ class TestBulkLoad(TestcaseBase):
# assert res == exp_res
# verify search and query
search_data = cf.gen_vectors(1, dim)
nq = 5
topk = 1
search_data = cf.gen_vectors(nq, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(search_data, df.vec_field,
param=search_params, limit=1,
param=search_params, limit=topk,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"limit": 1})
# self.collection_wrap.query(expr=f"id in {ids}")
check_items={"nq": nq,
"limit": topk})
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("row_based", [True, False])
@ -674,14 +692,19 @@ class TestBulkLoad(TestcaseBase):
# verify imported data is available for search
self.collection_wrap.load()
log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
nq = 2
topk = 5
search_data = cf.gen_vectors(nq, dim)
search_params = {"metric_type": "L2", "params": {"nprobe": 2}}
res, _ = self.collection_wrap.search(search_data, df.vec_field,
param=search_params, limit=1,
param=search_params, limit=topk,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"limit": 1})
# self.collection_wrap.query(expr=f"id in {ids}")
check_items={"nq": nq,
"limit": topk})
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("row_based", [True, False])