test: add e2e case to cover partial update and upsert extending TTL deadline (#47171)
Issue: #47159

Add a test case to cover partial update and upsert extending the TTL deadline.

On branch feature/partial-update
Changes to be committed:
    modified: milvus_client_v2/test_milvus_client_ttl.py

---------

Signed-off-by: Eric Hou <eric.hou@zilliz.com>
Co-authored-by: Eric Hou <eric.hou@zilliz.com>
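For background, the TTL behavior this test relies on is that Milvus expires rows roughly `collection.ttl.seconds` after their last write timestamp, and an upsert rewrites rows with a fresh timestamp, which is what pushes the TTL deadline out. Below is a minimal sketch of that behavior outside the test harness (assumptions: a local Milvus at the default endpoint, and illustrative names such as "ttl_demo"; the partial_update flag on upsert is the feature under test on this branch and is omitted here):

    from pymilvus import MilvusClient, DataType

    client = MilvusClient(uri="http://localhost:19530")  # assumed local deployment

    schema = client.create_schema(enable_dynamic_field=False)
    schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field("vector", DataType.FLOAT_VECTOR, dim=8)

    # Rows expire roughly ttl seconds after their last write.
    client.create_collection("ttl_demo", schema=schema,
                             properties={"collection.ttl.seconds": 5})

    client.insert("ttl_demo", [{"id": i, "vector": [0.1] * 8} for i in range(10)])

    # Re-writing a row resets its timestamp, so its TTL deadline moves out;
    # rows that are not upserted keep their original deadline.
    client.upsert("ttl_demo", [{"id": i, "vector": [0.2] * 8} for i in range(5)])

Concretely, with ttl_time = 5 the test below upserts at roughly t = 2.5 s, so the expected expiry of the upserted half moves from t = 5 s to about t = 7.5 s, and the polling loop runs until t = 12.5 s (ttl_time * 2.5) to observe all three phases.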
This commit is contained in:
parent 7aa115c7b7
commit 1c080484e9
@@ -9,6 +9,12 @@ from base.client_v2_base import TestMilvusClientV2Base
 from pymilvus import DataType, AnnSearchRequest, WeightedRanker
 from pymilvus.orm.types import CONSISTENCY_STRONG, CONSISTENCY_BOUNDED, CONSISTENCY_SESSION, CONSISTENCY_EVENTUALLY
 
+default_nb = ct.default_nb
+default_dim = ct.default_dim
+default_primary_key_field_name = ct.default_primary_key_field_name
+default_vector_field_name = ct.default_vector_field_name
+default_int32_field_name = ct.default_int32_field_name
+default_search_exp = "id >= 0"
 
 class TestMilvusClientTTL(TestMilvusClientV2Base):
     """ Test case of Time To Live """
@@ -266,4 +272,87 @@ class TestMilvusClientTTL(TestMilvusClientV2Base):
         self.describe_collection(client, collection_name)
 
         self.query(client, collection_name, output_fields=["count(*)"])
 
+    @pytest.mark.tags(CaseLabel.L0)
+    @pytest.mark.parametrize("partial_update", [False, True])
+    def test_milvus_client_partial_update_with_ttl(self, partial_update):
+        """
+        target: test that partial update / upsert extends the TTL deadline of the collection
+        method:
+            1. create a collection with a short TTL
+            2. insert rows
+            3. continuously query and search the collection
+            4. upsert half of the rows (with or without partial update)
+            5. query and verify the new TTL deadline
+        expected: step 5 should succeed
+        """
+        # step 1: create collection with a short TTL
+        ttl_time = 5
+        client = self._client()
+        schema = self.create_schema(client, enable_dynamic_field=False)[0]
+        schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
+        schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
+        schema.add_field(default_int32_field_name, DataType.INT32, nullable=True)
+        index_params = self.prepare_index_params(client)[0]
+        index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX")
+        index_params.add_index(default_vector_field_name, index_type="AUTOINDEX")
+        index_params.add_index(default_int32_field_name, index_type="AUTOINDEX")
+        collection_name = cf.gen_collection_name_by_testcase_name(module_index=1)
+        self.create_collection(client, collection_name, default_dim, schema=schema,
+                               properties={"collection.ttl.seconds": ttl_time},
+                               consistency_level="Strong", index_params=index_params)
+
+        # step 2: insert rows
+        rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
+        self.insert(client, collection_name, rows)
+        start_time = time.time()
+        self.flush(client, collection_name)
+        self.release_collection(client, collection_name)
+        self.load_collection(client, collection_name)
+
+        # step 3: continuously query and search the collection
+        upsert_time = ttl_time / 2      # upsert halfway through the original TTL window
+        pu = True                       # the upsert has not happened yet
+        update_nb = default_nb // 2     # upsert only half of the rows
+        end_time = ttl_time * 2.5       # poll long enough to observe all three phases
+        new_ttl_time = ttl_time
+        while time.time() - start_time < end_time:
+            # query
+            # start_time ------ pu_time ------ ttl_time ------ new_ttl_time ------ end_time
+            # before ttl_time: count(*) should be default_nb
+            # between ttl_time and new_ttl_time: count(*) should be update_nb
+            # after new_ttl_time: count(*) should be 0
+            res = self.query(client, collection_name, filter=default_search_exp, output_fields=["count(*)"])
+            if time.time() - start_time <= ttl_time:
+                assert res[0][0].get('count(*)') == default_nb
+            elif ttl_time < time.time() - start_time <= new_ttl_time:
+                assert res[0][0].get('count(*)') == update_nb
+            else:
+                assert res[0][0].get('count(*)') == 0
+
+            # search
+            # before new_ttl_time the search should return 10 hits, after it 0
+            search_vectors = cf.gen_vectors(1, dim=default_dim)
+            res = self.search(client, collection_name, search_vectors, anns_field=default_vector_field_name,
+                              search_params={}, limit=10)
+            if time.time() - start_time <= new_ttl_time:
+                assert len(res[0][0]) == 10
+            else:
+                assert len(res[0][0]) == 0
+
+            time.sleep(1)
+            # step 4: upsert once, halfway through the original TTL window
+            if pu and time.time() - start_time >= upsert_time:
+                if partial_update:
+                    # partial update: rewrite only the primary key and vector fields
+                    new_rows = cf.gen_row_data_by_schema(nb=update_nb, schema=schema,
+                                                         desired_field_names=[default_primary_key_field_name,
+                                                                              default_vector_field_name])
+                else:
+                    new_rows = cf.gen_row_data_by_schema(nb=update_nb, schema=schema)
+
+                self.upsert(client, collection_name, new_rows, partial_update=partial_update)
+                pu_time = time.time() - start_time
+                new_ttl_time = pu_time + ttl_time   # step 5: the upsert pushes the deadline out
+                pu = False
+            time.sleep(1)
+
+        self.drop_collection(client, collection_name)
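Assuming the standard layout of the Milvus Python test suite (the new case lives in milvus_client_v2/test_milvus_client_ttl.py, as listed in the commit message), both parametrized variants could be run on their own with something like:

    pytest milvus_client_v2/test_milvus_client_ttl.py -k test_milvus_client_partial_update_with_ttl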