test: add collection V2 cases for milvus client (#44008)

issue: #43590
Migrate collection test cases from TestcaseBase to
TestMilvusClientV2Base

---------

Signed-off-by: Orpheus Wang <orpheus.wang@zilliz.com>
This commit is contained in:
9Eurydice9 2025-08-22 13:13:47 +08:00 committed by GitHub
parent d5ecf49319
commit 59d333819c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 100 additions and 325 deletions

View File

@ -2346,6 +2346,7 @@ class TestMilvusClientLoadCollectionValid(TestMilvusClientV2Base):
self.create_partition(client, collection_name, partition_name_1)
self.create_partition(client, collection_name, partition_name_2)
# Step 2: Test point 1 - loaded partitions can be searched/queried
self.release_collection(client, collection_name)
self.load_partitions(client, collection_name, [partition_name_1, partition_name_2])
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.Loaded, f"Expected Loaded, but got {load_state['state']}"
@ -2458,8 +2459,10 @@ class TestMilvusClientLoadCollectionValid(TestMilvusClientV2Base):
self.query(client, collection_name, filter=default_search_exp,
partition_names=[partition_name_2],
check_task=CheckTasks.err_res, check_items=error)
self.load_collection(client, collection_name)
self.query(client, collection_name, filter=default_search_exp,
partition_names=[partition_name_2])
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
@ -2485,6 +2488,10 @@ class TestMilvusClientLoadCollectionValid(TestMilvusClientV2Base):
self.release_partitions(client, collection_name, [partition_name_1])
self.drop_partition(client, collection_name, partition_name_1)
self.release_partitions(client, collection_name, [partition_name_2])
error = {ct.err_code: 65538, ct.err_msg: 'partition not loaded'}
self.query(client, collection_name, filter=default_search_exp,
partition_names=[partition_name_2],
check_task=CheckTasks.err_res, check_items=error)
self.load_partitions(client, collection_name, [partition_name_2])
self.query(client, collection_name, filter=default_search_exp,
partition_names=[partition_name_2])
@ -2677,10 +2684,101 @@ class TestMilvusClientLoadPartition(TestMilvusClientV2Base):
error = {ct.err_code: 65538, ct.err_msg: 'partition not loaded'}
self.query(client, collection_name, filter=default_search_exp, partition_names=[partition_name_2],
check_task=CheckTasks.err_res, check_items=error)
# 7. load the whole collection (should succeed)
# 7. load partition2 twice
self.load_partitions(client, collection_name, [partition_name_2])
self.load_partitions(client, collection_name, [partition_name_2])
self.query(client, collection_name, filter=default_search_exp, partition_names=[partition_name_2])
# 8. load the whole collection (should succeed)
self.load_collection(client, collection_name)
self.query(client, collection_name, filter=default_search_exp)
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_release_load_collection_after_load_partition_drop_another(self):
"""
target: test release/load collection after loading one partition and dropping another
method: 1) load partitions 2) drop one partition 3) release another partition 4) load collection 5) query
expected: no exception
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
partition_name_1 = cf.gen_unique_str("partition1")
partition_name_2 = cf.gen_unique_str("partition2")
# Create collection and partitions
self.create_collection(client, collection_name, default_dim)
self.release_collection(client, collection_name)
self.create_partition(client, collection_name, partition_name_1)
self.create_partition(client, collection_name, partition_name_2)
# Load one partition, drop the other, then release the loaded partition
self.load_partitions(client, collection_name, [partition_name_1])
self.release_partitions(client, collection_name, [partition_name_2])
self.drop_partition(client, collection_name, partition_name_2)
self.release_partitions(client, collection_name, [partition_name_1])
# Load the whole collection and run a query
self.load_collection(client, collection_name)
self.query(client, collection_name, filter=default_search_exp)
# Cleanup
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_collection_after_partition_operations(self):
"""
target: comprehensive test for load collection after various partition operations
method: combines three V1 test scenarios:
1. load partition -> release partition -> load collection -> search
2. load partition -> release partitions -> query (should fail) -> load collection -> query
3. load partition -> drop partition -> query (should fail) -> drop another -> load collection -> query
expected: all operations should work correctly with proper error handling
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
partition_name_1 = cf.gen_unique_str("partition1")
partition_name_2 = cf.gen_unique_str("partition2")
# Create collection and partitions
self.create_collection(client, collection_name, default_dim)
self.create_partition(client, collection_name, partition_name_1)
self.create_partition(client, collection_name, partition_name_2)
self.release_collection(client, collection_name)
# Scenario 1: load partition -> release partition -> load collection -> search
self.load_partitions(client, collection_name, [partition_name_1])
self.release_partitions(client, collection_name, [partition_name_1])
self.load_collection(client, collection_name)
vectors_to_search = np.random.default_rng(seed=19530).random((1, default_dim))
self.search(client, collection_name, vectors_to_search, limit=default_limit,
partition_names=[partition_name_1, partition_name_2])
# Scenario 2: load partition -> release partitions -> query (should fail) -> load collection -> query
self.release_collection(client, collection_name)
self.load_partitions(client, collection_name, [partition_name_1])
self.release_partitions(client, collection_name, [partition_name_1])
self.release_partitions(client, collection_name, [partition_name_2])
error = {ct.err_code: 65535, ct.err_msg: 'collection not loaded'}
self.query(client, collection_name, filter=default_search_exp,
partition_names=[partition_name_1, partition_name_2],
check_task=CheckTasks.err_res, check_items=error)
self.load_collection(client, collection_name)
self.query(client, collection_name, filter=default_search_exp,
partition_names=[partition_name_1, partition_name_2])
# Scenario 3: load partition -> drop partition -> query (should fail) -> drop another -> load collection -> query
self.release_collection(client, collection_name)
self.load_partitions(client, collection_name, [partition_name_1])
self.release_partitions(client, collection_name, [partition_name_1])
self.drop_partition(client, collection_name, partition_name_1)
error = {ct.err_code: 65535, ct.err_msg: f'partition name {partition_name_1} not found'}
self.query(client, collection_name, filter=default_search_exp,
partition_names=[partition_name_1, partition_name_2],
check_task=CheckTasks.err_res, check_items=error)
self.drop_partition(client, collection_name, partition_name_2)
self.load_collection(client, collection_name)
self.query(client, collection_name, filter=default_search_exp)
# Cleanup
self.drop_collection(client, collection_name)
class TestMilvusClientDescribeCollectionInvalid(TestMilvusClientV2Base):
""" Test case of search interface """

View File

@ -688,326 +688,3 @@ class TestLoadCollection(TestcaseBase):
collection_w.query(expr=f'{ct.default_int64_field_name} >= 0', output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={'exp_res': [{"count(*)": ct.default_nb}]})
class TestLoadPartition(TestcaseBase):
"""
******************************************************************
The following cases are used to test `load_collection` function
******************************************************************
"""
@pytest.fixture(
scope="function",
params=gen_binary_index()
)
def get_binary_index(self, request):
log.info(request.param)
if request.param["index_type"] in ct.binary_supported_index_types:
return request.param
else:
pytest.skip("Skip index Temporary")
@pytest.mark.tags(CaseLabel.L2)
def test_load_collection_after_load_unloaded_partition(self):
"""
target: test load partition after load an unloaded partition
method: 1. load partition
2. load another partition
3. query on the collection
4. load collection
expected: No exception
"""
collection_w = self.init_collection_general(prefix, is_index=False)[0]
collection_w.create_index(default_search_field)
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
partition_w1.load()
partition_w2.load()
collection_w.query(default_term_expr)
collection_w.load()
@pytest.mark.tags(CaseLabel.L0)
def test_load_partitions_release_collection(self):
"""
target: test release collection after load partitions
method: 1. load partition
2. release collection
3. query on the partition
4. load partitions
5. query on the collection
expected: no exception
"""
collection_w = self.init_collection_general(prefix, is_index=False)[0]
collection_w.create_index(default_search_field)
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
partition_w1.load()
collection_w.release()
error = {ct.err_code: 65535, ct.err_msg: "collection not loaded"}
collection_w.query(default_term_expr, partition_names=[partition1],
check_task=CheckTasks.err_res, check_items=error)
partition_w1.load()
partition_w2.load()
collection_w.query(default_term_expr)
@pytest.mark.tags(CaseLabel.L2)
def test_load_collection_release_collection(self):
"""
target: test load collection after load partitions
method: 1. load partition
2. release collection
3. load collection
expected: no exception
"""
collection_w = self.init_collection_general(prefix, is_index=False)[0]
collection_w.create_index(default_search_field)
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
partition_w1.load()
collection_w.release()
collection_w.load()
@pytest.mark.tags(CaseLabel.L2)
def test_load_partitions_after_load_release_partition(self):
"""
target: test load partitions after load and release partition
method: 1. load partition
2. release partition
3. query on the partition
4. load partitions(include released partition and non-released partition)
5. query on the collection
expected: no exception
"""
collection_w = self.init_collection_general(prefix, is_index=False)[0]
collection_w.create_index(default_search_field)
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
partition_w1.load()
partition_w1.release()
error = {ct.err_code: 65535,
ct.err_msg: 'collection not loaded'}
collection_w.query(default_term_expr, partition_names=[partition1],
check_task=CheckTasks.err_res, check_items=error)
partition_w1.load()
partition_w2.load()
collection_w.query(default_term_expr)
@pytest.mark.tags(CaseLabel.L2)
def test_load_collection_after_load_release_partition(self):
"""
target: test load collection after load and release partition
method: 1. load partition
2. release partition
3. load collection
4. search on the collection
expected: no exception
"""
collection_w = self.init_collection_general(prefix, is_index=False)[0]
collection_w.create_index(default_search_field)
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
partition_w1.load()
partition_w1.release()
collection_w.load()
collection_w.search(vectors, default_search_field, default_search_params,
default_limit, partition_names=[partition1, partition2])
@pytest.mark.tags(CaseLabel.L2)
def test_load_partitions_after_load_partition_release_partitions(self):
"""
target: test load partitions after load partition and release partitions
method: 1. load partition
2. release partitions
3. load partitions
4. query on the partitions
expected: no exception
"""
collection_w = self.init_collection_general(prefix, is_index=False)[0]
collection_w.create_index(default_search_field)
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
partition_w1.load()
partition_w1.release()
partition_w2.release()
partition_w1.load()
partition_w2.load()
collection_w.query(default_term_expr, partition_names=[partition1, partition2])
@pytest.mark.tags(CaseLabel.L2)
def test_load_collection_after_load_partition_release_partitions(self):
"""
target: test load collection after load partition and release partitions
method: 1. load partition
2. release partitions
3. query on the partitions
4. load collection
5. query on the partitions
expected: no exception
"""
collection_w = self.init_collection_general(prefix, is_index=False)[0]
collection_w.create_index(default_search_field)
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
partition_w1.load()
partition_w1.release()
partition_w2.release()
error = {ct.err_code: 65535, ct.err_msg: 'collection not loaded'}
collection_w.query(default_term_expr, partition_names=[partition1, partition2],
check_task=CheckTasks.err_res, check_items=error)
collection_w.load()
collection_w.query(default_term_expr, partition_names=[partition1, partition2])
@pytest.mark.tags(CaseLabel.L2)
def test_load_partition_after_load_drop_partition(self):
"""
target: test load partition after load and drop partition
method: 1. load partition
2. drop the loaded partition
3. load the left partition
4. query on the partition
expected: no exception
"""
collection_w = self.init_collection_general(prefix, is_index=False)[0]
collection_w.create_index(default_search_field)
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
partition_w1.load()
partition_w1.release()
partition_w1.drop()
partition_w2.load()
collection_w.query(default_term_expr, partition_names=[partition2])
@pytest.mark.tags(CaseLabel.L2)
def test_load_collection_after_load_drop_partition(self):
"""
target: test load collection after load and drop partition
method: 1. load partition
2. drop the loaded partition
3. query on the partition
4. drop another partition
5. load collection
6. query on the collection
expected: no exception
"""
collection_w = self.init_collection_general(prefix, is_index=False)[0]
collection_w.create_index(default_search_field)
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
partition_w1.load()
partition_w1.release()
partition_w1.drop()
error = {ct.err_code: 65535, ct.err_msg: f'partition name {partition1} not found'}
collection_w.query(default_term_expr, partition_names=[partition1, partition2],
check_task=CheckTasks.err_res, check_items=error)
partition_w2.drop()
collection_w.load()
collection_w.query(default_term_expr)
@pytest.mark.tags(CaseLabel.L2)
def test_release_load_partition_after_load_drop_partition(self):
"""
target: test release load partition after load and drop partition
method: 1. load partition
2. drop the loaded partition
3. release another partition
4. load the partition
expected: no exception
"""
collection_w = self.init_collection_general(prefix, is_index=False)[0]
collection_w.create_index(default_search_field)
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
partition_w1.load()
partition_w1.release()
partition_w1.drop()
partition_w2.release()
partition_w2.load()
collection_w.query(default_term_expr, partition_names=[partition2])
@pytest.mark.tags(CaseLabel.L2)
def test_release_load_collection_after_load_drop_partition(self):
"""
target: test release load partition after load and drop partition
method: 1. load partition
2. drop the loaded partition
3. release another partition
4. load collection
expected: no exception
"""
collection_w = self.init_collection_general(prefix, is_index=False)[0]
collection_w.create_index(default_search_field)
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
partition_w1.load()
partition_w1.release()
partition_w1.drop()
partition_w2.release()
collection_w.load()
collection_w.query(default_term_expr)
@pytest.mark.tags(CaseLabel.L2)
def test_load_another_partition_after_load_drop_partition(self):
"""
target: test load another collection after load and drop one partition
method: 1. load partition
2. drop the unloaded partition
3. load the partition again
4. query on the partition
expected: no exception
"""
collection_w = self.init_collection_general(prefix, is_index=False)[0]
collection_w.create_index(default_search_field)
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
partition_w1.load()
partition_w2.drop()
partition_w1.load()
collection_w.query(default_term_expr, partition_names=[partition1])
@pytest.mark.tags(CaseLabel.L2)
def test_release_load_partition_after_load_partition_drop_another(self):
"""
target: test release load partition after load and drop partition
method: 1. load partition
2. drop the unloaded partition
3. release the loaded partition
4. query on the released partition
5. reload the partition
6. query on the partition
expected: no exception
"""
collection_w = self.init_collection_general(prefix, is_index=False)[0]
collection_w.create_index(default_search_field)
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
partition_w1.load()
partition_w2.drop()
partition_w1.release()
error = {ct.err_code: 65535,
ct.err_msg: 'collection not loaded'}
collection_w.query(default_term_expr, partition_names=[partition1],
check_task=CheckTasks.err_res, check_items=error)
partition_w1.load()
collection_w.query(default_term_expr, partition_names=[partition1])
@pytest.mark.tags(CaseLabel.L2)
def test_release_load_collection_after_load_partition_drop_another(self):
"""
target: test release load partition after load and drop partition
method: 1. load partition
2. drop the unloaded partition
3. release the loaded partition
4. load collection
5. query on the collection
expected: no exception
"""
collection_w = self.init_collection_general(prefix, is_index=False)[0]
collection_w.create_index(default_search_field)
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
partition_w1.load()
partition_w2.drop()
partition_w1.release()
collection_w.load()
collection_w.query(default_term_expr)