test: add collection V2 cases for milvus client (#44021)

issue: #43590
Migrate collection test cases from TestcaseBase to
TestMilvusClientV2Base

Signed-off-by: Orpheus Wang <orpheus.wang@zilliz.com>
This commit is contained in:
9Eurydice9 2025-08-23 21:35:47 +08:00 committed by GitHub
parent cbb9392564
commit d6b78193cb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 190 additions and 140 deletions

View File

@@ -2587,6 +2587,196 @@ class TestMilvusClientLoadCollectionValid(TestMilvusClientV2Base):
self.release_collection(client, collection_name)
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.ClusterOnly)
def test_milvus_client_load_replica_change(self):
    """
    target: test load replica change
    method: 1.load with replica 1
            2.load with a new replica number
            3.release collection
            4.load with a new replica
            5.verify replica changes and query functionality
    expected: The second time successfully loaded with a new replica number
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    pk_filter = f"{default_primary_key_field_name} in [0]"
    # Fresh collection: drop the auto-created index so it can be rebuilt
    # explicitly, then insert one segment of generated rows and flush.
    self.create_collection(client, collection_name, default_dim)
    self.release_collection(client, collection_name)
    self.drop_index(client, collection_name, "vector")
    schema_info = self.describe_collection(client, collection_name)[0]
    inserted_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema_info)
    self.insert(client, collection_name, inserted_rows)
    self.flush(client, collection_name)
    collection_stats = self.get_collection_stats(client, collection_name)[0]
    assert collection_stats['row_count'] == default_nb
    # Index the vector field and load with a single replica first.
    index_params = self.prepare_index_params(client)[0]
    index_params.add_index(field_name="vector", index_type="FLAT", metric_type="L2")
    self.create_index(client, collection_name, index_params)
    self.load_collection(client, collection_name, replica_number=1)
    assert self.get_load_state(client, collection_name)[0]["state"] == LoadState.Loaded
    # The collection must be queryable while loaded with one replica.
    self.query(client, collection_name, filter=pk_filter,
               check_task=CheckTasks.check_query_results,
               check_items={"exp_res": [inserted_rows[0]], "with_vec": True})
    # Re-load with replica_number=2 without releasing first (should work).
    self.load_collection(client, collection_name, replica_number=2)
    assert self.get_load_state(client, collection_name)[0]["state"] == LoadState.Loaded
    # Release, then load again requesting two replicas.
    self.release_collection(client, collection_name)
    self.load_collection(client, collection_name, replica_number=2)
    assert self.get_load_state(client, collection_name)[0]["state"] == LoadState.Loaded
    # Query must still succeed after the replica change.
    self.query(client, collection_name, filter=pk_filter,
               check_task=CheckTasks.check_query_results,
               check_items={"exp_res": [inserted_rows[0]], "with_vec": True})
    # Cleanup
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.ClusterOnly)
def test_milvus_client_load_replica_multi(self):
    """
    target: test load with multiple replicas
    method: 1.create collection with one shard
            2.insert multiple segments
            3.load with multiple replicas
            4.query and search
    expected: Query and search successfully
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    replica_number = 2
    # Single-shard collection so every replica serves the same shard.
    self.create_collection(client, collection_name, default_dim, shards_num=1)
    self.release_collection(client, collection_name)
    self.drop_index(client, collection_name, "vector")
    schema_info = self.describe_collection(client, collection_name)[0]
    # Insert one batch per replica so the collection holds multiple segments.
    all_rows = []
    for batch in range(replica_number):
        batch_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema_info,
                                               start=batch * default_nb)
        self.insert(client, collection_name, batch_rows)
        all_rows.extend(batch_rows)
    total_entities = replica_number * default_nb
    # Flush and confirm the row count covers every inserted batch.
    self.flush(client, collection_name)
    stats = self.get_collection_stats(client, collection_name)[0]
    assert stats['row_count'] == total_entities
    # Build a FLAT index and load with the requested replica count.
    index_params = self.prepare_index_params(client)[0]
    index_params.add_index(field_name="vector", index_type="FLAT", metric_type="L2")
    self.create_index(client, collection_name, index_params)
    self.load_collection(client, collection_name, replica_number=replica_number)
    assert self.get_load_state(client, collection_name)[0]["state"] == LoadState.Loaded
    # Query one pk from each batch; both must come back with vectors.
    query_res, _ = self.query(client, collection_name, filter=f"{default_primary_key_field_name} in [0, {default_nb}]",
                              check_task=CheckTasks.check_query_results,
                              check_items={"exp_res": [all_rows[0], all_rows[default_nb]], "with_vec": True})
    assert len(query_res) == 2
    # Search must also succeed across the replicated shard.
    vectors_to_search = cf.gen_vectors(default_nq, default_dim)
    self.search(client, collection_name, vectors_to_search,
                check_task=CheckTasks.check_search_results,
                check_items={"enable_milvus_client_api": True,
                             "nq": len(vectors_to_search),
                             "limit": default_limit,
                             "pk_name": default_primary_key_field_name})
    # Cleanup
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.ClusterOnly)
def test_milvus_client_load_replica_partitions(self):
    """
    target: test load replica with partitions
    method: 1.Create collection and one partition
            2.Insert data into collection and partition
            3.Load multi replicas with partition
            4.Query
    expected: Verify query result
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    partition_name = cf.gen_unique_str("partition")
    # Collection with its auto-created index dropped for an explicit rebuild.
    self.create_collection(client, collection_name, default_dim)
    self.release_collection(client, collection_name)
    self.drop_index(client, collection_name, "vector")
    # First batch goes to the default partition, second to the new one;
    # the second batch's pks start at default_nb so the ranges are disjoint.
    schema_info = self.describe_collection(client, collection_name)[0]
    default_part_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema_info)
    new_part_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema_info, start=default_nb)
    self.insert(client, collection_name, default_part_rows)
    self.create_partition(client, collection_name, partition_name)
    self.insert(client, collection_name, new_part_rows, partition_name=partition_name)
    # Both batches must be counted after flush.
    self.flush(client, collection_name)
    stats = self.get_collection_stats(client, collection_name)[0]
    assert stats['row_count'] == default_nb * 2
    # Index, then load only the new partition with two replicas.
    index_params = self.prepare_index_params(client)[0]
    index_params.add_index(field_name="vector", index_type="FLAT", metric_type="L2")
    self.create_index(client, collection_name, index_params)
    self.load_partitions(client, collection_name, [partition_name], replica_number=2)
    assert self.get_load_state(client, collection_name)[0]["state"] == LoadState.Loaded
    # Querying the loaded partition succeeds and returns its first row.
    self.query(client, collection_name, filter=f"{default_primary_key_field_name} in [{default_nb}]",
               partition_names=[partition_name],
               check_task=CheckTasks.check_query_results,
               check_items={"exp_res": [new_part_rows[0]], "with_vec": True})
    # Touching the unloaded default partition must raise "partition not loaded".
    error = {ct.err_code: 65538, ct.err_msg: "partition not loaded"}
    self.query(client, collection_name, filter=f"{default_primary_key_field_name} in [0]",
               partition_names=[ct.default_partition_name, partition_name],
               check_task=CheckTasks.err_res, check_items=error)
    # Cleanup
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L3)
def test_milvus_client_count_multi_replicas(self):
    """
    target: test count multi replicas
    method: 1. load data with multi replicas
            2. count
    expected: verify count
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # Set up a collection with a freshly dropped index and one data segment.
    self.create_collection(client, collection_name, default_dim)
    self.release_collection(client, collection_name)
    self.drop_index(client, collection_name, "vector")
    schema_info = self.describe_collection(client, collection_name)[0]
    data_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema_info)
    self.insert(client, collection_name, data_rows)
    # Flush and confirm every inserted row is accounted for.
    self.flush(client, collection_name)
    stats = self.get_collection_stats(client, collection_name)[0]
    assert stats['row_count'] == default_nb
    # Index and load the collection with two replicas.
    index_params = self.prepare_index_params(client)[0]
    index_params.add_index(field_name="vector", index_type="FLAT", metric_type="L2")
    self.create_index(client, collection_name, index_params)
    self.load_collection(client, collection_name, replica_number=2)
    assert self.get_load_state(client, collection_name)[0]["state"] == LoadState.Loaded
    # count(*) across the replicated collection must equal the insert count.
    self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= 0",
               output_fields=["count(*)"],
               check_task=CheckTasks.check_query_results,
               check_items={"exp_res": [{"count(*)": default_nb}]})
    # Cleanup
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L0)

View File

@@ -400,123 +400,6 @@ class TestLoadCollection(TestcaseBase):
******************************************************************
"""
@pytest.mark.tags(CaseLabel.ClusterOnly)
def test_load_replica_change(self):
    """
    target: test load replica change
    method: 1.load with replica 1
            2.load with a new replica number
            3.release collection
            4.load with a new replica
            5.create index is a must because get_query_segment_info could
              only return indexed and loaded segment
    expected: The second time successfully loaded with a new replica number
    """
    # create, insert (num_entities access implicitly flushes the segment)
    collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix))
    df = cf.gen_default_dataframe_data()
    insert_res, _ = collection_w.insert(df)
    assert collection_w.num_entities == ct.default_nb
    collection_w.create_index(ct.default_float_vec_field_name, ct.default_index)
    collection_w.load(replica_number=1)
    # with one replica every loaded segment is hosted on exactly one node
    for seg in self.utility_wrap.get_query_segment_info(collection_w.name)[0]:
        assert len(seg.nodeIds) == 1
    collection_w.query(expr=f"{ct.default_int64_field_name} in [0]")
    loading_progress, _ = self.utility_wrap.loading_progress(collection_w.name)
    assert loading_progress == {'loading_progress': '100%'}
    # load with a different replica number WITHOUT releasing first: this is
    # expected to succeed and rebalance into two replica groups
    collection_w.load(replica_number=2)
    one_replica, _ = collection_w.get_replicas()
    assert len(one_replica.groups) == 2
    collection_w.release()
    collection_w.load(replica_number=2)
    # replicas is not yet reflected in loading progress
    loading_progress, _ = self.utility_wrap.loading_progress(collection_w.name)
    assert loading_progress == {'loading_progress': '100%'}
    two_replicas, _ = collection_w.get_replicas()
    assert len(two_replicas.groups) == 2
    collection_w.query(expr=f"{ct.default_int64_field_name} in [0]", check_task=CheckTasks.check_query_results,
                       check_items={'exp_res': [{'int64': 0}]})
    # verify loaded segments are hosted on 2 nodes each, while the summed
    # row count still matches a single copy of the data
    seg_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
    num_entities = 0
    for seg in seg_info:
        assert len(seg.nodeIds) == 2
        num_entities += seg.num_rows
    assert num_entities == ct.default_nb
@pytest.mark.tags(CaseLabel.ClusterOnly)
def test_load_replica_multi(self):
    """
    target: test load with multiple replicas
    method: 1.create collection with one shard
            2.insert multiple segments
            3.load with multiple replicas
            4.query and search
    expected: Query and search successfully
    """
    # Single-shard collection; insert one segment per replica.
    collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), shards_num=1)
    tmp_nb = 1000
    replica_number = 2
    for batch_idx in range(replica_number):
        batch_df = cf.gen_default_dataframe_data(nb=tmp_nb, start=batch_idx * tmp_nb)
        collection_w.insert(batch_df)
        # num_entities flushes and reflects every batch inserted so far
        assert collection_w.num_entities == (batch_idx + 1) * tmp_nb
    collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
    collection_w.load(replica_number=replica_number)
    # One replica group per requested replica ...
    replicas = collection_w.get_replicas()[0]
    assert len(replicas.groups) == replica_number
    # ... and each loaded segment hosted on that many nodes.
    for seg in self.utility_wrap.get_query_segment_info(collection_w.name)[0]:
        assert len(seg.nodeIds) == replica_number
    # Query one pk from each inserted batch.
    query_res, _ = collection_w.query(expr=f"{ct.default_int64_field_name} in [0, {tmp_nb}]")
    assert len(query_res) == 2
    # Search returns a full page of results.
    search_res, _ = collection_w.search(vectors, default_search_field, default_search_params, default_limit)
    assert len(search_res[0]) == ct.default_limit
@pytest.mark.tags(CaseLabel.ClusterOnly)
def test_load_replica_partitions(self):
    """
    target: test load replica with partitions
    method: 1.Create collection and one partition
            2.Insert data into collection and partition
            3.Load multi replicas with partition
            4.Query
    expected: Verify query result
    """
    collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix))
    # df_1 -> default partition (pks [0, default_nb));
    # df_2 -> tag partition (pks [default_nb, 2*default_nb))
    df_1 = cf.gen_default_dataframe_data(nb=ct.default_nb)
    df_2 = cf.gen_default_dataframe_data(nb=ct.default_nb, start=ct.default_nb)
    collection_w.insert(df_1)
    partition_w = self.init_partition_wrap(collection_w, ct.default_tag)
    partition_w.insert(df_2)
    assert collection_w.num_entities == ct.default_nb * 2
    collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
    # load ONLY the tag partition, with two replicas
    collection_w.load([partition_w.name], replica_number=2)
    for seg in self.utility_wrap.get_query_segment_info(collection_w.name)[0]:
        assert len(seg.nodeIds) == 2
    # pk 0 lives in the (unloaded) default partition, so querying the loaded
    # tag partition for it must return an empty result.
    # BUG FIX: the keyword was misspelled `check_tasks`, which fell into
    # **kwargs and silently skipped the empty-result verification.
    collection_w.query(expr=f"{ct.default_int64_field_name} in [0]", partition_names=[ct.default_tag],
                       check_task=CheckTasks.check_query_empty)
    # The first pk of df_2 (= default_nb) is in the loaded partition, so a
    # query without partition_names must find it.
    # (was a hard-coded 2000; use ct.default_nb to match df_2's start offset)
    collection_w.query(expr=f"{ct.default_int64_field_name} in [{ct.default_nb}]",
                       check_task=CheckTasks.check_query_results,
                       check_items={'exp_res': df_2.iloc[:1, :1].to_dict('records')})
    # Explicitly naming the unloaded default partition must fail.
    error = {ct.err_code: 65538, ct.err_msg: "partition not loaded"}
    collection_w.query(expr=f"{ct.default_int64_field_name} in [0]",
                       partition_names=[ct.default_partition_name, ct.default_tag],
                       check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L3)
def test_load_replica_non_shard_leader(self):
"""
@@ -665,26 +548,3 @@ class TestLoadCollection(TestcaseBase):
res, _ = collection_w.get_replicas()
assert len(res.groups) == 0
@pytest.mark.tags(CaseLabel.L3)
def test_count_multi_replicas(self):
    """
    target: test count multi replicas
    method: 1. load data with multi replicas
            2. count
    expected: verify count
    """
    # create -> insert -> flush
    collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
    data = cf.gen_default_dataframe_data()
    collection_w.insert(data)
    collection_w.flush()
    # index -> load with two replicas
    collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
    collection_w.load(replica_number=2)
    # count(*) must equal the number of inserted rows
    collection_w.query(expr=f'{ct.default_int64_field_name} >= 0', output_fields=[ct.default_count_output],
                       check_task=CheckTasks.check_query_results,
                       check_items={'exp_res': [{"count(*)": ct.default_nb}]})