diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py
index 6d336b65dd..471f5d2c3a 100644
--- a/tests/python_client/chaos/checker.py
+++ b/tests/python_client/chaos/checker.py
@@ -40,6 +40,7 @@ def trace(fmt=DEFAULT_FMT, prefix='chaos-test', flag=True):
         @functools.wraps(func)
         def inner_wrapper(self, *args, **kwargs):
             start_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
+            start_time_ts = time.time()
             t0 = time.perf_counter()
             res, result = func(self, *args, **kwargs)
             elapsed = time.perf_counter() - t0
@@ -56,12 +57,13 @@ def trace(fmt=DEFAULT_FMT, prefix='chaos-test', flag=True):
                 self.average_time = (
                     elapsed + self.average_time * self._succ) / (self._succ + 1)
                 self._succ += 1
+                # add first success record if there is no success record before
                 if len(self.fail_records) > 0 and self.fail_records[-1][0] == "failure" and \
                         self._succ + self._fail == self.fail_records[-1][1] + 1:
-                    self.fail_records.append(("success", self._succ + self._fail, start_time))
+                    self.fail_records.append(("success", self._succ + self._fail, start_time, start_time_ts))
             else:
                 self._fail += 1
-                self.fail_records.append(("failure", self._succ + self._fail, start_time))
+                self.fail_records.append(("failure", self._succ + self._fail, start_time, start_time_ts))
             return res, result
         return inner_wrapper
     return decorate
@@ -93,6 +95,7 @@ class Checker:
     """

     def __init__(self, collection_name=None, shards_num=2, dim=ct.default_dim):
+        self.recovery_time = 0
         self._succ = 0
         self._fail = 0
         self.fail_records = []
@@ -139,12 +142,28 @@ class Checker:
         self._keep_running = False
         self.reset()

+    def pause(self):
+        self._keep_running = False
+        time.sleep(10)
+
     def reset(self):
         self._succ = 0
         self._fail = 0
         self.rsp_times = []
         self.average_time = 0

+    def get_rto(self):
+        if len(self.fail_records) == 0:
+            return 0
+        end = self.fail_records[-1][3]
+        start = self.fail_records[0][3]
+        recovery_time = end - start  # second
+        self.recovery_time = recovery_time
+        checker_name = self.__class__.__name__
+        log.info(f"{checker_name} recovery time is {self.recovery_time}, start at {self.fail_records[0][2]}, "
+                 f"end at {self.fail_records[-1][2]}")
+        return recovery_time
+

 class SearchChecker(Checker):
     """check search operations in a dependent thread"""
@@ -269,13 +288,28 @@ class InsertChecker(Checker):
         super().__init__(collection_name=collection_name, shards_num=shards_num)
         self._flush = flush
         self.initial_entities = self.c_wrap.num_entities
+        self.inserted_data = []
+        self.scale = 1*10**6
+        self.start_time_stamp = int(time.time()*self.scale)  # us
+        self.term_expr = f'{ct.default_int64_field_name} >= {self.start_time_stamp}'

     @trace()
     def insert(self):
-        res, result = self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS),
+        data = cf.gen_default_list_data(nb=constants.DELTA_PER_INS)
+        ts_data = []
+        for i in range(constants.DELTA_PER_INS):
+            time.sleep(0.001)
+            offset_ts = int(time.time()*self.scale)
+            ts_data.append(offset_ts)
+
+        data[0] = ts_data  # set timestamp (ms) as int64
+        log.debug(f"insert data: {ts_data}")
+        res, result = self.c_wrap.insert(data=data,
                                          timeout=timeout,
                                          enable_traceback=enable_traceback,
                                          check_task=CheckTasks.check_nothing)
+        if result:
+            self.inserted_data.extend(ts_data)
         return res, result

     @exception_handler()
@@ -288,6 +322,35 @@ class InsertChecker(Checker):
             self.run_task()
             sleep(constants.WAIT_PER_OP / 10)

+    def verify_data_completeness(self):
+        try:
+            self.c_wrap.create_index(ct.default_float_vec_field_name,
+                                     constants.DEFAULT_INDEX_PARAM,
+                                     index_name=cf.gen_unique_str('index_'),
+                                     timeout=timeout,
+                                     enable_traceback=enable_traceback,
+                                     check_task=CheckTasks.check_nothing)
+        except Exception as e:
+            log.error(f"create index error: {e}")
+        self.c_wrap.load()
+        end_time_stamp = int(time.time()*self.scale)
+        self.term_expr = f'{ct.default_int64_field_name} >= {self.start_time_stamp} and ' \
+                         f'{ct.default_int64_field_name} <= {end_time_stamp}'
+        data_in_client = []
+        for d in self.inserted_data:
+            if self.start_time_stamp <= d <= end_time_stamp:
+                data_in_client.append(d)
+        res, result = self.c_wrap.query(self.term_expr, timeout=timeout,
+                                        output_fields=[f'{ct.default_int64_field_name}'],
+                                        limit=len(data_in_client) * 2,
+                                        check_task=CheckTasks.check_nothing)
+
+        data_in_server = []
+        for r in res:
+            d = r[f"{ct.default_int64_field_name}"]
+            data_in_server.append(d)
+        assert set(data_in_server) == set(data_in_client)
+

 class CreateChecker(Checker):
     """check create operations in a dependent thread"""
diff --git a/tests/python_client/chaos/testcases/test_single_request_operation_for_rolling_update.py b/tests/python_client/chaos/testcases/test_single_request_operation_for_rolling_update.py
new file mode 100644
index 0000000000..5884e2ab1b
--- /dev/null
+++ b/tests/python_client/chaos/testcases/test_single_request_operation_for_rolling_update.py
@@ -0,0 +1,102 @@
+import pytest
+from time import sleep
+from pymilvus import connections
+from chaos.checker import (CreateChecker,
+                           InsertChecker,
+                           FlushChecker,
+                           SearchChecker,
+                           QueryChecker,
+                           IndexChecker,
+                           DeleteChecker,
+                           DropChecker,
+                           Op)
+from utils.util_log import test_log as log
+from chaos import chaos_commons as cc
+from common.common_type import CaseLabel
+from chaos.chaos_commons import assert_statistic
+from chaos import constants
+from delayed_assert import assert_expectations
+
+
+class TestBase:
+    expect_create = constants.SUCC
+    expect_insert = constants.SUCC
+    expect_flush = constants.SUCC
+    expect_index = constants.SUCC
+    expect_search = constants.SUCC
+    expect_query = constants.SUCC
+    host = '127.0.0.1'
+    port = 19530
+    _chaos_config = None
+    health_checkers = {}
+
+
+class TestOperations(TestBase):
+
+    @pytest.fixture(scope="function", autouse=True)
+    def connection(self, host, port, user, password):
+        if user and password:
+            # log.info(f"connect to {host}:{port} with user {user} and password {password}")
+            connections.connect('default', host=host, port=port,
+                                user=user, password=password, secure=True)
+        else:
+            connections.connect('default', host=host, port=port)
+        if connections.has_connection("default") is False:
+            raise Exception("no connections")
+        log.info("connect to milvus successfully")
+        self.host = host
+        self.port = port
+        self.user = user
+        self.password = password
+
+    def init_health_checkers(self, collection_name=None):
+        c_name = collection_name
+        checkers = {
+            Op.create: CreateChecker(collection_name=c_name),
+            Op.insert: InsertChecker(collection_name=c_name),
+            Op.flush: FlushChecker(collection_name=c_name),
+            Op.index: IndexChecker(collection_name=c_name),
+            Op.search: SearchChecker(collection_name=c_name),
+            Op.query: QueryChecker(collection_name=c_name),
+            Op.delete: DeleteChecker(collection_name=c_name),
+            Op.drop: DropChecker(collection_name=c_name)
+        }
+        self.health_checkers = checkers
+
+    @pytest.mark.tags(CaseLabel.L3)
+    def test_operations(self, request_duration, is_check):
+        # start the monitor threads to check the milvus ops
+        log.info("*********************Test Start**********************")
+        log.info(connections.get_connection_addr('default'))
+        c_name = None
+        self.init_health_checkers(collection_name=c_name)
+        cc.start_monitor_threads(self.health_checkers)
+        log.info("*********************Load Start**********************")
+        # wait request_duration
+        request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
+        if request_duration[-1] == "+":
+            request_duration = request_duration[:-1]
+        request_duration = eval(request_duration)
+        for i in range(10):
+            sleep(request_duration // 10)
+            for k, v in self.health_checkers.items():
+                v.check_result()
+        for k, v in self.health_checkers.items():
+            v.pause()
+        for k, v in self.health_checkers.items():
+            v.check_result()
+            log.info(f"{k} rto: {v.get_rto()}")
+        if is_check:
+            assert_statistic(self.health_checkers, succ_rate_threshold=0.98)
+            assert_expectations()
+        # get each checker's rto
+        for k, v in self.health_checkers.items():
+            log.info(f"{k} rto: {v.get_rto()}")
+            rto = v.get_rto()
+            assert rto < 30, f"expect 30s but get {rto}s"  # rto should be less than 30s
+
+        if Op.insert in self.health_checkers:
+            # verify the no insert data loss
+            log.info("*********************Verify Data Completeness**********************")
+            self.health_checkers[Op.insert].verify_data_completeness()
+
+        log.info("*********************Chaos Test Completed**********************")
diff --git a/tests/python_client/utils/api_request.py b/tests/python_client/utils/api_request.py
index 53da206ae6..c313c50166 100644
--- a/tests/python_client/utils/api_request.py
+++ b/tests/python_client/utils/api_request.py
@@ -12,6 +12,12 @@ class Error:
         self.code = getattr(error, 'code', -1)
         self.message = getattr(error, 'message', str(error))

+    def __str__(self):
+        return f"Error(code={self.code}, message={self.message})"
+
+    def __repr__(self):
+        return f"Error(code={self.code}, message={self.message})"
+

 log_row_length = 300