mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
[test] Update load multi replicas cases (#16727)
Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
This commit is contained in:
parent
9e09434a4b
commit
38280698ff
@ -2225,8 +2225,7 @@ class TestLoadCollection(TestcaseBase):
|
|||||||
error = {ct.err_code: 1, ct.err_msg: f"no enough nodes to create replicas"}
|
error = {ct.err_code: 1, ct.err_msg: f"no enough nodes to create replicas"}
|
||||||
collection_w.load(replica_number=3, check_task=CheckTasks.err_res, check_items=error)
|
collection_w.load(replica_number=3, check_task=CheckTasks.err_res, check_items=error)
|
||||||
|
|
||||||
@pytest.mark.xfail(reason="https://github.com/milvus-io/milvus/issues/16562")
|
@pytest.mark.tags(CaseLabel.L2)
|
||||||
@pytest.mark.tags(CaseLabel.L3)
|
|
||||||
def test_load_replica_change(self):
|
def test_load_replica_change(self):
|
||||||
"""
|
"""
|
||||||
target: test load replica change
|
target: test load replica change
|
||||||
@ -2246,16 +2245,17 @@ class TestLoadCollection(TestcaseBase):
|
|||||||
collection_w.query(expr=f"{ct.default_int64_field_name} in [0]")
|
collection_w.query(expr=f"{ct.default_int64_field_name} in [0]")
|
||||||
|
|
||||||
# verify load different replicas thrown an exception
|
# verify load different replicas thrown an exception
|
||||||
collection_w.load(replica_number=2)
|
error = {ct.err_code: 5, ct.err_msg: f"Should release first then reload with the new number of replicas"}
|
||||||
|
collection_w.load(replica_number=2, check_task=CheckTasks.err_res, check_items=error)
|
||||||
one_replica, _ = collection_w.get_replicas()
|
one_replica, _ = collection_w.get_replicas()
|
||||||
assert len(one_replica.groups) == 1
|
assert len(one_replica.groups) == 1
|
||||||
|
|
||||||
collection_w.release()
|
collection_w.release()
|
||||||
collection_w.load(replica_number=2)
|
collection_w.load(replica_number=2)
|
||||||
two_replicas, _ = collection_w.get_replicas()
|
two_replicas, _ = collection_w.get_replicas()
|
||||||
log.debug(two_replicas)
|
|
||||||
assert len(two_replicas.groups) == 2
|
assert len(two_replicas.groups) == 2
|
||||||
collection_w.query(expr=f"{ct.default_int64_field_name} in [0]")
|
collection_w.query(expr=f"{ct.default_int64_field_name} in [0]", check_task=CheckTasks.check_query_results,
|
||||||
|
check_items={'exp_res': [{'int64': 0}]})
|
||||||
|
|
||||||
# verify loaded segments included 2 replicas and twice num entities
|
# verify loaded segments included 2 replicas and twice num entities
|
||||||
seg_info, _ = self.utility_wrap.get_query_segment_info(collection_w.name)
|
seg_info, _ = self.utility_wrap.get_query_segment_info(collection_w.name)
|
||||||
@ -2264,7 +2264,7 @@ class TestLoadCollection(TestcaseBase):
|
|||||||
assert reduce(lambda x, y: x ^ y, seg_ids) == 0
|
assert reduce(lambda x, y: x ^ y, seg_ids) == 0
|
||||||
assert reduce(lambda x, y: x + y, num_entities) == ct.default_nb * 2
|
assert reduce(lambda x, y: x + y, num_entities) == ct.default_nb * 2
|
||||||
|
|
||||||
@pytest.mark.tags(CaseLabel.L3)
|
@pytest.mark.tags(CaseLabel.L2)
|
||||||
def test_load_replica_multi(self):
|
def test_load_replica_multi(self):
|
||||||
"""
|
"""
|
||||||
target: test load with multiple replicas
|
target: test load with multiple replicas
|
||||||
@ -2277,7 +2277,7 @@ class TestLoadCollection(TestcaseBase):
|
|||||||
# create, insert
|
# create, insert
|
||||||
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), shards_num=1)
|
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), shards_num=1)
|
||||||
tmp_nb = 1000
|
tmp_nb = 1000
|
||||||
replica_number = 5
|
replica_number = 2
|
||||||
for i in range(replica_number):
|
for i in range(replica_number):
|
||||||
df = cf.gen_default_dataframe_data(nb=tmp_nb, start=i * tmp_nb)
|
df = cf.gen_default_dataframe_data(nb=tmp_nb, start=i * tmp_nb)
|
||||||
insert_res, _ = collection_w.insert(df)
|
insert_res, _ = collection_w.insert(df)
|
||||||
@ -2292,7 +2292,7 @@ class TestLoadCollection(TestcaseBase):
|
|||||||
search_res, _ = collection_w.search(vectors, default_search_field, default_search_params, default_limit)
|
search_res, _ = collection_w.search(vectors, default_search_field, default_search_params, default_limit)
|
||||||
assert len(search_res[0]) == ct.default_limit
|
assert len(search_res[0]) == ct.default_limit
|
||||||
|
|
||||||
@pytest.mark.tags(CaseLabel.L3)
|
@pytest.mark.tags(CaseLabel.L2)
|
||||||
def test_load_replica_partitions(self):
|
def test_load_replica_partitions(self):
|
||||||
"""
|
"""
|
||||||
target: test load replica with partitions
|
target: test load replica with partitions
|
||||||
@ -2363,7 +2363,6 @@ class TestLoadCollection(TestcaseBase):
|
|||||||
assert len(shard_leaders) == 2
|
assert len(shard_leaders) == 2
|
||||||
|
|
||||||
# Verify 2 replicas segments loaded
|
# Verify 2 replicas segments loaded
|
||||||
# https://github.com/milvus-io/milvus/issues/16598
|
|
||||||
seg_info, _ = self.utility_wrap.get_query_segment_info(collection_w.name)
|
seg_info, _ = self.utility_wrap.get_query_segment_info(collection_w.name)
|
||||||
seg_ids = list(map(lambda seg: seg.segmentID, seg_info))
|
seg_ids = list(map(lambda seg: seg.segmentID, seg_info))
|
||||||
assert reduce(lambda x, y: x ^ y, seg_ids) == 0
|
assert reduce(lambda x, y: x ^ y, seg_ids) == 0
|
||||||
@ -2373,10 +2372,9 @@ class TestLoadCollection(TestcaseBase):
|
|||||||
assert len(res[0]) == ct.default_limit
|
assert len(res[0]) == ct.default_limit
|
||||||
|
|
||||||
# verify query sealed and growing data successfully
|
# verify query sealed and growing data successfully
|
||||||
exp_res = [{'int64': 0}, {'int64': 3000}]
|
|
||||||
collection_w.query(expr=f"{ct.default_int64_field_name} in [0, {ct.default_nb}]",
|
collection_w.query(expr=f"{ct.default_int64_field_name} in [0, {ct.default_nb}]",
|
||||||
check_task=CheckTasks.check_query_results,
|
check_task=CheckTasks.check_query_results,
|
||||||
check_items={'exp_res': exp_res})
|
check_items={'exp_res': [{'int64': 0}, {'int64': 3000}]})
|
||||||
|
|
||||||
@pytest.mark.tags(CaseLabel.L3)
|
@pytest.mark.tags(CaseLabel.L3)
|
||||||
def test_load_replica_multiple_shard_leader(self):
|
def test_load_replica_multiple_shard_leader(self):
|
||||||
@ -2429,6 +2427,23 @@ class TestLoadCollection(TestcaseBase):
|
|||||||
check_task=CheckTasks.check_query_results,
|
check_task=CheckTasks.check_query_results,
|
||||||
check_items={'exp_res': [{'int64': 0}, {'int64': 3000}]})
|
check_items={'exp_res': [{'int64': 0}, {'int64': 3000}]})
|
||||||
|
|
||||||
|
# https://github.com/milvus-io/milvus/issues/16726
|
||||||
|
@pytest.mark.tags(CaseLabel.L2)
|
||||||
|
def test_get_collection_replicas_not_loaded(self):
|
||||||
|
"""
|
||||||
|
target: test get replicas of not loaded collection
|
||||||
|
method: not loaded collection and get replicas
|
||||||
|
expected: raise an exception
|
||||||
|
"""
|
||||||
|
# create, insert
|
||||||
|
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix))
|
||||||
|
df = cf.gen_default_dataframe_data()
|
||||||
|
insert_res, _ = collection_w.insert(df)
|
||||||
|
assert collection_w.num_entities == ct.default_nb
|
||||||
|
|
||||||
|
collection_w.get_replicas(check_task=CheckTasks.err_res,
|
||||||
|
check_items={"err_code": 15, "err_msg": "getCollectionInfoByID: can't find collectionID"})
|
||||||
|
|
||||||
|
|
||||||
class TestReleaseAdvanced(TestcaseBase):
|
class TestReleaseAdvanced(TestcaseBase):
|
||||||
@pytest.mark.tags(CaseLabel.L0)
|
@pytest.mark.tags(CaseLabel.L0)
|
||||||
|
|||||||
@ -1,3 +1,4 @@
|
|||||||
|
from functools import reduce
|
||||||
from os import name
|
from os import name
|
||||||
import threading
|
import threading
|
||||||
import pytest
|
import pytest
|
||||||
@ -353,8 +354,7 @@ class TestPartitionParams(TestcaseBase):
|
|||||||
error = {ct.err_code: 1, ct.err_msg: f"no enough nodes to create replicas"}
|
error = {ct.err_code: 1, ct.err_msg: f"no enough nodes to create replicas"}
|
||||||
partition_w.load(replica_number=3, check_task=CheckTasks.err_res, check_items=error)
|
partition_w.load(replica_number=3, check_task=CheckTasks.err_res, check_items=error)
|
||||||
|
|
||||||
@pytest.mark.xfail(reason="https://github.com/milvus-io/milvus/issues/16641")
|
@pytest.mark.tags(CaseLabel.L2)
|
||||||
@pytest.mark.tags(CaseLabel.L3)
|
|
||||||
def test_load_replica_change(self):
|
def test_load_replica_change(self):
|
||||||
"""
|
"""
|
||||||
target: test load replica change
|
target: test load replica change
|
||||||
@ -372,17 +372,27 @@ class TestPartitionParams(TestcaseBase):
|
|||||||
assert partition_w.num_entities == ct.default_nb
|
assert partition_w.num_entities == ct.default_nb
|
||||||
|
|
||||||
partition_w.load(replica_number=1)
|
partition_w.load(replica_number=1)
|
||||||
collection_w.query(expr=f"{ct.default_int64_field_name} in [0]")
|
collection_w.query(expr=f"{ct.default_int64_field_name} in [0]", check_task=CheckTasks.check_query_results,
|
||||||
partition_w.load(replica_number=2)
|
check_items={'exp_res': [{'int64': 0}]})
|
||||||
|
error = {ct.err_code: 5, ct.err_msg: f"Should release first then reload with the new number of replicas"}
|
||||||
|
partition_w.load(replica_number=2, check_task=CheckTasks.err_res, check_items=error)
|
||||||
|
|
||||||
partition_w.release()
|
partition_w.release()
|
||||||
partition_w.load(replica_number=2)
|
partition_w.load(replica_number=2)
|
||||||
collection_w.query(expr=f"{ct.default_int64_field_name} in [0]")
|
collection_w.query(expr=f"{ct.default_int64_field_name} in [0]", check_task=CheckTasks.check_query_results,
|
||||||
# replica_info = partition_w.get_replicas()
|
check_items={'exp_res': [{'int64': 0}]})
|
||||||
|
|
||||||
# verify replicas
|
two_replicas, _ = collection_w.get_replicas()
|
||||||
|
assert len(two_replicas.groups) == 2
|
||||||
|
|
||||||
@pytest.mark.tags(CaseLabel.L3)
|
# verify loaded segments included 2 replicas and twice num entities
|
||||||
|
seg_info, _ = self.utility_wrap.get_query_segment_info(collection_w.name)
|
||||||
|
seg_ids = list(map(lambda seg: seg.segmentID, seg_info))
|
||||||
|
num_entities = list(map(lambda seg: seg.num_rows, seg_info))
|
||||||
|
assert reduce(lambda x, y: x ^ y, seg_ids) == 0
|
||||||
|
assert reduce(lambda x, y: x + y, num_entities) == ct.default_nb * 2
|
||||||
|
|
||||||
|
@pytest.mark.tags(CaseLabel.L2)
|
||||||
def test_partition_replicas_change_cross_partitions(self):
|
def test_partition_replicas_change_cross_partitions(self):
|
||||||
"""
|
"""
|
||||||
target: test load with different replicas between partitions
|
target: test load with different replicas between partitions
|
||||||
@ -390,7 +400,6 @@ class TestPartitionParams(TestcaseBase):
|
|||||||
2.Load two partitions with different replicas
|
2.Load two partitions with different replicas
|
||||||
expected: Raise an exception
|
expected: Raise an exception
|
||||||
"""
|
"""
|
||||||
from functools import reduce
|
|
||||||
# Create two partitions and insert data
|
# Create two partitions and insert data
|
||||||
collection_w = self.init_collection_wrap()
|
collection_w = self.init_collection_wrap()
|
||||||
partition_w1 = self.init_partition_wrap(collection_w)
|
partition_w1 = self.init_partition_wrap(collection_w)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user