diff --git a/tests/python_client/base/client_base.py b/tests/python_client/base/client_base.py
index 0303e41301..bf9699e878 100644
--- a/tests/python_client/base/client_base.py
+++ b/tests/python_client/base/client_base.py
@@ -25,6 +25,7 @@ class Base:
     collection_schema_wrap = None
     field_schema_wrap = None
    collection_object_list = []
+    resource_group_list = []

     def setup_class(self):
         log.info("[setup_class] Start setup class...")
@@ -64,6 +65,16 @@ class Base:
                 if collection_object.collection is not None and collection_object.name in collection_list:
                     collection_object.drop(check_task=ct.CheckTasks.check_nothing)

+            # Clean up the rgs before disconnect
+            for rg_name in self.resource_group_list:
+                rg = self.utility_wrap.describe_resource_group(name=rg_name)[0]
+                if rg is not None:
+                    if rg.num_available_node > 0:
+                        self.utility_wrap.transfer_node(source=rg_name,
+                                                        target=ct.default_resource_group_name,
+                                                        num_node=rg.num_available_node)
+                    self.utility_wrap.drop_resource_group(rg_name, check_task=ct.CheckTasks.check_nothing)
+
         except Exception as e:
             log.debug(str(e))

@@ -262,3 +273,14 @@ class TestcaseBase(Base):
             collection_w.insert(df)
             assert collection_w.num_entities == nb_of_segment * (i + 1)
         return collection_w
+
+    def init_resource_group(self, name, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
+        if not self.connection_wrap.has_connection(alias=DefaultConfig.DEFAULT_USING)[0]:
+            self._connect()
+        utility_w = ApiUtilityWrapper()
+        res, check_result = utility_w.create_resource_group(name=name, using=using, timeout=timeout,
+                                                            check_task=check_task,
+                                                            check_items=check_items, **kwargs)
+        if res is None and check_result:
+            self.resource_group_list.append(name)
+        return res, check_result
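For orientation, a minimal sketch of how a test case is expected to use the new helper; the test and rg names are hypothetical, and the cleanup relies on the teardown block added above:

```python
# Hypothetical usage of init_resource_group (sketch, not part of the patch).
from base.client_base import TestcaseBase
from common import common_func as cf
from common import common_type as ct


class TestMyRg(TestcaseBase):
    def test_with_rg(self):
        self._connect()
        rg_name = cf.gen_unique_str("rg")
        # on success the name is recorded in self.resource_group_list
        self.init_resource_group(name=rg_name)
        self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
                                        target=rg_name, num_node=1)
        # no manual cleanup needed: the teardown block above transfers any
        # available nodes back to the default rg and drops the group
```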
diff --git a/tests/python_client/base/utility_wrapper.py b/tests/python_client/base/utility_wrapper.py
index e5b38b0494..7548ee3b31 100644
--- a/tests/python_client/base/utility_wrapper.py
+++ b/tests/python_client/base/utility_wrapper.py
@@ -449,4 +449,41 @@ class ApiUtilityWrapper:
         func_name = sys._getframe().f_code.co_name
         res, check = api_request([self.role.list_grants], **kwargs)
         check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
-        return res, check_result
\ No newline at end of file
+        return res, check_result
+
+    def create_resource_group(self, name, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
+        func_name = sys._getframe().f_code.co_name
+        res, check = api_request([self.ut.create_resource_group, name, using, timeout], **kwargs)
+        check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
+        return res, check_result
+
+    def drop_resource_group(self, name, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
+        func_name = sys._getframe().f_code.co_name
+        res, check = api_request([self.ut.drop_resource_group, name, using, timeout], **kwargs)
+        check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
+        return res, check_result
+
+    def list_resource_groups(self, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
+        func_name = sys._getframe().f_code.co_name
+        res, check = api_request([self.ut.list_resource_groups, using, timeout], **kwargs)
+        check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
+        return res, check_result
+
+    def describe_resource_group(self, name, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
+        func_name = sys._getframe().f_code.co_name
+        res, check = api_request([self.ut.describe_resource_group, name, using, timeout], **kwargs)
+        check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
+        return res, check_result
+
+    def transfer_node(self, source, target, num_node, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
+        func_name = sys._getframe().f_code.co_name
+        res, check = api_request([self.ut.transfer_node, source, target, num_node, using, timeout], **kwargs)
+        check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
+        return res, check_result
+
+    def transfer_replica(self, source, target, collection_name, num_replica, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
+        func_name = sys._getframe().f_code.co_name
+        res, check = api_request([self.ut.transfer_replica, source, target, collection_name, num_replica, using, timeout], **kwargs)
+        check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
+        return res, check_result
diff --git a/tests/python_client/check/func_check.py b/tests/python_client/check/func_check.py
index 89f2326af9..008918559b 100644
--- a/tests/python_client/check/func_check.py
+++ b/tests/python_client/check/func_check.py
@@ -6,7 +6,7 @@ from common import common_type as ct
 from common import common_func as cf
 from common.common_type import CheckTasks, Connect_Object_Name
 # from common.code_mapping import ErrorCode, ErrorMessage
-from pymilvus import Collection, Partition
+from pymilvus import Collection, Partition, ResourceGroupInfo
 from utils.api_request import Error
 import check.param_check as pc

@@ -83,6 +83,9 @@ class ResponseChecker:
         elif self.check_task == CheckTasks.check_permission_deny:
             # Collection interface response check
             result = self.check_permission_deny(self.response, self.succ)
+        elif self.check_task == CheckTasks.check_rg_property:
+            # describe resource group interface response check
+            result = self.check_rg_property(self.response, self.func_name, self.check_items)

         # Add check_items here if something new need verify

@@ -196,6 +199,29 @@ class ResponseChecker:
             assert partition.num_entities == check_items["num_entities"]
         return True

+    @staticmethod
+    def check_rg_property(rg, func_name, check_items):
+        exp_func_name = "describe_resource_group"
+        if func_name != exp_func_name:
+            log.warning("The function name is {} rather than {}".format(func_name, exp_func_name))
+        if not isinstance(rg, ResourceGroupInfo):
+            raise Exception("The result to check is not a ResourceGroupInfo object")
+        if len(check_items) == 0:
+            raise Exception("No expected values found in the check task")
+        if check_items.get("name", None):
+            assert rg.name == check_items["name"]
+        if check_items.get("capacity", None):
+            assert rg.capacity == check_items["capacity"]
+        if check_items.get("num_available_node", None):
+            assert rg.num_available_node == check_items["num_available_node"]
+        if check_items.get("num_loaded_replica", None):
+            assert rg.num_loaded_replica == check_items["num_loaded_replica"]
+        if check_items.get("num_outgoing_node", None):
+            assert rg.num_outgoing_node == check_items["num_outgoing_node"]
+        if check_items.get("num_incoming_node", None):
+            assert rg.num_incoming_node == check_items["num_incoming_node"]
+        return True
+
     @staticmethod
     def check_search_results(search_res, func_name, check_items):
         """
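For reference, a typical call that drives the new checker looks like the sketch below (group name hypothetical, inside a `TestcaseBase` method). Note that `check_rg_property` only asserts keys whose expected values are truthy, so entries such as `"capacity": 0` or `"num_loaded_replica": {}` act as documentation rather than assertions:

```python
# Sketch: exercising check_rg_property from a test case.
expected = {"name": "rg_abc",
            "capacity": 1,                # asserted (truthy)
            "num_available_node": 1,      # asserted (truthy)
            "num_loaded_replica": {},     # skipped by the truthy .get() check
            "num_outgoing_node": {},      # skipped
            "num_incoming_node": {}}      # skipped
self.utility_wrap.describe_resource_group(name="rg_abc",
                                          check_task=ct.CheckTasks.check_rg_property,
                                          check_items=expected)
```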
diff --git a/tests/python_client/common/common_func.py b/tests/python_client/common/common_func.py
index 7e764a8e0b..ca88ee801a 100644
--- a/tests/python_client/common/common_func.py
+++ b/tests/python_client/common/common_func.py
@@ -323,11 +323,11 @@ def gen_default_binary_dataframe_data(nb=ct.default_nb, dim=ct.default_dim, star
     return df, binary_raw_values


-def gen_default_list_data(nb=ct.default_nb, dim=ct.default_dim):
-    int_values = [i for i in range(nb)]
-    float_values = [np.float32(i) for i in range(nb)]
-    string_values = [str(i) for i in range(nb)]
-    float_vec_values = gen_vectors(nb, dim)
+def gen_default_list_data(nb=ct.default_nb, dim=ct.default_dim, start=0):
+    int_values = [i for i in range(start, start + nb)]
+    float_values = [np.float32(i) for i in range(start, start + nb)]
+    string_values = [str(i) for i in range(start, start + nb)]
+    float_vec_values = gen_vectors(nb, dim)
     data = [int_values, float_values, string_values, float_vec_values]
     return data
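The new `start` offset makes repeated insert batches non-overlapping (the vector column stays `nb` long, only the scalar values shift). A quick sketch of the intended use, assuming a collection wrapper `collection_w` from the test base:

```python
# Sketch: three non-overlapping batches of nb rows each.
nb = 3000
for i in range(3):
    data = cf.gen_default_list_data(nb=nb, start=i * nb)
    # every column holds exactly nb values; scalar fields run from
    # i*nb to (i+1)*nb - 1, so primary keys never collide across batches
    collection_w.insert(data)
```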
diff --git a/tests/python_client/common/common_type.py b/tests/python_client/common/common_type.py
index 6edadb4f45..6967e125dc 100644
--- a/tests/python_client/common/common_type.py
+++ b/tests/python_client/common/common_type.py
@@ -37,6 +37,7 @@ default_float_vec_field_name = "float_vector"
 another_float_vec_field_name = "float_vector1"
 default_binary_vec_field_name = "binary_vector"
 default_partition_name = "_default"
+default_resource_group_name = '__default_resource_group'
 default_tag = "1970_01_01"
 row_count = "row_count"
 default_length = 65535
@@ -200,6 +201,7 @@ class CheckTasks:
     check_role_property = "check_role_property"
     check_permission_deny = "check_permission_deny"
     check_value_equal = "check_value_equal"
+    check_rg_property = "check_resource_group_property"


 class BulkLoadStates:
diff --git a/tests/python_client/testcases/test_resourcegroup.py b/tests/python_client/testcases/test_resourcegroup.py
new file mode 100644
index 0000000000..fed7500bc8
--- /dev/null
+++ b/tests/python_client/testcases/test_resourcegroup.py
@@ -0,0 +1,1300 @@
+import pytest
+
+from base.client_base import TestcaseBase
+from common import common_func as cf
+from common import common_type as ct
+from common.common_type import CaseLabel, CheckTasks
+from utils.util_pymilvus import *
+from utils.util_log import test_log as log
+
+
+@pytest.mark.skip(reason="still debugging")
+class TestResourceGroupParams(TestcaseBase):
+    @pytest.mark.tags(CaseLabel.L0)
+    def test_rg_default(self):
+        """
+        method:
+        1. create a rg abc
+        2. describe the default rg and the rg abc
+        3. transfer 1 query node from default rg to the rg abc
+        4. describe the rg abc and verify both the capacity and available nodes increased by 1
+        5. describe the default rg and verify both the capacity and available nodes decreased by 1
+        6. try to drop the rg abc, expected fail with err msg
+        7. transfer 1 query node from the rg abc to the default rg
+        8. describe both rgs and verify the values changed accordingly
+        9. drop the rg abc, expected success
+        verify: transfer node successfully and rg info updated accordingly
+        """
+        self._connect()
+        rgs, _ = self.utility_wrap.list_resource_groups()
+        rgs_count = len(rgs)
+        default_rg_init_cap = 2
+        default_rg_init_available_node = 1
+        default_rg_info = {"name": ct.default_resource_group_name,
+                           "capacity": default_rg_init_cap,
+                           "num_available_node": default_rg_init_available_node,
+                           "num_loaded_replica": {},
+                           "num_outgoing_node": {},
+                           "num_incoming_node": {}
+                           }
+        self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name,
+                                                  check_task=ct.CheckTasks.check_rg_property,
+                                                  check_items=default_rg_info)
+
+        # create my rg
+        m_rg_name = cf.gen_unique_str("rg")
+        self.init_resource_group(name=m_rg_name)
+        rgs, _ = self.utility_wrap.list_resource_groups()
+        new_rgs_count = len(rgs)
+        assert rgs_count + 1 == new_rgs_count
+        new_rg_init_cap = 0
+        new_rg_init_num_available_node = 0
+        new_rg_init_info = {"name": m_rg_name,
+                            "capacity": new_rg_init_cap,
+                            "num_available_node": new_rg_init_num_available_node,
+                            "num_loaded_replica": {},
+                            "num_outgoing_node": {},
+                            "num_incoming_node": {},
+                            }
+        self.utility_wrap.describe_resource_group(name=m_rg_name,
+                                                  check_task=ct.CheckTasks.check_rg_property,
+                                                  check_items=new_rg_init_info)
+
+        # transfer 1 query node from default to my rg
+        num_node = 1
+        self.utility_wrap.transfer_node(source=ct.default_resource_group_name, target=m_rg_name, num_node=num_node)
+        rgs, _ = self.utility_wrap.list_resource_groups()
+        assert new_rgs_count == len(rgs)
+        target_rg_info = {"name": m_rg_name,
+                          "capacity": new_rg_init_cap + num_node,
+                          "num_available_node": new_rg_init_num_available_node + num_node,
+                          }
+        self.utility_wrap.describe_resource_group(name=m_rg_name,
+                                                  check_task=ct.CheckTasks.check_rg_property,
+                                                  check_items=target_rg_info)
+        source_rg_info = {"name": ct.default_resource_group_name,
+                          "capacity": default_rg_init_cap - num_node,
+                          "num_available_node": default_rg_init_available_node - num_node,
+                          }
+        self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name,
+                                                  check_task=ct.CheckTasks.check_rg_property,
+                                                  check_items=source_rg_info)
+
+        # try to drop my rg
+        error = {ct.err_code: 999, ct.err_msg: 'failed to drop resource group, err=delete non-empty rg is not permitted'}
+        self.utility_wrap.drop_resource_group(name=m_rg_name, check_task=ct.CheckTasks.err_res,
+                                              check_items=error)
+
+        # transfer back the query node to default rg
+        self.utility_wrap.transfer_node(source=m_rg_name, target=ct.default_resource_group_name, num_node=num_node)
+        self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name,
+                                                  check_task=ct.CheckTasks.check_rg_property,
+                                                  check_items=default_rg_info)
+        self.utility_wrap.describe_resource_group(name=m_rg_name,
+                                                  check_task=ct.CheckTasks.check_rg_property,
+                                                  check_items=new_rg_init_info)
+
+        # drop my rg, expected success this time
+        self.utility_wrap.drop_resource_group(name=m_rg_name)
+        rgs, _ = self.utility_wrap.list_resource_groups()
+        assert len(rgs) == rgs_count
+        # pytest.skip(reason='issue #21962')
+        error = {ct.err_code: 999, ct.err_msg: 'failed to describe resource group, err=rg is not existing'}
+        self.utility_wrap.describe_resource_group(name=m_rg_name,
+                                                  check_task=ct.CheckTasks.err_res,
+                                                  check_items=error)
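+
+    # Accounting rule exercised by the case above: transfer_node(source, target, k)
+    # decreases both capacity and num_available_node of `source` by k and
+    # increases both on `target` by k; a rg can be dropped only after its nodes
+    # have been transferred away (capacity back to 0).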
+
+    @pytest.mark.tags(CaseLabel.L1)
+    @pytest.mark.parametrize("rg_name", ["", None])
+    def test_create_rg_empty(self, rg_name):
+        """
+        method: create a rg with an empty or None name
+        verify: fail with error msg
+        """
+        self._connect()
+        error = {ct.err_code: 999,
+                 ct.err_msg: "failed to create resource group, err=resource group name couldn't be empty"}
+        self.init_resource_group(name=rg_name, check_task=ct.CheckTasks.err_res, check_items=error)
+
+    @pytest.mark.tags(CaseLabel.L1)
+    def test_create_rg_valid_names(self):
+        """
+        method: create a rg with a valid name (what are valid names?)
+        verify: create a rg successfully
+        """
+        pass
+
+    @pytest.mark.skip(reason="need to define rules of valid names")
+    @pytest.mark.tags(CaseLabel.L1)
+    @pytest.mark.parametrize("rg_name", ct.get_invalid_strs)
+    def test_create_rg_invalid_names(self, rg_name):
+        """
+        method: create a rg with an invalid name (what are invalid names? types, length, Chinese, symbols)
+        verify: fail with error msg
+        """
+        self._connect()
+        self.init_resource_group(name=rg_name)
+        # TODO: check error msg
+
+    @pytest.mark.tags(CaseLabel.L1)
+    def test_create_rg_max_length_name(self):
+        """
+        method: create a rg with a max length name
+        verify: create a rg successfully
+        """
+        pass
+
+    @pytest.mark.tags(CaseLabel.L1)
+    def test_create_rg_dup_name(self):
+        """
+        method:
+        1. create a rg with name abc
+        2. create a rg with name abc again
+        verify: fail with error msg when creating with a dup name
+        """
+        self._connect()
+        rg_name = cf.gen_unique_str('rg')
+        self.init_resource_group(name=rg_name)
+        self.utility_wrap.describe_resource_group(name=rg_name,
+                                                  check_task=ct.CheckTasks.check_rg_property,
+                                                  check_items={"name": rg_name})
+        error = {ct.err_code: 999,
+                 ct.err_msg: "failed to create resource group, err=resource group already exist"}
+        self.init_resource_group(name=rg_name,
+                                 check_task=ct.CheckTasks.err_res,
+                                 check_items=error)
+
+    @pytest.mark.skip(reason="issue #21971")
+    @pytest.mark.tags(CaseLabel.L1)
+    def test_create_rg_dropped_name(self):
+        """
+        method:
+        1. create a rg with name abc
+        2. list rgs and describe the rg abc
+        3. drop the rg abc
+        4. list rgs and describe the rg abc
+        5. create rg with the same name
+        6. list rgs and describe the rg abc
+        verify: create rg successfully for both times
+        """
+        self._connect()
+        rg_name = cf.gen_unique_str('rg')
+        self.init_resource_group(name=rg_name)
+        rgs_count = len(self.utility_wrap.list_resource_groups()[0])
+        self.utility_wrap.describe_resource_group(name=rg_name,
+                                                  check_task=ct.CheckTasks.check_rg_property,
+                                                  check_items={"name": rg_name})
+        # drop the rg
+        self.utility_wrap.drop_resource_group(name=rg_name)
+        assert len(self.utility_wrap.list_resource_groups()[0]) == rgs_count - 1
+        # error = {ct.err_code: 999,
+        #          ct.err_msg: "failed to create resource group, err=resource group not exist"}
+        # self.utility_wrap.describe_resource_group(name=rg_name,
+        #                                           check_task=ct.CheckTasks.err_res,
+        #                                           check_items=error)
+        # create the rg with the same name again
+        self.init_resource_group(name=rg_name)
+        assert rgs_count == len(self.utility_wrap.list_resource_groups()[0])
+        self.utility_wrap.describe_resource_group(name=rg_name,
+                                                  check_task=ct.CheckTasks.check_rg_property,
+                                                  check_items={"name": rg_name})
+
+    @pytest.mark.tags(CaseLabel.L1)
+    def test_create_max_number_rgs(self):
+        """
+        method:
+        1. create rgs at a max number (max=1024)
+        2. list rgs
+        3. create one more rg
+        4. list rgs
+        verify: create succeeds up to the max number and fails when creating one more
+        """
+        pass
+
+    @pytest.mark.tags(CaseLabel.L1)
+    def test_drop_rg_non_existing(self):
+        """
+        method: drop a rg with a non-existing name
+        verify: drop successfully
+        """
+        self._connect()
+        rgs_count = len(self.utility_wrap.list_resource_groups()[0])
+        rg_name = 'non_existing'
+        self.utility_wrap.drop_resource_group(name=rg_name)
+        assert rgs_count == len(self.utility_wrap.list_resource_groups()[0])
+
+    @pytest.mark.tags(CaseLabel.L1)
+    @pytest.mark.parametrize("rg_name", ["", None])
+    def test_drop_rg_empty_name(self, rg_name):
+        """
+        method: drop a rg with an empty or None name
+        verify: drop successfully
+        """
+        self._connect()
+        rgs_count = len(self.utility_wrap.list_resource_groups()[0])
+        self.utility_wrap.drop_resource_group(name=rg_name)
+        assert rgs_count == len(self.utility_wrap.list_resource_groups()[0])
+
+    @pytest.mark.tags(CaseLabel.L1)
+    def test_drop_rg_invalid_names(self):
+        """
+        method: drop a rg with invalid names (null, Chinese, symbols)
+        verify: drop fail with error msg
+        """
+        pass
+
+    @pytest.mark.tags(CaseLabel.L1)
+    def test_drop_rg_twice(self):
+        """
+        method:
+        1. create a rg abc
+        2. list rgs
+        3. drop the rg abc
+        4. drop the rg abc again, then list rgs
+        verify: both drops succeed and the rg count returns to its initial value
+        """
+        self._connect()
+        rgs_count = len(self.utility_wrap.list_resource_groups()[0])
+        rg_name = cf.gen_unique_str('rg')
+        self.init_resource_group(name=rg_name)
+        assert rgs_count + 1 == len(self.utility_wrap.list_resource_groups()[0])
+        self.utility_wrap.describe_resource_group(name=rg_name,
+                                                  check_task=ct.CheckTasks.check_rg_property,
+                                                  check_items={"name": rg_name})
+        self.utility_wrap.drop_resource_group(name=rg_name)
+        assert rgs_count == len(self.utility_wrap.list_resource_groups()[0])
+        self.utility_wrap.drop_resource_group(name=rg_name)
+        assert rgs_count == len(self.utility_wrap.list_resource_groups()[0])
+
+    @pytest.mark.tags(CaseLabel.L1)
+    def test_drop_default_rg(self):
+        """
+        method:
+        1. describe the default rg
+        2. drop the default rg
+        3. list rgs and describe the default rg
+        verify: drop fails with error msg and the default rg stays unchanged
+        """
+        self._connect()
+        rgs_count = len(self.utility_wrap.list_resource_groups()[0])
+        default_rg_init_cap = 1000000
+        default_rg_init_available_node = 1
+        default_rg_info = {"name": ct.default_resource_group_name,
+                           "capacity": default_rg_init_cap,
+                           "num_available_node": default_rg_init_available_node,
+                           "num_loaded_replica": {},
+                           "num_outgoing_node": {},
+                           "num_incoming_node": {}
+                           }
+        self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name,
+                                                  check_task=ct.CheckTasks.check_rg_property,
+                                                  check_items=default_rg_info)
+        error = {ct.err_code: 999, ct.err_msg: 'failed to drop resource group, err=delete default rg is not permitted'}
+        self.utility_wrap.drop_resource_group(name=ct.default_resource_group_name,
+                                              check_task=CheckTasks.err_res,
+                                              check_items=error)
+        assert len(self.utility_wrap.list_resource_groups()[0]) == rgs_count
+        default_rg, _ = self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name,
+                                                                  check_task=CheckTasks.check_rg_property,
+                                                                  check_items=default_rg_info)
+
+    # @pytest.mark.skip(reason="issue #21971")
+    @pytest.mark.tags(CaseLabel.L1)
+    @pytest.mark.parametrize("rg_name", ["", None])
+    def test_describe_rg_empty_name(self, rg_name):
+        """
+        method: describe a rg with an empty or None name
+        verify: fail with error msg
+        """
+        self._connect()
+        self.utility_wrap.describe_resource_group(name=rg_name)
+        # TODO: check error
+
+    @pytest.mark.tags(CaseLabel.L1)
+    def test_describe_rg_invalid_names(self):
+        """
+        method: describe a rg with an invalid name (what are invalid names? types, length, Chinese, symbols)
+        verify: fail with error msg
+        """
+        pass
+
+    # @pytest.mark.skip(reason="issue #21962")
+    # @pytest.mark.tags(CaseLabel.L1)
+    def test_describe_rg_non_existing(self):
+        """
+        method: describe a non-existing rg
+        verify: fail with error msg
+        """
+        self._connect()
+        non_existing_rg = 'non_existing'
+        error = {ct.err_code: 999, ct.err_msg: 'failed to describe resource group, err=rg is not existing'}
+        self.utility_wrap.describe_resource_group(name=non_existing_rg,
+                                                  check_task=ct.CheckTasks.err_res,
+                                                  check_items=error)
+
+    @pytest.mark.tags(CaseLabel.L1)
+    def test_describe_default_rg(self):
+        """
+        method: describe the default rg
+        verify: verify the capacity, available nodes, num_loaded_replica,
+                num_outgoing_node and num_incoming_node
+        """
+        self._connect()
+        default_rg_init_cap = 2
+        default_rg_init_available_node = 1
+        default_rg_info = {"name": ct.default_resource_group_name,
+                           "capacity": default_rg_init_cap,
+                           "num_available_node": default_rg_init_available_node,
+                           "num_loaded_replica": {},
+                           "num_outgoing_node": {},
+                           "num_incoming_node": {}
+                           }
+        self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name,
+                                                  check_task=ct.CheckTasks.check_rg_property,
+                                                  check_items=default_rg_info)
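+
+
+# The expected-info dicts used throughout this file share one shape; a small
+# helper like this hypothetical sketch could remove some of the duplication:
+#
+#     def expected_rg_info(name, capacity=0, available=0):
+#         return {"name": name, "capacity": capacity,
+#                 "num_available_node": available, "num_loaded_replica": {},
+#                 "num_outgoing_node": {}, "num_incoming_node": {}}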
+
+
+@pytest.mark.skip(reason="still debugging")
+class TestTransferNode(TestcaseBase):
+    @pytest.mark.tags(CaseLabel.L0)
+    def test_transfer_node_default(self):
+        """
+        Method:
+        1. create 2 rgs: RgA and RgB
+        2. transfer 1 node from default to RgA
+           verify transfer successfully
+        3. transfer 1 node from RgA to RgB
+           verify transfer successfully
+        4. drop rg RgB
+           verify fail with error (RgB still holds the node)
+        5. drop rg RgA
+           verify success
+        """
+        self._connect()
+        rg1_name = cf.gen_unique_str('rg1')
+        rg2_name = cf.gen_unique_str('rg2')
+        self.init_resource_group(rg1_name)
+        self.init_resource_group(rg2_name)
+        self.utility_wrap.transfer_node(source=ct.default_resource_group_name, target=rg1_name, num_node=1)
+        rg_init_cap = 0
+        rg_init_available_node = 0
+        rg1_info = {"name": rg1_name,
+                    "capacity": rg_init_cap + 1,
+                    "num_available_node": rg_init_available_node + 1,
+                    "num_loaded_replica": {},
+                    "num_outgoing_node": {},
+                    "num_incoming_node": {}
+                    }
+        self.utility_wrap.describe_resource_group(name=rg1_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=rg1_info)
+
+        # transfer a query node from rg1 to rg2
+        self.utility_wrap.transfer_node(source=rg1_name, target=rg2_name, num_node=1)
+        rg2_info = {"name": rg2_name,
+                    "capacity": rg_init_cap + 1,
+                    "num_available_node": rg_init_available_node + 1,
+                    "num_loaded_replica": {},
+                    "num_outgoing_node": {},
+                    "num_incoming_node": {}
+                    }
+        self.utility_wrap.describe_resource_group(name=rg2_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=rg2_info)
+        rg1_info = {"name": rg1_name,
+                    "capacity": rg_init_cap,
+                    "num_available_node": rg_init_available_node,
+                    "num_loaded_replica": {},
+                    "num_outgoing_node": {},
+                    "num_incoming_node": {}
+                    }
+        self.utility_wrap.describe_resource_group(name=rg1_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=rg1_info)
+
+        # drop rg2, which is not empty
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to drop resource group, err=delete non-empty rg is not permitted'}
+        self.utility_wrap.drop_resource_group(name=rg2_name,
+                                              check_task=CheckTasks.err_res,
+                                              check_items=error
+                                              )
+        self.utility_wrap.drop_resource_group(name=rg1_name)
+
+    @pytest.mark.tags(CaseLabel.L0)
+    @pytest.mark.parametrize("with_growing", [True, False])
+    def test_transfer_node_from_default_rg(self, with_growing):
+        """
+        Method:
+        1. prepare a collection to search
+        2. create a rgA
+        3. transfer the node from default rg to rgA
+        4. insert to make some growing
+           verify search keeps succ
+        5. transfer the node back to default rg
+        6. insert to make some growing
+           verify search keeps succ
+        """
+        # create a collectionA and insert data
+        dim = 128
+        collection_w, _, _, insert_ids, _ = self.init_collection_general(insert_data=True, dim=dim)
+        if with_growing:
+            collection_w.insert(cf.gen_default_list_data(nb=100))
+        nq = 5
+        vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
+        # verify search succ
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids,
+                                         "limit": ct.default_limit}
+                            )
+        default_rg_info = {"name": ct.default_resource_group_name,
+                           "capacity": 2,
+                           "num_available_node": 1,
+                           "num_loaded_replica": {collection_w.name: 1},
+                           "num_outgoing_node": {},
+                           "num_incoming_node": {}
+                           }
+        self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=default_rg_info)
+
+        # create a rgA
+        rg_name = cf.gen_unique_str('rg')
+        self.init_resource_group(rg_name)
+        # transfer query node from default to rgA
+        self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
+                                        target=rg_name,
+                                        num_node=1)
+        if with_growing:
+            collection_w.insert(cf.gen_default_list_data(nb=100))
+        # verify rg state
+        rg_info = {"name": rg_name,
+                   "capacity": 1,
+                   "num_available_node": 1,
+                   "num_loaded_replica": {},
+                   "num_outgoing_node": {},
+                   "num_incoming_node": {collection_w.name: 1}
+                   }
+        self.utility_wrap.describe_resource_group(name=rg_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=rg_info)
+        default_rg_info = {"name": ct.default_resource_group_name,
+                           "capacity": 1,
+                           "num_available_node": 0,
+                           "num_loaded_replica": {collection_w.name: 1},
+                           "num_outgoing_node": {collection_w.name: 1},
+                           "num_incoming_node": {}
+                           }
+        self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=default_rg_info)
+
+        # verify search keeps succ after transfer
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids,
+                                         "limit": ct.default_limit}
+                            )
+
+        # transfer query node back to default
+        self.utility_wrap.transfer_node(source=rg_name,
+                                        target=ct.default_resource_group_name,
+                                        num_node=1)
+        if with_growing:
+            collection_w.insert(cf.gen_default_list_data(nb=100))
+        # verify search keeps succ
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids,
+                                         "limit": ct.default_limit}
+                            )
+        # verify rg state
+        rg_info = {"name": rg_name,
+                   "capacity": 0,
+                   "num_available_node": 0,
+                   "num_loaded_replica": {},
+                   "num_outgoing_node": {},
+                   "num_incoming_node": {}
+                   }
+        self.utility_wrap.describe_resource_group(name=rg_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=rg_info)
+        default_rg_info = {"name": ct.default_resource_group_name,
+                           "capacity": 2,
+                           "num_available_node": 1,
+                           "num_loaded_replica": {collection_w.name: 1},
+                           "num_outgoing_node": {},
+                           "num_incoming_node": {}
+                           }
+        self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=default_rg_info)
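+
+    # Searches are re-issued after each transfer above on purpose: the cases
+    # assert that queries keep succeeding while query nodes move between
+    # resource groups.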
+
+    @pytest.mark.xfail(reason="issue #22051")
+    @pytest.mark.tags(CaseLabel.L0)
+    def test_load_collection_with_no_available_node(self):
+        """
+        Method:
+        1. create a collectionA and insert data
+        2. release the collection if loaded
+        3. create a rgA
+        4. transfer query node from default to rgA
+        5. load the collection with default rg
+           verify load fail with no available query node in default rg
+        6. load the collection with rgA
+           verify load succ and search succ
+           verify rgA state
+        7. transfer query node from rgA to default rg
+           verify search keeps succ
+           verify rgA and default rg state
+        8. load the collection with default rg
+           verify fail for already loaded in other rg
+        """
+        self._connect()
+        dim = ct.default_dim
+        # 1. create a collectionA and insert data
+        collection_w, _, _, insert_ids, _ = self.init_collection_general(insert_data=True, dim=dim)
+        # 2. release the collection if loaded
+        collection_w.release()
+        # 3. create a rgA
+        rg_name = cf.gen_unique_str('rg')
+        self.init_resource_group(rg_name)
+        # 4. transfer query node from default to rgA
+        self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
+                                        target=rg_name,
+                                        num_node=1)
+        # 5. load the collection with default rg
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'}
+        collection_w.load(check_task=CheckTasks.err_res, check_items=error)
+
+        # 6. load the collection with rgA
+        rg_info = {"name": rg_name,
+                   "capacity": 1,
+                   "num_available_node": 1,
+                   "num_loaded_replica": {},
+                   "num_outgoing_node": {},
+                   "num_incoming_node": {}
+                   }
+        self.utility_wrap.describe_resource_group(rg_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=rg_info)
+        collection_w.load(_resource_groups=[rg_name])
+        nq = 5
+        vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
+        # verify search succ
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids,
+                                         "limit": ct.default_limit}
+                            )
+        # check rgA info
+        rg_info = {"name": rg_name,
+                   "capacity": 1,
+                   "num_available_node": 1,
+                   "num_loaded_replica": {collection_w.name: 1},
+                   "num_outgoing_node": {},
+                   "num_incoming_node": {}
+                   }
+        self.utility_wrap.describe_resource_group(name=rg_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=rg_info)
+
+        # 7. transfer query node from rgA to default rg
+        self.utility_wrap.transfer_node(source=rg_name,
+                                        target=ct.default_resource_group_name,
+                                        num_node=1)
+        # verify search keeps succ
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids,
+                                         "limit": ct.default_limit}
+                            )
+        # check rgA info after transfer
+        rg_info = {"name": rg_name,
+                   "capacity": 0,
+                   "num_available_node": 0,
+                   "num_loaded_replica": {},
+                   "num_outgoing_node": {collection_w.name: 1},
+                   "num_incoming_node": {}
+                   }
+        self.utility_wrap.describe_resource_group(name=rg_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=rg_info)
+        default_rg_info = {"name": ct.default_resource_group_name,
+                           "capacity": 1,
+                           "num_available_node": 1,
+                           "num_loaded_replica": {},
+                           "num_outgoing_node": {},
+                           "num_incoming_node": {collection_w.name: 1}
+                           }
+        self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=default_rg_info)
+
+        # 8. load the collection with default rg
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load, err=already loaded in the other rg'}
+        collection_w.load(check_task=CheckTasks.err_res, check_items=error)
+
+    @pytest.mark.xfail(reason="issue #22058")
+    @pytest.mark.tags(CaseLabel.L0)
+    @pytest.mark.parametrize("replicas", [1, 2, 3])
+    def test_load_collection_with_multi_replicas_multi_rgs(self, replicas):
+        """
+        Method:
+        1. create a collection and insert data
+        2. release if loaded
+        3. create rgA and rgB
+        4. load the collection with 1/2/3 replicas in rgA and rgB
+        5. load the collection with 1/2/3 replicas in default rg and rgB
+        """
+        self._connect()
+        # 1. create a collectionA and insert data
+        collection_w, _, _, _, _ = self.init_collection_general(insert_data=True)
+        # 2. release the collection if loaded
+        collection_w.release()
+        # 3. create rgA and rgB
+        rgA_name = cf.gen_unique_str('rgA')
+        self.init_resource_group(rgA_name)
+        rgB_name = cf.gen_unique_str('rgB')
+        self.init_resource_group(rgB_name)
+
+        # load with different replicas
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load collection, err=no enough nodes to create replicas[NoEnoughNode]'}
+        collection_w.load(replica_number=replicas,
+                          _resource_groups=[rgA_name, rgB_name],
+                          check_task=CheckTasks.err_res, check_items=error)
+
+        # error = {ct.err_code: 999,
+        #          ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'}
+        collection_w.load(replica_number=replicas,
+                          _resource_groups=[ct.default_resource_group_name, rgB_name],
+                          check_task=CheckTasks.err_res, check_items=error)
+
+    @pytest.mark.xfail(reason="issue #22058")
+    @pytest.mark.tags(CaseLabel.L0)
+    @pytest.mark.parametrize("rg_names", [[], [""], ["non_existing"], "不合法"])
+    def test_load_collection_with_empty_rg_name(self, rg_names):
+        """
+        Method:
+        1. create a collection
+        2. load with an empty, non-existing, or otherwise invalid rg name
+        """
+        collection_w = self.init_collection_wrap()
+        collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'}
+        collection_w.load(_resource_groups=rg_names,
+                          check_task=CheckTasks.err_res, check_items=error)
+
+    @pytest.mark.xfail(reason="issue #22051")
+    @pytest.mark.tags(CaseLabel.L0)
+    def test_load_partition_with_no_available_node(self):
+        """
+        Method:
+        1. create a partition and insert data
+        2. create an index on the collection
+        3. create a rgA
+        4. transfer query node from default to rgA
+        5. load the partition with default rg
+           verify load fail with no available query node in default rg
+        6. load the partition with rgA
+           verify load succ and search succ
+           verify rgA state
+        7. transfer query node from rgA to default rg
+           verify search keeps succ
+           verify rgA and default rg state
+        8. load the partition with default rg
+           verify fail for already loaded in other rg
+        """
+        self._connect()
+        dim = ct.default_dim
+        # 1. create a partition and insert data
+        collection_w = self.init_collection_wrap()
+        partition_name = cf.gen_unique_str('par')
+        partition_w = self.init_partition_wrap(collection_w, partition_name)
+        data = cf.gen_default_list_data(nb=3000)
+        ins_res, _ = partition_w.insert(data)
+        # 2. create an index on the collection
+        collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
+        # 3. create a rgA
+        rg_name = cf.gen_unique_str('rg')
+        self.init_resource_group(rg_name)
+        # 4. transfer query node from default to rgA
+        self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
+                                        target=rg_name,
+                                        num_node=1)
+        # 5. load the partition with default rg
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load partitions, err=failed to spawn replica for collection[nodes not enough]'}
+        partition_w.load(check_task=CheckTasks.err_res, check_items=error)
+
+        # 6. load the partition with rgA
+        partition_w.load(_resource_groups=[rg_name])
+        nq = 5
+        vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
+        # verify search succ
+        partition_w.search(vectors[:nq],
+                           ct.default_float_vec_field_name,
+                           ct.default_search_params,
+                           ct.default_limit,
+                           check_task=CheckTasks.check_search_results,
+                           check_items={"nq": nq,
+                                        "ids": ins_res.primary_keys,
+                                        "limit": ct.default_limit}
+                           )
+        # check rgA info
+        rg_info = {"name": rg_name,
+                   "capacity": 1,
+                   "num_available_node": 1,
+                   "num_loaded_replica": {collection_w.name: 1},
+                   "num_outgoing_node": {},
+                   "num_incoming_node": {}
+                   }
+        self.utility_wrap.describe_resource_group(name=rg_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=rg_info)
+
+        # 7. transfer query node from rgA to default rg
+        self.utility_wrap.transfer_node(source=rg_name,
+                                        target=ct.default_resource_group_name,
+                                        num_node=1)
+        # verify search keeps succ
+        partition_w.search(vectors[:nq],
+                           ct.default_float_vec_field_name,
+                           ct.default_search_params,
+                           ct.default_limit,
+                           check_task=CheckTasks.check_search_results,
+                           check_items={"nq": nq,
+                                        "ids": ins_res.primary_keys,
+                                        "limit": ct.default_limit}
+                           )
+        # check rgA info after transfer
+        rg_info = {"name": rg_name,
+                   "capacity": 0,
+                   "num_available_node": 0,
+                   "num_loaded_replica": {},
+                   "num_outgoing_node": {collection_w.name: 1},
+                   "num_incoming_node": {}
+                   }
+        self.utility_wrap.describe_resource_group(name=rg_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=rg_info)
+        default_rg_info = {"name": ct.default_resource_group_name,
+                           "capacity": 1,
+                           "num_available_node": 1,
+                           "num_loaded_replica": {},
+                           "num_outgoing_node": {},
+                           "num_incoming_node": {collection_w.name: 1}
+                           }
+        self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=default_rg_info)
+
+        # 8. load the partition with default rg
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load, err=already loaded in the other rg'}
+        partition_w.load(check_task=CheckTasks.err_res, check_items=error)
+
+    @pytest.mark.tags(CaseLabel.L0)
+    @pytest.mark.parametrize("replicas", [1, 2, 3])
+    def test_load_partition_with_multi_replicas_multi_rgs(self, replicas):
+        """
+        Method:
+        1. create a partition and insert data
+        2. create an index on the collection
+        3. create rgA and rgB
+        4. load the partition with 1/2/3 replicas in rgA and rgB
+        5. load the partition with 1/2/3 replicas in default rg and rgB
+        """
+        self._connect()
+        dim = ct.default_dim
+        # 1. create a partition and insert data
+        collection_w = self.init_collection_wrap()
+        partition_name = cf.gen_unique_str('par')
+        partition_w = self.init_partition_wrap(collection_w, partition_name)
+        data = cf.gen_default_list_data(nb=3000)
+        ins_res, _ = partition_w.insert(data)
+        # 2. create an index on the collection
+        collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
+        # 3. create rgA and rgB
+        rgA_name = cf.gen_unique_str('rgA')
+        self.init_resource_group(rgA_name)
+        rgB_name = cf.gen_unique_str('rgB')
+        self.init_resource_group(rgB_name)
+
+        # load with different replicas
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load partitions, err=no enough nodes to create replicas[NoEnoughNode]'}
+        partition_w.load(replica_number=replicas,
+                         _resource_groups=[rgA_name, rgB_name],
+                         check_task=CheckTasks.err_res, check_items=error)
+
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load partitions, err=no enough nodes to create replicas[NoEnoughNode]'}
+        partition_w.load(replica_number=replicas,
+                         _resource_groups=[ct.default_resource_group_name, rgB_name],
+                         check_task=CheckTasks.err_res, check_items=error)
+
+    @pytest.mark.tags(CaseLabel.L0)
+    @pytest.mark.parametrize("rg_names", [[], [""], ["non_existing"], "不合法"])
+    def test_load_partition_with_empty_rg_name(self, rg_names):
+        """
+        Method:
+        1. create a partition
+        2. load with an empty, non-existing, or otherwise invalid rg name
+        """
+        self._connect()
+        dim = ct.default_dim
+        # 1. create a partition
+        collection_w = self.init_collection_wrap()
+        partition_name = cf.gen_unique_str('par')
+        partition_w = self.init_partition_wrap(collection_w, partition_name)
+
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load partition, err=failed to spawn replica for collection[nodes not enough]'}
+        partition_w.load(_resource_groups=rg_names,
+                         check_task=CheckTasks.err_res, check_items=error)
+
+
+# run the multi node tests with 8 query nodes
+@pytest.mark.skip(reason="still debugging")
+class TestResourceGroupMultiNodes(TestcaseBase):
+    @pytest.mark.tags(CaseLabel.L0)
+    @pytest.mark.parametrize("with_growing", [True, False])
+    def test_load_with_replicas_and_nodes_num(self, with_growing):
+        """
+        Method:
+        1. prepare a collection with segments
+        2. create rgA
+        3. transfer 4 nodes to rgA
+        4. load 5 replicas in rgA
+           verify load fail with msg
+        5. load 3 replicas in rgA
+           verify load succ
+        6. release and reload 4 replicas in rgA
+           verify load succ and each replica occupies 1 query node in rgA
+        7. verify load successfully again with no parameters
+        """
+        # prepare a collection with segments
+        dim = 128
+        collection_w = self.init_collection_wrap(shards_num=2)
+        insert_ids = []
+        nb = 3000
+        for i in range(5):
+            res, _ = collection_w.insert(cf.gen_default_list_data(nb=nb, dim=dim, start=i*nb))
+            collection_w.flush()
+            insert_ids.extend(res.primary_keys)
+        collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
+
+        # make growing
+        if with_growing:
+            collection_w.insert(cf.gen_default_list_data(nb=500, dim=dim, start=6*nb))
+
+        # create rgA
+        rg_name = cf.gen_unique_str('rg')
+        self.init_resource_group(name=rg_name)
+        # transfer 4 nodes to rgA
+        num_nodes_to_rg = 4
+        self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
+                                        target=rg_name,
+                                        num_node=num_nodes_to_rg)
+        rg_info = {"name": rg_name,
+                   "capacity": num_nodes_to_rg,
+                   "num_available_node": num_nodes_to_rg,
+                   "num_loaded_replica": {},
+                   "num_outgoing_node": {},
+                   "num_incoming_node": {}
+                   }
+        self.utility_wrap.describe_resource_group(name=rg_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=rg_info)
+
+        # load 5 replicas in rgA and verify error msg
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'}
+        collection_w.load(replica_number=5,
+                          _resource_groups=[rg_name],
+                          check_task=ct.CheckTasks.err_res,
+                          check_items=error)
+
+        # load 3 replicas in rgA
+        replica_number = 3
+        collection_w.load(replica_number=replica_number,
+                          _resource_groups=[rg_name])
+        # verify rg state after loaded
+        rg_info = {"name": rg_name,
+                   "capacity": num_nodes_to_rg,
+                   "num_available_node": num_nodes_to_rg,
+                   "num_loaded_replica": {collection_w.name: replica_number},
+                   "num_outgoing_node": {},
+                   "num_incoming_node": {}
+                   }
+        self.utility_wrap.describe_resource_group(name=rg_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=rg_info)
+        # verify replica state
+        replicas, _ = collection_w.get_replicas()
+        num_nodes_for_replicas = 0
+        assert len(replicas.groups) == replica_number
+        for rep in replicas.groups:
+            assert rep.resource_group_name == rg_name
+            assert rep.num_outbound_node == {}
+            num_nodes_for_replicas += len(rep.node_ids)
+        assert num_nodes_for_replicas == num_nodes_to_rg
+
+        # make growing
+        if with_growing:
+            collection_w.insert(cf.gen_default_list_data(nb=200, dim=dim, start=7 * nb))
+
+        # verify search succ
+        nq = 5
+        vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids,
+                                         "limit": ct.default_limit}
+                            )
+
+        # release
+        collection_w.release()
+        # verify rg state after release
+        rg_info = {"name": rg_name,
+                   "capacity": num_nodes_to_rg,
+                   "num_available_node": num_nodes_to_rg,
+                   "num_loaded_replica": {},
+                   "num_outgoing_node": {},
+                   "num_incoming_node": {}
+                   }
+        self.utility_wrap.describe_resource_group(name=rg_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=rg_info)
+        # reload 4 replicas in rgA
+        replica_number = 4
+        collection_w.load(replica_number=replica_number,
+                          _resource_groups=[rg_name])
+        # verify rg state after reload
+        rg_info = {"name": rg_name,
+                   "capacity": num_nodes_to_rg,
+                   "num_available_node": num_nodes_to_rg,
+                   "num_loaded_replica": {collection_w.name: replica_number},
+                   "num_outgoing_node": {},
+                   "num_incoming_node": {}
+                   }
+        self.utility_wrap.describe_resource_group(name=rg_name,
+                                                  check_task=CheckTasks.check_rg_property,
+                                                  check_items=rg_info)
+
+        # verify replica state
+        replicas, _ = collection_w.get_replicas()
+        assert len(replicas.groups) == replica_number
+        for rep in replicas.groups:
+            assert rep.resource_group_name == rg_name
+            assert rep.num_outbound_node == {}
+            assert len(rep.node_ids) == 1  # one replica for each node
+
+        # make growing
+        if with_growing:
+            collection_w.insert(cf.gen_default_list_data(nb=200, dim=dim, start=8 * nb))
+
+        # verify search succ
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids,
+                                         "limit": ct.default_limit}
+                            )
+        # verify load successfully again with no parameters
+        collection_w.load()
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids,
+                                         "limit": ct.default_limit}
+                            )
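+
+    # As asserted above, replica_number may not exceed the number of available
+    # nodes in the target resource group; the next case checks the analogous
+    # constraint when several resource groups are passed to load().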
+
+    @pytest.mark.tags(CaseLabel.L0)
+    @pytest.mark.parametrize("with_growing", [True, False])
+    def test_load_with_replicas_and_rgs_num(self, with_growing):
+        """
+        Method:
+        1. prepare a collection with multi segments
+        2. create rgA and rgB
+        3. transfer 2 nodes to rgA and 3 nodes to rgB
+        4. load 3 replicas in rgA and rgB
+           verify load fail with msg
+        5. load 1 replica in rgA and rgB
+           verify load fail with msg
+        6. load 2 replicas in rgA and rgB
+           verify load succ and each replica occupies one rg
+        7. verify load successfully again with no parameters
+        """
+        # prepare a collection with segments
+        dim = 128
+        collection_w = self.init_collection_wrap(shards_num=2)
+        insert_ids = []
+        nb = 3000
+        for i in range(5):
+            res, _ = collection_w.insert(cf.gen_default_list_data(nb=nb, dim=dim, start=i * nb))
+            collection_w.flush()
+            insert_ids.extend(res.primary_keys)
+        collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
+
+        # make growing
+        if with_growing:
+            collection_w.insert(cf.gen_default_list_data(nb=500, dim=dim, start=6 * nb))
+
+        # create rgA and rgB
+        rgA_name = cf.gen_unique_str('rgA')
+        self.init_resource_group(name=rgA_name)
+        rgB_name = cf.gen_unique_str('rgB')
+        self.init_resource_group(name=rgB_name)
+        # transfer 2 nodes to rgA and 3 nodes to rgB
+        num_nodes_rgA = 2
+        self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
+                                        target=rgA_name,
+                                        num_node=num_nodes_rgA)
+        num_nodes_rgB = 3
+        self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
+                                        target=rgB_name,
+                                        num_node=num_nodes_rgB)
+        # load 3 replicas in rgA and rgB
+        replica_number = 3
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'}
+        collection_w.load(replica_number=replica_number,
+                          _resource_groups=[rgA_name, rgB_name],
+                          check_task=CheckTasks.err_res,
+                          check_items=error)
+
+        # load 1 replica in rgA and rgB
+        replica_number = 1
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'}
+        collection_w.load(replica_number=replica_number,
+                          _resource_groups=[rgA_name, rgB_name],
+                          check_task=CheckTasks.err_res,
+                          check_items=error)
+
+        # load 2 replicas in rgA and rgB
+        replica_number = 2
+        collection_w.load(replica_number=replica_number,
+                          _resource_groups=[rgA_name, rgB_name])
+        # verify replica state: each replica occupies one rg
+        replicas, _ = collection_w.get_replicas()
+        assert len(replicas.groups) == replica_number
+        for rep in replicas.groups:
+            assert rep.num_outbound_node == {}
+            assert rep.resource_group_name in [rgA_name, rgB_name]
+            if rep.resource_group_name == rgA_name:
+                assert len(rep.node_ids) == num_nodes_rgA  # one replica for each rg
+            else:
+                assert len(rep.node_ids) == num_nodes_rgB  # one replica for each rg
+
+        # make growing
+        if with_growing:
+            collection_w.insert(cf.gen_default_list_data(nb=200, dim=dim, start=8 * nb))
+
+        # verify search succ
+        nq = 5
+        vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids,
+                                         "limit": ct.default_limit}
+                            )
+        # verify load successfully again with no parameters
+        collection_w.load()
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids,
+                                         "limit": ct.default_limit}
+                            )
+
+    @pytest.mark.tags(CaseLabel.L0)
+    @pytest.mark.parametrize("with_growing", [True, False])
+    def test_transfer_replica_into_same_rg(self, with_growing):
+        """
+        Method:
+        1. prepare a collection with multi segments
+        2. create rgA and rgB
+        3. transfer 1 node to rgA and 2 nodes to rgB
+        4. load 2 replicas in rgA and rgB
+        5. transfer 1 replica from rgB to rgA
+           verify fail ?
+        6. transfer 1 replica from rgA to rgB
+           verify succ?
+        """
+        self._connect()
+        rgA_name = cf.gen_unique_str('rgA')
+        self.init_resource_group(name=rgA_name)
+        rgB_name = cf.gen_unique_str('rgB')
+        self.init_resource_group(name=rgB_name)
+
+        # transfer 1 node to rgA, 2 nodes to rgB
+        self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
+                                        target=rgA_name,
+                                        num_node=1)
+        self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
+                                        target=rgB_name,
+                                        num_node=2)
+
+        dim = 128
+        collection_w = self.init_collection_wrap(shards_num=4)
+        insert_ids = []
+        nb = 500
+        for i in range(5):
+            res, _ = collection_w.insert(cf.gen_default_list_data(nb=nb, dim=dim, start=i * nb))
+            collection_w.flush()
+            insert_ids.extend(res.primary_keys)
+        collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
+
+        collection_w.load(replica_number=2, _resource_groups=[rgA_name, rgB_name])
+
+        # make growing
+        if with_growing:
+            collection_w.insert(cf.gen_default_list_data(nb=200, dim=dim, start=6 * nb))
+
+        nq = 5
+        vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
+        # verify search succ
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids,
+                                         "limit": ct.default_limit}
+                            )
+
+        # transfer 1 replica from rgB to rgA
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'}
+        self.utility_wrap.transfer_replica(source=rgB_name, target=rgA_name,
+                                           collection_name=collection_w.name, num_replica=1,
+                                           check_task=CheckTasks.err_res,
+                                           check_items=error)
+        # transfer 1 replica from rgA to rgB
+        self.utility_wrap.transfer_replica(source=rgA_name, target=rgB_name,
+                                           collection_name=collection_w.name, num_replica=1)
+        # make growing
+        if with_growing:
+            collection_w.insert(cf.gen_default_list_data(nb=200, dim=dim, start=7 * nb))
+
+        # verify search succ
+        nq = 5
+        vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
+        collection_w.search(vectors[:nq],
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params,
+                            ct.default_limit,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": nq,
+                                         "ids": insert_ids,
+                                         "limit": ct.default_limit}
+                            )
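+
+    # transfer_replica(source, target, collection_name, num_replica) moves loaded
+    # replicas between groups; the remaining cases assert that it fails when the
+    # source group holds too few replicas or when a group does not exist.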
+
+    @pytest.mark.tags(CaseLabel.L0)
+    def test_transfer_replica_not_enough_replicas_to_transfer(self):
+        """
+        Method:
+        1. prepare a collection with multi segments
+        2. create rgA
+        3. transfer 3 nodes to rgA
+        4. transfer 2 replicas to rgA
+           verify fail with error
+        """
+        self._connect()
+        dim = ct.default_dim
+        # create a collectionA and insert data
+        collection_w, _, _, insert_ids, _ = self.init_collection_general(insert_data=True, dim=dim)
+
+        # create a rgA
+        rg_name = cf.gen_unique_str('rg')
+        self.init_resource_group(rg_name)
+        # transfer 3 nodes to rgA
+        self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
+                                        target=rg_name,
+                                        num_node=3)
+
+        # transfer 2 replicas to rgA
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'}
+        self.utility_wrap.transfer_replica(source=ct.default_resource_group_name, target=rg_name,
+                                           collection_name=collection_w.name, num_replica=2,
+                                           check_task=CheckTasks.err_res,
+                                           check_items=error)
+
+    @pytest.mark.tags(CaseLabel.L0)
+    def test_transfer_replica_non_existing_rg(self):
+        """
+        Method:
+        1. prepare a collection ready for searching
+        2. transfer the replica from default rg to a non-existing rg
+        """
+        self._connect()
+        dim = ct.default_dim
+        # create a collectionA and insert data
+        collection_w, _, _, insert_ids, _ = self.init_collection_general(insert_data=True, dim=dim)
+
+        # transfer replica to a non_existing rg
+        rg_name = "non_existing"
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'}
+        self.utility_wrap.transfer_replica(source=ct.default_resource_group_name,
+                                           target=rg_name,
+                                           collection_name=collection_w.name,
+                                           num_replica=1,
+                                           check_task=CheckTasks.err_res,
+                                           check_items=error)
+
+        # transfer replica from a non_existing rg
+        error = {ct.err_code: 999,
+                 ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'}
+        self.utility_wrap.transfer_replica(source=rg_name,
+                                           target=ct.default_resource_group_name,
+                                           collection_name=collection_w.name,
+                                           num_replica=1,
+                                           check_task=CheckTasks.err_res,
+                                           check_items=error)
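Taken together, the happy path these cases exercise can be summarized by the sketch below (inside a hypothetical `TestcaseBase` method; names are illustrative and error handling is omitted):

```python
# End-to-end sketch of the resource-group flow covered by this test suite.
collection_w, _, _, insert_ids, _ = self.init_collection_general(insert_data=True)
collection_w.release()

rg_name = cf.gen_unique_str("rg")
self.init_resource_group(name=rg_name)                       # create the group
self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
                                target=rg_name, num_node=1)  # give it a query node
collection_w.load(_resource_groups=[rg_name])                # place replicas in it
# ... search / verify ...
self.utility_wrap.transfer_node(source=rg_name,
                                target=ct.default_resource_group_name,
                                num_node=1)                   # hand the node back
# teardown (client_base.py above) drops the group automatically
```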