test: Add a test for iterator enhancement (#37296)
related issue: https://github.com/milvus-io/milvus/issues/37084

Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>
parent 43ad9af529
commit 3a3404658e
@@ -27,8 +27,8 @@ pytest-parallel
pytest-random-order

# pymilvus
pymilvus==2.5.0rc104
pymilvus[bulk_writer]==2.5.0rc104
pymilvus==2.5.0rc106
pymilvus[bulk_writer]==2.5.0rc106

# for customize config test
python-benedict==0.24.3

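The only functional change in this hunk is the pymilvus pin moving from 2.5.0rc104 to 2.5.0rc106. As a hypothetical sanity check (not part of this commit), the installed prerelease can be verified before running the suite:

# Hypothetical check, not part of the test suite: confirm the pinned
# pymilvus prerelease is actually installed in the test environment.
import pymilvus

print(pymilvus.__version__)                     # expected: 2.5.0rc106
assert pymilvus.__version__ == "2.5.0rc106", pymilvus.__version__
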
@@ -416,7 +416,6 @@ class TestAliasOperationInvalid(TestcaseBase):
                                                  check_items={exp_name: c_name, exp_schema: schema})
        alias_name = cf.gen_unique_str(prefix)
        self.utility_wrap.create_alias(collection_w.name, alias_name)
        # collection_w.create_alias(alias_name)
        collection_alias = self.init_collection_wrap(name=alias_name, schema=schema,
                                                     check_task=CheckTasks.check_collection_property,
                                                     check_items={exp_name: alias_name,

@@ -427,7 +426,6 @@ class TestAliasOperationInvalid(TestcaseBase):
        collection_alias.drop(check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L0)
    @pytest.mark.xfail(reason="issue #36963")
    def test_alias_reuse_alias_name_from_dropped_collection(self):
        """
        target: test dropping a collection which has an alias

@@ -457,19 +455,23 @@ class TestAliasOperationInvalid(TestcaseBase):
        res2 = self.utility_wrap.list_aliases(c_name)[0]
        assert len(res2) == 0
        # the same alias name can be reused for another collection
        self.utility_wrap.create_alias(c_name, alias_name)
        res2 = self.utility_wrap.list_aliases(c_name)[0]
        assert len(res2) == 1
        error = {ct.err_code: 999,
                 ct.err_msg: f"{alias_name} is alias to another collection: {collection_w.name}: alias already exist"}
        self.utility_wrap.create_alias(c_name, alias_name,
                                       check_task=CheckTasks.err_res,
                                       check_items=error)
        # res2 = self.utility_wrap.list_aliases(c_name)[0]
        # assert len(res2) == 1

    @pytest.mark.tags(CaseLabel.L0)
    @pytest.mark.xfail(reason="issue #36963")
    def test_alias_rename_collection_to_alias_name(self):
        """
        target: test renaming a collection to an alias name
        method:
                1.create a collection
                2.create an alias for the collection
                3.rename the collection to the alias name
                3.rename the collection to the alias name no matter the collection was dropped or not
        expected: in step 3, rename collection to alias name failed
        """
        self._connect()
        c_name = cf.gen_unique_str("collection")

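For orientation, the alias calls above go through the suite's utility_wrap helper; underneath they are the plain pymilvus utility APIs. A minimal sketch of the same lifecycle, assuming a running Milvus at localhost:19530 and two existing collections (demo_a, demo_b and demo_alias are hypothetical names):

# Sketch of the alias lifecycle exercised by the tests above; demo_a/demo_b/
# demo_alias are hypothetical, and the server is assumed at localhost:19530.
from pymilvus import connections, utility

connections.connect(host="localhost", port="19530")

utility.create_alias("demo_a", "demo_alias")        # point the alias at demo_a
print(utility.list_aliases("demo_a"))               # ["demo_alias"]

# create_alias with the same name for another collection is rejected while the
# alias still points at demo_a (the diff expects "... alias already exist");
# switching the target is done with alter_alias instead.
utility.alter_alias("demo_b", "demo_alias")
print(utility.list_aliases("demo_b"))               # ["demo_alias"]

utility.drop_alias("demo_alias")
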
@@ -479,7 +481,7 @@ class TestAliasOperationInvalid(TestcaseBase):
        alias_name = cf.gen_unique_str(prefix)
        self.utility_wrap.create_alias(collection_w.name, alias_name)
        error = {ct.err_code: 999,
                 ct.err_msg: f"duplicated new collection name default:{alias_name} with other collection name or alias"}
                 ct.err_msg: f"cannot rename collection to an existing alias: {alias_name}"}
        self.utility_wrap.rename_collection(collection_w.name, alias_name,
                                            check_task=CheckTasks.err_res, check_items=error)

@@ -487,7 +489,5 @@ class TestAliasOperationInvalid(TestcaseBase):
        collection_w = self.init_collection_wrap(name=c_name, schema=default_schema,
                                                 check_task=CheckTasks.check_collection_property,
                                                 check_items={exp_name: c_name, exp_schema: default_schema})
        error = {ct.err_code: 999,
                 ct.err_msg: f"this is not expected, any collection name or alias name shall be unique"}
        self.utility_wrap.rename_collection(collection_w.name, alias_name,
                                            check_task=CheckTasks.err_res, check_items=error)

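The updated err_msg in these hunks pins the newer server wording for renaming a collection to a name already taken by an alias. A minimal reproduction sketch with plain pymilvus, reusing the hypothetical demo_a/demo_alias names from the previous sketch; the exact error text is the one asserted by the test and may differ across server builds:

# Sketch: renaming a collection to an existing alias name is expected to fail.
# demo_a/demo_alias are hypothetical; connection setup as in the previous sketch.
from pymilvus import utility, MilvusException

utility.create_alias("demo_a", "demo_alias")
try:
    utility.rename_collection("demo_a", "demo_alias")
except MilvusException as e:
    # Per the diff: "cannot rename collection to an existing alias: <alias>"
    print(e.code, e.message)
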
@@ -2392,12 +2392,12 @@ class TestGroupSearch(TestCaseClassBase):
        all_pages_grpby_field_values = []
        for r in range(page_rounds):
            page_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field,
                                                   param=search_param, limit=limit, offset=limit * r,
                                                   expr=default_search_exp, group_by_field=grpby_field,
                                                   output_fields=[grpby_field],
                                                   check_task=CheckTasks.check_search_results,
                                                   check_items={"nq": 1, "limit": limit},
                                                   )[0]
            for j in range(limit):
                all_pages_grpby_field_values.append(page_res[0][j].get(grpby_field))
            all_pages_ids += page_res[0].ids

@@ -2405,12 +2405,12 @@ class TestGroupSearch(TestCaseClassBase):
        assert hit_rate >= 0.8

        total_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field,
                                                param=search_param, limit=limit * page_rounds,
                                                expr=default_search_exp, group_by_field=grpby_field,
                                                output_fields=[grpby_field],
                                                check_task=CheckTasks.check_search_results,
                                                check_items={"nq": 1, "limit": limit * page_rounds}
                                                )[0]
        hit_num = len(set(total_res[0].ids).intersection(set(all_pages_ids)))
        hit_rate = round(hit_num / (limit * page_rounds), 3)
        assert hit_rate >= 0.8

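The two hunks above only re-indent the paged group-by searches; the technique they exercise is paging with limit/offset while grouping on a scalar field, then comparing the union of all pages against one large search. A condensed sketch of that check, assuming a loaded Collection object named collection with a vector field "emb" (dim 8) and a scalar field "color" (all hypothetical names):

# Condensed sketch of the paged group-by check; `collection`, "emb" and "color"
# are hypothetical and must already exist, be indexed and loaded.
import random

vec = [[random.random() for _ in range(8)]]
limit, page_rounds = 10, 3

paged_ids = []
for r in range(page_rounds):
    page = collection.search(vec, anns_field="emb", param={}, limit=limit,
                             offset=limit * r, group_by_field="color",
                             output_fields=["color"])[0]
    paged_ids += page.ids

total = collection.search(vec, anns_field="emb", param={},
                          limit=limit * page_rounds, group_by_field="color",
                          output_fields=["color"])[0]

# Paging is not guaranteed to reproduce the single large search exactly, so the
# test asserts a hit rate of at least 0.8 rather than strict equality.
hit_rate = len(set(total.ids) & set(paged_ids)) / (limit * page_rounds)
assert hit_rate >= 0.8
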
@@ -2473,3 +2473,46 @@ class TestGroupSearch(TestCaseClassBase):
            grpby_field_values.append(total_res[0][i].fields.get(grpby_field))
        assert len(grpby_field_values) == total_count
        assert len(set(grpby_field_values)) == limit * page_rounds

    @pytest.mark.tags(CaseLabel.L2)
    def test_search_group_size_min_max(self):
        """
        verify search group by works with min and max group size
        """
        group_by_field = self.inverted_string_field
        default_search_field = self.vector_fields[1]
        search_vectors = cf.gen_vectors(1, dim=self.dims[1], vector_data_type=self.vector_fields[1])
        search_params = {}
        limit = 10
        max_group_size = 10
        self.collection_wrap.search(data=search_vectors, anns_field=default_search_field,
                                    param=search_params, limit=limit,
                                    group_by_field=group_by_field,
                                    group_size=max_group_size, group_strict_size=True,
                                    output_fields=[group_by_field])
        exceed_max_group_size = max_group_size + 1
        error = {ct.err_code: 999,
                 ct.err_msg: f"input group size:{exceed_max_group_size} exceeds configured max "
                             f"group size:{max_group_size}"}
        self.collection_wrap.search(data=search_vectors, anns_field=default_search_field,
                                    param=search_params, limit=limit,
                                    group_by_field=group_by_field,
                                    group_size=exceed_max_group_size, group_strict_size=True,
                                    output_fields=[group_by_field],
                                    check_task=CheckTasks.err_res, check_items=error)

        min_group_size = 1
        self.collection_wrap.search(data=search_vectors, anns_field=default_search_field,
                                    param=search_params, limit=limit,
                                    group_by_field=group_by_field,
                                    group_size=max_group_size, group_strict_size=True,
                                    output_fields=[group_by_field])
        below_min_group_size = min_group_size - 1
        error = {ct.err_code: 999,
                 ct.err_msg: f"input group size:{below_min_group_size} is negative"}
        self.collection_wrap.search(data=search_vectors, anns_field=default_search_field,
                                    param=search_params, limit=limit,
                                    group_by_field=group_by_field,
                                    group_size=below_min_group_size, group_strict_size=True,
                                    output_fields=[group_by_field],
                                    check_task=CheckTasks.err_res, check_items=error)

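The new test pins the accepted range of group_size: a request above the configured maximum (10 here) is rejected with "exceeds configured max group size", and 0 is rejected as negative, while values inside the range succeed. A minimal happy-path sketch reusing the hypothetical collection/"emb"/"color" names from the previous sketch; the kwarg names mirror the test wrapper (group_by_field, group_size, group_strict_size) and may differ slightly between pymilvus versions:

# Sketch of a bounded group-by search; names are hypothetical, and the kwarg
# names are assumed to be forwarded unchanged to pymilvus by the test wrapper.
import random

vec = [[random.random() for _ in range(8)]]

res = collection.search(vec, anns_field="emb", param={}, limit=10,
                        group_by_field="color", group_size=10,
                        group_strict_size=True, output_fields=["color"])[0]
for hit in res:
    print(hit.id, hit.get("color"))

# group_size=11 would exceed the configured maximum of 10, and group_size=0 is
# treated as negative; the new test expects both to be rejected with err_code 999.
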
@@ -25,7 +25,7 @@ class TestQueryIterator(TestcaseBase):
                3. query iterator with checkpoint file
                4. iterator.next() for 10 times
                5. delete some entities before calling a new query iterator
                6. call a new query iterator with the same checkpoint file
                6. call a new query iterator with the same checkpoint file, with diff batch_size and output_fields
                7. iterator.next() until the end
        verify:
                1. no pk lost in iterator results for the 2 iterators

@@ -59,16 +59,14 @@ class TestQueryIterator(TestcaseBase):
                iterator.close()
                assert False, f"The iterator ends before {first_iter_times} times iterators: iter_times: {iter_times}"
                break
            pk_name = ct.default_int64_field_name if res[0].get(ct.default_int64_field_name, None) is not None \
                else ct.default_string_field_name
            for i in range(len(res)):
                pk_list1.append(res[i][pk_name])
                pk_list1.append(res[i][primary_field])
        file_exist = os.path.isfile(iterator_cp_file)
        assert file_exist is True, "The checkpoint file exists without iterator close"

        # 4. try to delete and insert some entities before calling a new query iterator
        delete_ids = random.sample(insert_ids[:nb//2], 101) + random.sample(insert_ids[nb//2:], 101)
        del_res, _ = collection_w.delete(expr=f"{pk_name} in {delete_ids}")
        del_res, _ = collection_w.delete(expr=f"{primary_field} in {delete_ids}")
        assert del_res.delete_count == len(delete_ids)

        data = cf.gen_default_list_data(nb=333, start=nb)

@@ -77,18 +75,16 @@ class TestQueryIterator(TestcaseBase):
        collection_w.flush()

        # 5. call a new query iterator with the same checkpoint file to continue the first iterator
        iterator2 = collection_w.query_iterator(batch_size, expr=expr, iterator_cp_file=iterator_cp_file)[0]
        pk_list2 = []
        iterator2 = collection_w.query_iterator(batch_size*2, expr=expr,
                                                output_fields=[primary_field, ct.default_float_field_name],
                                                iterator_cp_file=iterator_cp_file)[0]
        while True:
            res = iterator2.next()
            if len(res) == 0:
                iterator2.close()
                break
            pk_name = ct.default_int64_field_name if res[0].get(ct.default_int64_field_name, None) is not None \
                else ct.default_string_field_name
            for i in range(len(res)):
                pk_list2.append(res[i][pk_name])
                pk_list1.append(res[i][pk_name])
                pk_list1.append(res[i][primary_field])
        # 6. verify
        assert len(pk_list1) == len(set(pk_list1)) == nb
        file_exist = os.path.isfile(iterator_cp_file)

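Taken together, these hunks describe the iterator enhancement under test: a query iterator records its position in a checkpoint file (iterator_cp_file), and a later iterator created with the same file resumes where the first stopped, even with a different batch_size and output_fields, with no primary key lost or duplicated across the two passes. A condensed sketch of that flow, assuming the iterator_cp_file kwarg shown in the diff is exposed by the installed pymilvus prerelease and that collection has an int64 primary key "pk" and a scalar field "float" (hypothetical names):

# Condensed checkpoint/resume sketch; `collection`, "pk" and "float" are
# hypothetical, and iterator_cp_file is assumed to be supported as in the diff.
cp_file = "/tmp/query_iter_demo_cp"
seen = []

it1 = collection.query_iterator(batch_size=100, expr="pk >= 0",
                                iterator_cp_file=cp_file)
for _ in range(10):                          # consume only part of the data
    seen += [row["pk"] for row in it1.next()]
# it1 is intentionally not closed, so the checkpoint file stays on disk.

it2 = collection.query_iterator(batch_size=200, expr="pk >= 0",
                                output_fields=["pk", "float"],
                                iterator_cp_file=cp_file)
while True:
    batch = it2.next()
    if not batch:
        it2.close()                          # finished; the test then checks the checkpoint file state
        break
    seen += [row["pk"] for row in batch]

# Across both iterators every primary key should appear exactly once.
assert len(seen) == len(set(seen))
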
@@ -299,3 +295,41 @@ class TestQueryIterator(TestcaseBase):
        collection_w.query_iterator(check_task=CheckTasks.check_query_iterator,
                                    check_items={"count": nb,
                                                 "batch_size": ct.default_batch_size})

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.skip("issue #37109, need debug due to the resolution of the issue")
    def test_query_iterator_on_two_collections(self):
        """
        target: test query iterator on two collections
        method: 1. create two collections
                2. query iterator on the first collection
                3. check the result, expect pk
        expected: query successfully
        """
        # 1. initialize with data
        collection_w = self.init_collection_general(prefix, True)[0]
        collection_w2 = self.init_collection_general(prefix, False, primary_field=ct.default_string_field_name)[0]

        data = cf.gen_default_list_data(nb=ct.default_nb, primary_field=ct.default_string_field_name)
        string_values = [cf.gen_str_by_length(20) for _ in range(ct.default_nb)]
        data[2] = string_values
        collection_w2.insert(data)

        # 2. call a new query iterator and iterate for some times
        batch_size = 150
        iterator_cp_file = f"/tmp/it_{collection_w.name}_cp"
        iterator2 = collection_w2.query_iterator(batch_size=batch_size // 2, iterator_cp_file=iterator_cp_file)[0]
        iter_times = 0
        first_iter_times = ct.default_nb // batch_size // 2 // 2  # only iterate half of the data for the 1st time
        while iter_times < first_iter_times:
            iter_times += 1
            res = iterator2.next()
            if len(res) == 0:
                iterator2.close()
                assert False, f"The iterator ends before {first_iter_times} times iterators: iter_times: {iter_times}"
                break

        # 3. query iterator on the second collection with the same checkpoint file
        iterator = collection_w.query_iterator(batch_size=batch_size, iterator_cp_file=iterator_cp_file)[0]
        print(iterator.next())
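This last test (currently skipped pending issue #37109) probes what happens when a second collection's iterator is pointed at a checkpoint file that another collection's iterator is already using. Until that behaviour is settled, the conservative pattern is one checkpoint file per collection, keyed by collection name as the test itself does; a small hypothetical helper (coll_a and coll_b are placeholder Collection objects):

# Hypothetical helper mirroring the f"/tmp/it_{collection_w.name}_cp" pattern
# above, so iterators on different collections never share checkpoint state.
def cp_path_for(collection_name: str) -> str:
    return f"/tmp/it_{collection_name}_cp"

it_a = coll_a.query_iterator(batch_size=150, iterator_cp_file=cp_path_for(coll_a.name))
it_b = coll_b.query_iterator(batch_size=75, iterator_cp_file=cp_path_for(coll_b.name))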