From fba57d95ae877b3663e23e84e917d3eff9752d37 Mon Sep 17 00:00:00 2001 From: ThreadDao Date: Thu, 20 Apr 2023 16:37:02 +0800 Subject: [PATCH] Add flush all test cases (#23129) (#23516) Signed-off-by: ThreadDao --- .../python_client/base/collection_wrapper.py | 4 + tests/python_client/base/utility_wrapper.py | 48 +++-- tests/python_client/common/common_type.py | 1 + tests/python_client/testcases/test_utility.py | 187 ++++++++++++++++-- 4 files changed, 214 insertions(+), 26 deletions(-) diff --git a/tests/python_client/base/collection_wrapper.py b/tests/python_client/base/collection_wrapper.py index cc16fad06f..e1c04b9079 100644 --- a/tests/python_client/base/collection_wrapper.py +++ b/tests/python_client/base/collection_wrapper.py @@ -63,6 +63,10 @@ class ApiCollectionWrapper: self.flush() return self.collection.num_entities + @property + def num_entities_without_flush(self): + return self.collection.num_entities + @property def primary_field(self): return self.collection.primary_field diff --git a/tests/python_client/base/utility_wrapper.py b/tests/python_client/base/utility_wrapper.py index 62894eb5a2..dccdb11713 100644 --- a/tests/python_client/base/utility_wrapper.py +++ b/tests/python_client/base/utility_wrapper.py @@ -9,6 +9,7 @@ from utils.api_request import api_request from pymilvus import BulkInsertState from pymilvus import Role from utils.util_log import test_log as log + TIMEOUT = 20 @@ -33,14 +34,16 @@ class ApiUtilityWrapper: log.info(f"after bulk load, there are {len(working_tasks)} working tasks") return res, check_result - def get_bulk_insert_state(self, task_id, timeout=None, using="default", check_task=None, check_items=None, **kwargs): + def get_bulk_insert_state(self, task_id, timeout=None, using="default", check_task=None, check_items=None, + **kwargs): func_name = sys._getframe().f_code.co_name res, is_succ = api_request([self.ut.get_bulk_insert_state, task_id, timeout, using], **kwargs) check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ, task_id=task_id, using=using).run() return res, check_result - def list_bulk_insert_tasks(self, limit=0, collection_name=None, timeout=None, using="default", check_task=None, check_items=None, **kwargs): + def list_bulk_insert_tasks(self, limit=0, collection_name=None, timeout=None, using="default", check_task=None, + check_items=None, **kwargs): func_name = sys._getframe().f_code.co_name res, is_succ = api_request([self.ut.list_bulk_insert_tasks, limit, collection_name, timeout, using], **kwargs) check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ, @@ -88,7 +91,7 @@ class ApiUtilityWrapper: unknown = unknown + 1 log.info("There are", len(tasks), "bulkload tasks.", pending, "pending,", started, "started,", persisted, - "persisted,", completed, "completed,", failed, "failed", failed_and_cleaned, "failed_and_cleaned", + "persisted,", completed, "completed,", failed, "failed", failed_and_cleaned, "failed_and_cleaned", unknown, "unknown") def wait_for_bulk_insert_tasks_completed(self, task_ids, target_state=BulkInsertState.ImportCompleted, @@ -109,7 +112,8 @@ class ApiUtilityWrapper: log.info(f"wait bulk load timeout is {task_timeout}") pending_tasks = self.get_bulk_insert_pending_list() log.info(f"before waiting, there are {len(pending_tasks)} pending tasks") - while len(tasks_state_distribution["success"])+len(tasks_state_distribution["failed"]) < len(task_ids) and end-start <= task_timeout: + while len(tasks_state_distribution["success"]) + 
len(tasks_state_distribution["failed"]) < len( + task_ids) and end - start <= task_timeout: time.sleep(2) for task_id in task_ids: @@ -134,21 +138,22 @@ class ApiUtilityWrapper: if task_id in tasks_state_distribution["in_progress"]: tasks_state_distribution["in_progress"].remove(task_id) tasks_state_distribution["success"].add(task_id) - elif state.state in [BulkInsertState.ImportPending, BulkInsertState.ImportStarted, BulkInsertState.ImportPersisted]: + elif state.state in [BulkInsertState.ImportPending, BulkInsertState.ImportStarted, + BulkInsertState.ImportPersisted]: tasks_state_distribution["in_progress"].add(task_id) else: tasks_state_distribution["failed"].add(task_id) - + end = time.time() pending_tasks = self.get_bulk_insert_pending_list() log.info(f"after waiting, there are {len(pending_tasks)} pending tasks") log.info(f"task state distribution: {tasks_state_distribution}") log.info(tasks_state) if len(tasks_state_distribution["success"]) == len(task_ids): - log.info(f"wait for bulk load tasks completed successfully, cost time: {end-start}") + log.info(f"wait for bulk load tasks completed successfully, cost time: {end - start}") return True, tasks_state else: - log.info(f"wait for bulk load tasks completed failed, cost time: {end-start}") + log.info(f"wait for bulk load tasks completed failed, cost time: {end - start}") return False, tasks_state def wait_all_pending_tasks_finished(self): @@ -162,7 +167,8 @@ class ApiUtilityWrapper: log.info(f"current tasks states: {task_states_map}") pending_tasks = self.get_bulk_insert_pending_list() working_tasks = self.get_bulk_insert_working_list() - log.info(f"in the start, there are {len(working_tasks)} working tasks, {working_tasks} {len(pending_tasks)} pending tasks, {pending_tasks}") + log.info( + f"in the start, there are {len(working_tasks)} working tasks, {working_tasks} {len(pending_tasks)} pending tasks, {pending_tasks}") time_cnt = 0 pending_task_ids = set() while len(pending_tasks) > 0: @@ -174,7 +180,8 @@ class ApiUtilityWrapper: for task_id in pending_tasks.keys(): cur_pending_task_ids.append(task_id) pending_task_ids.add(task_id) - log.info(f"after {time_cnt}, there are {len(working_tasks)} working tasks, {len(pending_tasks)} pending tasks") + log.info( + f"after {time_cnt}, there are {len(working_tasks)} working tasks, {len(pending_tasks)} pending tasks") log.debug(f"total pending tasks: {pending_task_ids} current pending tasks: {cur_pending_task_ids}") log.info(f"after {time_cnt}, all pending tasks are finished") all_tasks, _ = self.list_bulk_insert_tasks() @@ -331,7 +338,7 @@ class ApiUtilityWrapper: def create_user(self, user, password, using="default", check_task=None, check_items=None): func_name = sys._getframe().f_code.co_name res, is_succ = api_request([self.ut.create_user, user, password, using]) - check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,using=using).run() + check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ, using=using).run() return res, check_result def list_usernames(self, using="default", check_task=None, check_items=None): @@ -475,24 +482,35 @@ class ApiUtilityWrapper: check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() return res, check_result - def transfer_node(self, source, target, num_node, using="default", timeout=None, check_task=None, check_items=None, **kwargs): + def transfer_node(self, source, target, num_node, using="default", timeout=None, check_task=None, check_items=None, + **kwargs): 
func_name = sys._getframe().f_code.co_name res, check = api_request([self.ut.transfer_node, source, target, num_node, using, timeout], **kwargs) check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() return res, check_result - def transfer_replica(self, source, target, collection_name, num_replica, using="default", timeout=None, check_task=None, check_items=None, **kwargs): + def transfer_replica(self, source, target, collection_name, num_replica, using="default", timeout=None, + check_task=None, check_items=None, **kwargs): func_name = sys._getframe().f_code.co_name - res, check = api_request([self.ut.transfer_replica, source, target, collection_name,num_replica, using, timeout], **kwargs) + res, check = api_request( + [self.ut.transfer_replica, source, target, collection_name, num_replica, using, timeout], **kwargs) check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() return res, check_result def rename_collection(self, old_collection_name, new_collection_name, timeout=None, check_task=None, check_items=None, **kwargs): func_name = sys._getframe().f_code.co_name - res, check = api_request([self.ut.rename_collection, old_collection_name, new_collection_name, timeout], **kwargs) + res, check = api_request([self.ut.rename_collection, old_collection_name, new_collection_name, timeout], + **kwargs) check_result = ResponseChecker(res, func_name, check_task, check_items, check, old_collection_name=old_collection_name, new_collection_name=new_collection_name, timeout=timeout, **kwargs).run() return res, check_result + def flush_all(self, using="default", timeout=None, check_task=None, check_items=None, **kwargs): + func_name = sys._getframe().f_code.co_name + res, check = api_request([self.ut.flush_all, using, timeout], **kwargs) + check_result = ResponseChecker(res, func_name, check_task, check_items, check, + using=using, timeout=timeout, **kwargs).run() + return res, check_result + diff --git a/tests/python_client/common/common_type.py b/tests/python_client/common/common_type.py index d8e4687a33..f0dbad793e 100644 --- a/tests/python_client/common/common_type.py +++ b/tests/python_client/common/common_type.py @@ -63,6 +63,7 @@ max_field_num = 64 # Maximum number of fields in a collection max_name_length = 255 # Maximum length of name for a collection or alias default_replica_num = 1 default_graceful_time = 5 # +max_shards_num = 64 IMAGE_REPOSITORY_MILVUS = "harbor.milvus.io/dockerhub/milvusdb/milvus" NAMESPACE_CHAOS_TESTING = "chaos-testing" diff --git a/tests/python_client/testcases/test_utility.py b/tests/python_client/testcases/test_utility.py index ef36e37453..692b61ce42 100644 --- a/tests/python_client/testcases/test_utility.py +++ b/tests/python_client/testcases/test_utility.py @@ -5,6 +5,7 @@ import pytest from pymilvus import DefaultConfig from pymilvus.exceptions import MilvusException from base.client_base import TestcaseBase +from base.collection_wrapper import ApiCollectionWrapper from base.utility_wrapper import ApiUtilityWrapper from utils.util_log import test_log as log from common import common_func as cf @@ -534,7 +535,8 @@ class TestUtilityParams(TestcaseBase): self.utility_wrap.rename_collection(old_collection_name, new_collection_name, check_task=CheckTasks.err_res, check_items={"err_code": 1, - "err_msg": "`collection_name` value {} is illegal".format(old_collection_name)}) + "err_msg": "`collection_name` value {} is illegal".format( + old_collection_name)}) @pytest.mark.tags(CaseLabel.L1) 
def test_rename_collection_old_invalid_value(self, get_invalid_value_collection_name): @@ -601,7 +603,8 @@ class TestUtilityParams(TestcaseBase): self.utility_wrap.rename_collection(old_collection_name, new_collection_name, check_task=CheckTasks.err_res, check_items={"err_code": 1, - "err_msg": "can't find collection: {}".format(collection_w.name)}) + "err_msg": "can't find collection: {}".format( + collection_w.name)}) @pytest.mark.tags(CaseLabel.L1) def test_rename_collection_existed_collection_name(self): @@ -617,7 +620,8 @@ class TestUtilityParams(TestcaseBase): check_task=CheckTasks.err_res, check_items={"err_code": 1, "err_msg": "duplicated new collection name :{} with other " - "collection name or alias".format(collection_w.name)}) + "collection name or alias".format( + collection_w.name)}) @pytest.mark.tags(CaseLabel.L1) def test_rename_collection_existed_collection_alias(self): @@ -656,6 +660,7 @@ class TestUtilityParams(TestcaseBase): "err_msg": "unsupported use an alias to " "rename collection, alias:{}".format(alias)}) + class TestUtilityBase(TestcaseBase): """ Test case of index interface """ @@ -1642,6 +1647,7 @@ class TestUtilityBase(TestcaseBase): assert collection_alias[0] in collections assert old_collection_name not in collections + class TestUtilityAdvanced(TestcaseBase): """ Test case of index interface """ @@ -1848,7 +1854,7 @@ class TestUtilityAdvanced(TestcaseBase): if x in segment_distribution else 0, reverse=True) # add node id greater than all querynodes, which is not exist for querynode, to src_node_ids max_query_node_id = max(all_querynodes) - invalid_src_node_id = max_query_node_id+1 + invalid_src_node_id = max_query_node_id + 1 src_node_id = all_querynodes[0] dst_node_ids = all_querynodes[1:] sealed_segment_ids = segment_distribution[src_node_id]["sealed"] @@ -2364,8 +2370,8 @@ class TestUtilityInvalidUserPassword(TestcaseBase): # 3.reset password with the wrong username self.utility_wrap.update_password(user="hobo", old_password=old_password, new_password="qwaszx1", - check_task=ct.CheckTasks.err_res, - check_items={ct.err_code: 30}) + check_task=ct.CheckTasks.err_res, + check_items={ct.err_code: 30}) @pytest.mark.tags(ct.CaseLabel.L3) @pytest.mark.parametrize("user", ["demo"]) @@ -2386,8 +2392,8 @@ class TestUtilityInvalidUserPassword(TestcaseBase): # 3.reset password with the wrong new password self.utility_wrap.update_password(user=user, old_password=old_password, new_password=new_password, - check_task=ct.CheckTasks.err_res, - check_items={ct.err_code: 5}) + check_task=ct.CheckTasks.err_res, + check_items={ct.err_code: 5}) @pytest.mark.tags(ct.CaseLabel.L3) @pytest.mark.parametrize("user", ["genny"]) @@ -2401,8 +2407,8 @@ class TestUtilityInvalidUserPassword(TestcaseBase): password=ct.default_password, check_task=ct.CheckTasks.ccr) self.utility_wrap.create_user(user=user, password="qwaszx0") self.utility_wrap.update_password(user=user, old_password="waszx0", new_password="123456", - check_task=ct.CheckTasks.err_res, - check_items={ct.err_code: 30}) + check_task=ct.CheckTasks.err_res, + check_items={ct.err_code: 30}) @pytest.mark.tags(ct.CaseLabel.L3) def test_delete_user_root(self, host, port): @@ -2764,7 +2770,7 @@ class TestUtilityRBAC(TestcaseBase): self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password, check_task=ct.CheckTasks.ccr) self.utility_wrap.init_role(r_name) - self.utility_wrap.role_revoke("Global", "*", "CreateCollection") + self.utility_wrap.role_revoke("Global", "*", "CreateCollection") # 
verify revoke is success self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING) @@ -4051,3 +4057,162 @@ class TestUtilityNegativeRbac(TestcaseBase): self.utility_wrap.create_role() error = {"err_code": 41, "err_msg": "the privilege name[%s] in the privilege entity is invalid" % p_name} self.utility_wrap.role_revoke("Global", "*", p_name, check_task=CheckTasks.err_res, check_items=error) + + +@pytest.mark.tags(CaseLabel.L3) +class TestUtilityFlushAll(TestcaseBase): + + def test_flush_all_multi_collections(self): + """ + target: test flush multi collections + method: 1.create multi collections + 2. insert data into each collections without flushing + 3. delete some data + 4. flush all collections + 5. verify num entities + 6. create index -> load -> query deleted ids -> query no-deleted ids + expected: the insert and delete data of all collections are flushed + """ + collection_num = 5 + collection_names = [] + delete_num = 100 + for i in range(collection_num): + collection_w, _, _, insert_ids, _ = self.init_collection_general(prefix, insert_data=True, is_flush=False, + is_index=True) + collection_w.delete(f'{ct.default_int64_field_name} in {insert_ids[:delete_num]}') + collection_names.append(collection_w.name) + + self.utility_wrap.flush_all(timeout=60) + cw = ApiCollectionWrapper() + for c in collection_names: + cw.init_collection(name=c) + assert cw.num_entities_without_flush == ct.default_nb + + cw.create_index(ct.default_float_vec_field_name, ct.default_flat_index) + cw.load() + cw.query(f'{ct.default_int64_field_name} in {insert_ids[:100]}', check_task=CheckTasks.check_query_empty) + + res, _ = cw.query(f'{ct.default_int64_field_name} not in {insert_ids[:delete_num]}') + assert len(res) == ct.default_nb - delete_num + + def test_flush_all_multi_shards_timeout(self): + """ + target: test flush_all collection with max shards_num + method: 1.create collection with max shards_num + 2.insert data + 3.flush all with a small timeout and gets exception + 4.flush and verify num entities + expected: + """ + collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=ct.max_shards_num) + df = cf.gen_default_dataframe_data() + collection_w.insert(df) + error = {ct.err_code: 1, ct.err_msg: "wait for flush all timeout"} + self.utility_wrap.flush_all(timeout=5, check_task=CheckTasks.err_res, check_items=error) + self.utility_wrap.flush_all(timeout=120) + assert collection_w.num_entities_without_flush == ct.default_nb + + def test_flush_all_no_collections(self): + """ + target: test flush all without any collections + method: connect and flush all + expected: no exception + """ + self._connect() + self.utility_wrap.flush_all(check_task=None) + + def test_flush_all_async(self): + """ + target: test flush all collections with _async + method: flush all collections and _async=True + expected: finish flush all collection within a period of time + """ + collection_num = 5 + collection_names = [] + delete_num = 100 + + for i in range(collection_num): + collection_w, _, _, insert_ids, _ = self.init_collection_general(prefix, insert_data=True, is_flush=False, + is_index=True) + collection_w.delete(f'{ct.default_int64_field_name} in {insert_ids[:delete_num]}') + collection_names.append(collection_w.name) + + self.utility_wrap.flush_all(_async=True) + _async_timeout = 60 + cw = ApiCollectionWrapper() + start = time.time() + while time.time() - start < _async_timeout: + time.sleep(2.0) + flush_flag = False + for c in collection_names: + cw.init_collection(name=c) + if 
cw.num_entities_without_flush == ct.default_nb: + flush_flag = True + else: + log.debug(f"collection num entities: {cw.num_entities_without_flush} of collection {c}") + flush_flag = False + if flush_flag: + break + if time.time() - start > _async_timeout: + raise MilvusException(1, f"Waiting more than {_async_timeout}s for the flush all") + + def test_flush_all_while_insert_delete(self): + """ + target: test flush all while insert and delete + method: 1. prepare multi collections + 2. flush_all while inserting and deleting + expected: + """ + from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED + collection_num = 5 + collection_names = [] + delete_num = 100 + delete_ids = [_i for _i in range(delete_num)] + + for i in range(collection_num): + collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=4) + df = cf.gen_default_dataframe_data(nb=ct.default_nb, start=0) + collection_w.insert(df) + collection_names.append(collection_w.name) + + def do_insert(): + cw = ApiCollectionWrapper() + df = cf.gen_default_dataframe_data(nb=ct.default_nb, start=ct.default_nb) + for c_name in collection_names: + cw.init_collection(c_name) + insert_res, _ = cw.insert(df) + assert insert_res.insert_count == ct.default_nb + + def do_delete(): + cw = ApiCollectionWrapper() + for c_name in collection_names: + cw.init_collection(c_name) + del_res, _ = cw.delete(f"{ct.default_int64_field_name} in {delete_ids}") + assert del_res.delete_count == delete_num + + def do_flush_all(): + time.sleep(2) + self.utility_wrap.flush_all(timeout=600) + + executor = ThreadPoolExecutor(max_workers=3) + insert_task = executor.submit(do_insert) + delete_task = executor.submit(do_delete) + flush_task = executor.submit(do_flush_all) + + # wait all tasks completed + wait([insert_task, delete_task, flush_task], return_when=ALL_COMPLETED) + + # verify + for c in collection_names: + cw = ApiCollectionWrapper() + cw.init_collection(name=c) + log.debug(f"num entities: {cw.num_entities_without_flush}") + assert cw.num_entities_without_flush >= ct.default_nb + assert cw.num_entities_without_flush <= ct.default_nb * 2 + + cw.create_index(ct.default_float_vec_field_name, ct.default_flat_index) + cw.load() + cw.query(f'{ct.default_int64_field_name} in {delete_ids}', check_task=CheckTasks.check_query_empty) + + res, _ = cw.query(f'{ct.default_int64_field_name} not in {delete_ids}') + assert len(res) == ct.default_nb * 2 - delete_num
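
A minimal standalone sketch (not part of the patch) of how the pieces added above fit together: the ApiUtilityWrapper.flush_all() wrapper, the ApiCollectionWrapper.num_entities_without_flush property, and the existing test helpers. The host/port and the helper names gen_unique_str, gen_default_collection_schema, and gen_default_dataframe_data are assumed to match the repo's current common_func utilities; treat this as an illustration rather than an additional test case.

# sketch: drive the new flush_all wrapper directly against a running Milvus
from pymilvus import connections

from base.collection_wrapper import ApiCollectionWrapper
from base.utility_wrapper import ApiUtilityWrapper
from common import common_func as cf
from common import common_type as ct


def flush_all_smoke(host="localhost", port=19530):
    # assumed connection parameters; adjust to the test deployment
    connections.connect(host=host, port=port)

    # create a collection and insert one batch without flushing it
    collection_w = ApiCollectionWrapper()
    collection_w.init_collection(name=cf.gen_unique_str("flush_all_demo"),
                                 schema=cf.gen_default_collection_schema())
    collection_w.insert(cf.gen_default_dataframe_data())

    # flush every collection in the cluster through the new utility wrapper
    utility_w = ApiUtilityWrapper()
    utility_w.flush_all(timeout=120)

    # num_entities_without_flush reads the row count without triggering
    # another flush, so it only reflects what flush_all actually persisted
    assert collection_w.num_entities_without_flush == ct.default_nb

Reading num_entities_without_flush rather than num_entities matters here because the wrapped num_entities property flushes the collection first, which would mask whether flush_all itself persisted the pending data; the new test cases rely on the same distinction.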