diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py index 572c369a55..75a0364f0b 100644 --- a/tests/python_client/chaos/checker.py +++ b/tests/python_client/chaos/checker.py @@ -1,9 +1,9 @@ from enum import Enum from random import randint import time -import random +from datetime import datetime +import functools from time import sleep -from delayed_assert import expect from base.collection_wrapper import ApiCollectionWrapper from base.utility_wrapper import ApiUtilityWrapper from common import common_func as cf @@ -12,26 +12,73 @@ from chaos import constants from common.common_type import CheckTasks from utils.util_log import test_log as log +from utils.api_request import Error class Op(Enum): - create = "create" - insert = "insert" - flush = "flush" - index = "index" - search = "search" - query = "query" - delete = "delete" - compact = "compact" - drop = "drop" - load_balance = "load_balance" - bulk_load = "bulk_load" - - unknown = "unknown" + create = 'create' + insert = 'insert' + flush = 'flush' + index = 'index' + search = 'search' + query = 'query' + delete = 'delete' + compact = 'compact' + drop = 'drop' + load_balance = 'load_balance' + bulk_load = 'bulk_load' + unknown = 'unknown' -timeout = 20 +timeout = 40 enable_traceback = False +DEFAULT_FMT = '[start time:{start_time}][time cost:{elapsed:0.8f}s][operation_name:{operation_name}][collection name:{collection_name}] -> {result!r}' + + +def trace(fmt=DEFAULT_FMT, prefix='chaos-test', flag=True): + def decorate(func): + @functools.wraps(func) + def inner_wrapper(self, *args, **kwargs): + start_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') + t0 = time.perf_counter() + res, result = func(self, *args, **kwargs) + elapsed = time.perf_counter() - t0 + end_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') + if flag: + collection_name = self.c_wrap.name + operation_name = func.__name__ + log_str = f"[{prefix}]" + fmt.format(**locals()) + # TODO: add report function in this place, like uploading to influxdb + # it is better a async way to do this, in case of blocking the request processing + log.debug(log_str) + if result: + self.rsp_times.append(elapsed) + self.average_time = ( + elapsed + self.average_time * self._succ) / (self._succ + 1) + self._succ += 1 + else: + self._fail += 1 + return res, result + return inner_wrapper + return decorate + + +def exception_handler(): + def wrapper(func): + @functools.wraps(func) + def inner_wrapper(self, *args, **kwargs): + try: + res, result = func(self, *args, **kwargs) + return res, result + except Exception as e: + log_row_length = 300 + e_str = str(e) + log_e = e_str[0:log_row_length] + \ + '......' 
if len(e_str) > log_row_length else e_str + log.error(log_e) + return Error(e), False + return inner_wrapper + return wrapper class Checker: @@ -44,24 +91,21 @@ class Checker: def __init__(self, collection_name=None, shards_num=2): self._succ = 0 self._fail = 0 + self._keep_running = True self.rsp_times = [] self.average_time = 0 self.c_wrap = ApiCollectionWrapper() - c_name = collection_name if collection_name is not None else cf.gen_unique_str("Checker_") - self.c_wrap.init_collection( - name=c_name, - schema=cf.gen_default_collection_schema(), - shards_num=shards_num, - timeout=timeout, - enable_traceback=enable_traceback, - ) - self.c_wrap.insert( - data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH), - timeout=timeout, - enable_traceback=enable_traceback, - ) + c_name = collection_name if collection_name is not None else cf.gen_unique_str( + 'Checker_') + self.c_wrap.init_collection(name=c_name, + schema=cf.gen_default_collection_schema(), + shards_num=shards_num, + timeout=timeout, + enable_traceback=enable_traceback) + self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH), + timeout=timeout, + enable_traceback=enable_traceback) self.initial_entities = self.c_wrap.num_entities # do as a flush - self.c_wrap.release() def total(self): return self._succ + self._fail @@ -73,12 +117,18 @@ class Checker: succ_rate = self.succ_rate() total = self.total() rsp_times = self.rsp_times - average_time = 0 if len(rsp_times) == 0 else sum(rsp_times) / len(rsp_times) + average_time = 0 if len(rsp_times) == 0 else sum( + rsp_times) / len(rsp_times) max_time = 0 if len(rsp_times) == 0 else max(rsp_times) min_time = 0 if len(rsp_times) == 0 else min(rsp_times) - checkers_result = f"succ_rate: {succ_rate:.2f}, total: {total:03d}, average_time: {average_time:.4f}, max_time: {max_time:.4f}, min_time: {min_time:.4f}" + checker_name = self.__class__.__name__ + checkers_result = f"{checker_name}, succ_rate: {succ_rate:.2f}, total: {total:03d}, average_time: {average_time:.4f}, max_time: {max_time:.4f}, min_time: {min_time:.4f}" + log.info(checkers_result) return checkers_result + def terminate(self): + self._keep_running = False + def reset(self): self._succ = 0 self._fail = 0 @@ -93,33 +143,29 @@ class SearchChecker(Checker): if collection_name is None: collection_name = cf.gen_unique_str("SearchChecker_") super().__init__(collection_name=collection_name, shards_num=shards_num) - self.c_wrap.load(replica_number=replica_number) # do load before search + # do load before search + self.c_wrap.load(replica_number=replica_number) + + @trace() + def search(self): + res, result = self.c_wrap.search( + data=cf.gen_vectors(5, ct.default_dim), + anns_field=ct.default_float_vec_field_name, + param={"nprobe": 32}, + limit=1, + timeout=timeout, + check_task=CheckTasks.check_nothing + ) + return res, result + + @exception_handler() + def run_task(self): + res, result = self.search() + return res, result def keep_running(self): - while True: - search_vec = cf.gen_vectors(5, ct.default_dim) - t0 = time.time() - _, result = self.c_wrap.search( - data=search_vec, - anns_field=ct.default_float_vec_field_name, - param={"nprobe": 32}, - limit=1, - timeout=timeout, - enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing, - ) - t1 = time.time() - if result: - self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( - self._succ + 1 - ) - self._succ += 1 - log.debug( - f"search success, time: {t1 - t0:.4f}, average_time: 
{self.average_time:.4f}" - ) - else: - self._fail += 1 + while self._keep_running: + self.run_task() sleep(constants.WAIT_PER_OP / 10) @@ -127,11 +173,6 @@ class InsertFlushChecker(Checker): """check Insert and flush operations in a dependent thread""" def __init__(self, collection_name=None, flush=False, shards_num=2): - if collection_name is None: - if flush: - collection_name = cf.gen_unique_str("FlushChecker_") - else: - collection_name = cf.gen_unique_str("InsertChecker_") super().__init__(collection_name=collection_name, shards_num=shards_num) self._flush = flush self.initial_entities = self.c_wrap.num_entities @@ -139,23 +180,18 @@ class InsertFlushChecker(Checker): def keep_running(self): while True: t0 = time.time() - _, insert_result = self.c_wrap.insert( - data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS), - timeout=timeout, - enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing, - ) + _, insert_result = \ + self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS), + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing) t1 = time.time() if not self._flush: if insert_result: self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( - self._succ + 1 - ) + self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1) self._succ += 1 - log.debug( - f"insert success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}" - ) + log.debug(f"insert success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}") else: self._fail += 1 sleep(constants.WAIT_PER_OP / 10) @@ -166,18 +202,78 @@ class InsertFlushChecker(Checker): t1 = time.time() if num_entities == (self.initial_entities + constants.DELTA_PER_INS): self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( - self._succ + 1 - ) + self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1) self._succ += 1 - log.debug( - f"flush success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}" - ) + log.debug(f"flush success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}") self.initial_entities += constants.DELTA_PER_INS else: self._fail += 1 +class FlushChecker(Checker): + """check flush operations in a dependent thread""" + + def __init__(self, collection_name=None, flush=False, shards_num=2): + if collection_name is None: + collection_name = cf.gen_unique_str("InsertChecker_") + super().__init__(collection_name=collection_name, shards_num=shards_num) + self._flush = flush + self.initial_entities = self.c_wrap.num_entities + + @trace() + def flush(self): + num_entities = self.c_wrap.num_entities + if num_entities == (self.initial_entities + constants.DELTA_PER_INS): + result = True + self.initial_entities += constants.DELTA_PER_INS + else: + result = False + return num_entities, result + + @exception_handler() + def run_task(self): + _, result = self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS), + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing) + res, result = self.flush() + return res, result + + def keep_running(self): + while self._keep_running: + self.run_task() + sleep(constants.WAIT_PER_OP / 10) + + +class InsertChecker(Checker): + """check flush operations in a dependent thread""" + + def __init__(self, collection_name=None, flush=False, shards_num=2): + if collection_name is None: + collection_name = 
cf.gen_unique_str("InsertChecker_") + super().__init__(collection_name=collection_name, shards_num=shards_num) + self._flush = flush + self.initial_entities = self.c_wrap.num_entities + + @trace() + def insert(self): + res, result = self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS), + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing) + return res, result + + @exception_handler() + def run_task(self): + res, result = self.insert() + return res, result + + def keep_running(self): + while self._keep_running: + self.run_task() + sleep(constants.WAIT_PER_OP / 10) + + class CreateChecker(Checker): """check create operations in a dependent thread""" @@ -186,30 +282,26 @@ class CreateChecker(Checker): collection_name = cf.gen_unique_str("CreateChecker_") super().__init__(collection_name=collection_name) - def keep_running(self): - while True: - t0 = time.time() - _, result = self.c_wrap.init_collection( - name=cf.gen_unique_str("CreateChecker_"), - schema=cf.gen_default_collection_schema(), - timeout=timeout, - enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing, - ) - t1 = time.time() - if result: - self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( - self._succ + 1 - ) - self._succ += 1 - log.debug( - f"create success, time: {t1 - t0:.4f}, average_time: {self.average_time:4f}" - ) - self.c_wrap.drop(timeout=timeout) + @trace() + def init_collection(self): + res, result = self.c_wrap.init_collection( + name=cf.gen_unique_str("CreateChecker_"), + schema=cf.gen_default_collection_schema(), + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing) + return res, result - else: - self._fail += 1 + @exception_handler() + def run_task(self): + res, result = self.init_collection() + if result: + self.c_wrap.drop(timeout=timeout) + return res, result + + def keep_running(self): + while self._keep_running: + self.run_task() sleep(constants.WAIT_PER_OP / 10) @@ -220,39 +312,33 @@ class IndexChecker(Checker): if collection_name is None: collection_name = cf.gen_unique_str("IndexChecker_") super().__init__(collection_name=collection_name) - self.c_wrap.insert( - data=cf.gen_default_list_data(nb=5 * constants.ENTITIES_FOR_SEARCH), - timeout=timeout, - enable_traceback=enable_traceback, - ) - log.debug( - f"Index ready entities: {self.c_wrap.num_entities}" - ) # do as a flush before indexing + self.c_wrap.insert(data=cf.gen_default_list_data(nb=5 * constants.ENTITIES_FOR_SEARCH), + timeout=timeout, enable_traceback=enable_traceback) + # do as a flush before indexing + log.debug(f"Index ready entities: {self.c_wrap.num_entities}") + + @trace() + def create_index(self): + res, result = self.c_wrap.create_index(ct.default_float_vec_field_name, + constants.DEFAULT_INDEX_PARAM, + name=cf.gen_unique_str( + 'index_'), + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing) + return res, result + + @exception_handler() + def run_task(self): + res, result = self.create_index() + if result: + self.c_wrap.drop_index(timeout=timeout) + return res, result def keep_running(self): - while True: - t0 = time.time() - _, result = self.c_wrap.create_index( - ct.default_float_vec_field_name, - constants.DEFAULT_INDEX_PARAM, - name=cf.gen_unique_str("index_"), - timeout=timeout, - enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing, - ) - t1 = time.time() - if result: - self.rsp_times.append(t1 - 
t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( - self._succ + 1 - ) - self._succ += 1 - log.debug( - f"index success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}" - ) - self.c_wrap.drop_index(timeout=timeout) - else: - self._fail += 1 + while self._keep_running: + self.run_task() + sleep(constants.WAIT_PER_OP / 10) class QueryChecker(Checker): @@ -263,32 +349,53 @@ class QueryChecker(Checker): collection_name = cf.gen_unique_str("QueryChecker_") super().__init__(collection_name=collection_name, shards_num=shards_num) self.c_wrap.load(replica_number=replica_number) # do load before query + self.term_expr = None + + @trace() + def query(self): + res, result = self.c_wrap.query(self.term_expr, timeout=timeout, + check_task=CheckTasks.check_nothing) + return res, result + + @exception_handler() + def run_task(self): + int_values = [] + for _ in range(5): + int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH)) + self.term_expr = f'{ct.default_int64_field_name} in {int_values}' + res, result= self.query() + return res, result def keep_running(self): - while True: - int_values = [] - for _ in range(5): - int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH)) - term_expr = f"{ct.default_int64_field_name} in {int_values}" - t0 = time.time() - _, result = self.c_wrap.query( - term_expr, - timeout=timeout, - enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing, - ) - t1 = time.time() - if result: - self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( - self._succ + 1 - ) - self._succ += 1 - log.debug( - f"query success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}" - ) - else: - self._fail += 1 + while self._keep_running: + self.run_task() + sleep(constants.WAIT_PER_OP / 10) + + +class LoadChecker(Checker): + """check load operations in a dependent thread""" + + def __init__(self, collection_name=None, replica_number=1): + if collection_name is None: + collection_name = cf.gen_unique_str("DeleteChecker_") + super().__init__(collection_name=collection_name) + self.replica_number = replica_number + + @trace() + def load(self): + res, result = self.c_wrap.load(replica_number=self.replica_number, timeout=timeout) + return res, result + + @exception_handler() + def run_task(self): + res, result = self.load() + if result: + self.c_wrap.release() + return res, result + + def keep_running(self): + while self._keep_running: + self.run_task() sleep(constants.WAIT_PER_OP / 10) @@ -300,30 +407,27 @@ class DeleteChecker(Checker): collection_name = cf.gen_unique_str("DeleteChecker_") super().__init__(collection_name=collection_name) self.c_wrap.load() # load before query + term_expr = f'{ct.default_int64_field_name} > 0' + res, _ = self.c_wrap.query(term_expr, output_fields=[ + ct.default_int64_field_name]) + self.ids = [r[ct.default_int64_field_name] for r in res] + self.expr = None + + @trace() + def delete(self): + res, result = self.c_wrap.delete(expr=self.expr, timeout=timeout) + return res, result + + @exception_handler() + def run_task(self): + delete_ids = self.ids.pop() + self.expr = f'{ct.default_int64_field_name} in {[delete_ids]}' + res, result = self.delete() + return res, result def keep_running(self): - while True: - term_expr = f"{ct.default_int64_field_name} > 0" - res, _ = self.c_wrap.query( - term_expr, output_fields=[ct.default_int64_field_name] - ) - ids = [r[ct.default_int64_field_name] for r in res] - delete_ids = random.sample(ids, 2) - expr = 
f"{ct.default_int64_field_name} in {delete_ids}" - t0 = time.time() - _, result = self.c_wrap.delete(expr=expr, timeout=timeout) - tt = time.time() - t0 - if result: - self.rsp_times.append(tt) - self.average_time = (tt + self.average_time * self._succ) / ( - self._succ + 1 - ) - self._succ += 1 - log.debug( - f"delete success, time: {tt:.4f}, average_time: {self.average_time:.4f}" - ) - else: - self._fail += 1 + while self._keep_running: + self.run_task() sleep(constants.WAIT_PER_OP / 10) @@ -335,28 +439,23 @@ class CompactChecker(Checker): collection_name = cf.gen_unique_str("CompactChecker_") super().__init__(collection_name=collection_name) self.ut = ApiUtilityWrapper() - self.c_wrap.load(enable_traceback=enable_traceback) # load before compact + self.c_wrap.load() # load before compact + + @trace() + def compact(self): + res, result = self.c_wrap.compact(timeout=timeout) + self.c_wrap.wait_for_compaction_completed() + self.c_wrap.get_compaction_plans() + return res, result + + @exception_handler() + def run_task(self): + res, result = self.compact() + return res, result def keep_running(self): - while True: - seg_info = self.ut.get_query_segment_info(self.c_wrap.name) - t0 = time.time() - res, result = self.c_wrap.compact(timeout=timeout) - print(f"compact done: res {res}") - self.c_wrap.wait_for_compaction_completed() - self.c_wrap.get_compaction_plans() - t1 = time.time() - if result: - self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( - self._succ + 1 - ) - self._succ += 1 - log.debug( - f"compact success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}" - ) - else: - self._fail += 1 + while self._keep_running: + self.run_task() sleep(constants.WAIT_PER_OP / 10) @@ -367,24 +466,25 @@ class DropChecker(Checker): if collection_name is None: collection_name = cf.gen_unique_str("DropChecker_") super().__init__(collection_name=collection_name) - # self.c_wrap.load(enable_traceback=enable_traceback) # load before compact + + @trace() + def drop(self): + res, result = self.c_wrap.drop() + return res, result + + def run_task(self): + res, result = self.drop() + return res, result def keep_running(self): - while True: - t0 = time.time() - _, result = self.c_wrap.drop() - t1 = time.time() + while self._keep_running: + res, result = self.run_task() if result: - self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( - self._succ + 1 - ) - self._succ += 1 - log.debug( - f"drop success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}" - ) - else: - self._fail += 1 + self.c_wrap.init_collection( + name=cf.gen_unique_str("CreateChecker_"), + schema=cf.gen_default_collection_schema(), + timeout=timeout, + check_task=CheckTasks.check_nothing) sleep(constants.WAIT_PER_OP / 10) @@ -396,77 +496,58 @@ class LoadBalanceChecker(Checker): collection_name = cf.gen_unique_str("LoadBalanceChecker_") super().__init__(collection_name=collection_name) self.utility_wrap = ApiUtilityWrapper() - self.c_wrap.load(enable_traceback=enable_traceback) + self.c_wrap.load() + self.sealed_segment_ids = None + self.dst_node_ids = None + self.src_node_id = None + + @trace() + def load_balance(self): + res, result = self.utility_wrap.load_balance( + self.c_wrap.name, self.src_node_id, self.dst_node_ids, self.sealed_segment_ids) + return res, result + + def prepare(self): + """prepare load balance params""" + res, _ = self.c_wrap.get_replicas() + # find a group which has multi nodes + group_nodes = [] + for 
g in res.groups: + if len(g.group_nodes) >= 2: + group_nodes = list(g.group_nodes) + break + self.src_node_id = group_nodes[0] + self.dst_node_ids = group_nodes[1:] + res, _ = self.utility_wrap.get_query_segment_info(self.c_wrap.name) + segment_distribution = cf.get_segment_distribution(res) + self.sealed_segment_ids = segment_distribution[self.src_node_id]["sealed"] + + @exception_handler() + def run_task(self): + self.prepare() + res, result = self.load_balance() + return res, result def keep_running(self): - while True: - c_name = self.c_wrap.name - res, _ = self.c_wrap.get_replicas() - # prepare load balance params - # find a group which has multi nodes - group_nodes = [] - for g in res.groups: - if len(g.group_nodes) >= 2: - group_nodes = list(g.group_nodes) - break - src_node_id = group_nodes[0] - dst_node_ids = group_nodes[1:] - res, _ = self.utility_wrap.get_query_segment_info(c_name) - segment_distribution = cf.get_segment_distribution(res) - sealed_segment_ids = segment_distribution[src_node_id]["sealed"] - # load balance - t0 = time.time() - _, result = self.utility_wrap.load_balance( - c_name, src_node_id, dst_node_ids, sealed_segment_ids - ) - t1 = time.time() - # get segments distribution after load balance - time.sleep(3) - res, _ = self.utility_wrap.get_query_segment_info(c_name) - segment_distribution = cf.get_segment_distribution(res) - sealed_segment_ids_after_load_banalce = segment_distribution[src_node_id][ - "sealed" - ] - check_1 = ( - len( - set(sealed_segment_ids) & set(sealed_segment_ids_after_load_banalce) - ) - == 0 - ) - des_sealed_segment_ids = [] - for des_node_id in dst_node_ids: - des_sealed_segment_ids += segment_distribution[des_node_id]["sealed"] - # assert sealed_segment_ids is subset of des_sealed_segment_ids - check_2 = set(sealed_segment_ids).issubset(set(des_sealed_segment_ids)) - - if result and (check_1 and check_2): - self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( - self._succ + 1 - ) - self._succ += 1 - log.debug( - f"load balance success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}" - ) - else: - self._fail += 1 - sleep(10) + while self._keep_running: + self.run_task() + sleep(constants.WAIT_PER_OP / 10) class BulkLoadChecker(Checker): """check bulk load operations in a dependent thread""" - def __init__(self, collection_name=None, flush=False): + def __init__(self, collection_name=None, files=[]): if collection_name is None: collection_name = cf.gen_unique_str("BulkLoadChecker_") super().__init__(collection_name=collection_name) self.utility_wrap = ApiUtilityWrapper() self.schema = cf.gen_default_collection_schema() - self.flush = flush - self.files = ["bulk_load_data_source.json"] + self.files = files self.row_based = True self.recheck_failed_task = False self.failed_tasks = [] + self.c_name = None def update(self, files=None, schema=None, row_based=None): if files is not None: @@ -476,69 +557,29 @@ class BulkLoadChecker(Checker): if row_based is not None: self.row_based = row_based - def keep_running(self): - while True: - if self.recheck_failed_task and self.failed_tasks: - c_name = self.failed_tasks.pop(0) - log.info(f"check failed task: {c_name}") - else: - c_name = cf.gen_unique_str("BulkLoadChecker_") - self.c_wrap.init_collection(name=c_name, schema=self.schema) - if self.flush: - t0 = time.time() - pre_entities_num = self.c_wrap.num_entities - tt = time.time() - t0 - log.info(f"flush before bulk load, cost time: {tt:.4f}") - # import data - t0 = time.time() - 
task_ids, res_1 = self.utility_wrap.bulk_load( - collection_name=c_name, row_based=self.row_based, files=self.files - ) - log.info(f"bulk load task ids:{task_ids}") - completed, res_2 = self.utility_wrap.wait_for_bulk_load_tasks_completed( - task_ids=task_ids, timeout=30 - ) - tt = time.time() - t0 - # added_num = sum(res_2[task_id].row_count for task_id in task_ids) - if completed: - self.rsp_times.append(tt) - self.average_time = (tt + self.average_time * self._succ) / ( - self._succ + 1 - ) - self._succ += 1 - log.info( - f"bulk load success for collection {c_name}, time: {tt:.4f}, average_time: {self.average_time:4f}" - ) - if self.flush: - t0 = time.time() - cur_entities_num = self.c_wrap.num_entities - tt = time.time() - t0 - log.info(f"flush after bulk load, cost time: {tt:.4f}") - else: - self._fail += 1 - # if the task failed, store the failed collection name for further checking after chaos - self.failed_tasks.append(c_name) - log.info( - f"bulk load failed for collection {c_name} time: {tt:.4f}, average_time: {self.average_time:4f}" - ) - sleep(constants.WAIT_PER_OP / 10) + @trace() + def bulk_load(self): + task_ids, result = self.utility_wrap.bulk_load(collection_name=self.c_name, + row_based=self.row_based, + files=self.files) + completed, result = self.utility_wrap.wait_for_bulk_load_tasks_completed(task_ids=task_ids, timeout=30) + return task_ids, completed - -def assert_statistic(checkers, expectations={}): - for k in checkers.keys(): - # expect succ if no expectations - succ_rate = checkers[k].succ_rate() - total = checkers[k].total() - checker_result = k.check_result() - - if expectations.get(k, "") == constants.FAIL: - log.info(f"Expect Fail: {str(k)} {checker_result}") - expect( - succ_rate < 0.49 or total < 2, f"Expect Fail: {str(k)} {checker_result}" - ) + @exception_handler() + def run_task(self): + if self.recheck_failed_task and self.failed_tasks: + self.c_name = self.failed_tasks.pop(0) + log.debug(f"check failed task: {self.c_name}") else: - log.info(f"Expect Succ: {str(k)} {checker_result}") - expect( - succ_rate > 0.90 or total > 2, f"Expect Succ: {str(k)} {checker_result}" - ) + self.c_name = cf.gen_unique_str("BulkLoadChecker_") + self.c_wrap.init_collection(name=self.c_name, schema=self.schema) + # import data + task_ids, completed = self.bulk_load() + if not completed: + self.failed_tasks.append(self.c_name) + return task_ids, completed + def keep_running(self): + while self._keep_running: + self.run_task() + sleep(constants.WAIT_PER_OP / 10) \ No newline at end of file diff --git a/tests/python_client/chaos/test_load_with_checker.py b/tests/python_client/chaos/test_load_with_checker.py new file mode 100644 index 0000000000..50d11d7c16 --- /dev/null +++ b/tests/python_client/chaos/test_load_with_checker.py @@ -0,0 +1,131 @@ +import threading +import pytest +import json +from time import sleep +from minio import Minio +from pymilvus import connections +from chaos.checker import (CreateChecker, + InsertChecker, + FlushChecker, + SearchChecker, + QueryChecker, + IndexChecker, + DeleteChecker, + CompactChecker, + DropChecker, + LoadBalanceChecker, + BulkLoadChecker, + Op) +from common.cus_resource_opts import CustomResourceOperations as CusResource +from common.milvus_sys import MilvusSys +from utils.util_log import test_log as log +from utils.util_k8s import get_pod_ip_name_pairs, get_milvus_instance_name +from chaos import chaos_commons as cc +from common.common_type import CaseLabel +from common import common_func as cf +from chaos import constants + + 
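The checker.py changes above converge every checker on one skeleton: the operation method is decorated with @trace() (which logs the start time and latency and updates the succ/fail counters), run_task() is wrapped by @exception_handler() (so an exception becomes (Error(e), False) instead of killing the monitor thread), and keep_running() loops while self._keep_running is true so that terminate() can stop it. A minimal sketch of a new checker written against that pattern, as if it lived in checker.py next to the others, is shown below; the DemoChecker name is hypothetical and everything it calls already appears in this diff:

class DemoChecker(Checker):
    """hypothetical checker, not part of this patch; shows the trace/exception_handler pattern"""

    def __init__(self, collection_name=None, shards_num=2):
        if collection_name is None:
            collection_name = cf.gen_unique_str("DemoChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num)

    @trace()  # records start time, elapsed time and the succ/fail counters
    def do_insert(self):
        res, result = self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS),
                                         timeout=timeout,
                                         enable_traceback=enable_traceback,
                                         check_task=CheckTasks.check_nothing)
        return res, result

    @exception_handler()  # converts an exception into (Error(e), False) so the loop keeps running
    def run_task(self):
        return self.do_insert()

    def keep_running(self):
        while self._keep_running:  # terminate() clears this flag to stop the monitor thread
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)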
+class TestChaosBase:
+    expect_create = constants.SUCC
+    expect_insert = constants.SUCC
+    expect_flush = constants.SUCC
+    expect_index = constants.SUCC
+    expect_search = constants.SUCC
+    expect_query = constants.SUCC
+    expect_delete = constants.SUCC
+    host = '127.0.0.1'
+    port = 19530
+    _chaos_config = None
+    health_checkers = {}
+
+
+class TestChaos(TestChaosBase):
+
+    @pytest.fixture(scope="function", autouse=True)
+    def connection(self, host, port):
+        connections.connect("default", host=host, port=port)
+
+        if connections.has_connection("default") is False:
+            raise Exception("no connections")
+        self.host = host
+        self.port = port
+        self.instance_name = get_milvus_instance_name(constants.CHAOS_NAMESPACE, host)
+
+    @pytest.fixture(scope="function", autouse=True)
+    def init_health_checkers(self):
+        c_name = cf.gen_unique_str("Checker_")
+        checkers = {
+            # Op.create: CreateChecker(collection_name=c_name),
+            # Op.insert: InsertChecker(collection_name=c_name),
+            # Op.flush: FlushChecker(collection_name=c_name),
+            # Op.query: QueryChecker(collection_name=c_name),
+            # Op.search: SearchChecker(collection_name=c_name),
+            # Op.delete: DeleteChecker(collection_name=c_name),
+            # Op.compact: CompactChecker(collection_name=c_name),
+            # Op.index: IndexChecker(),
+            # Op.drop: DropChecker(),
+            # Op.bulk_load: BulkLoadChecker(),
+            Op.load_balance: LoadBalanceChecker()
+        }
+        self.health_checkers = checkers
+        self.prepare_bulk_load()
+
+    def prepare_bulk_load(self, nb=30000, row_based=True):
+        if Op.bulk_load not in self.health_checkers:
+            log.info("bulk_load checker is not in health checkers, skip prepare bulk load")
+            return
+        log.info("bulk_load checker is in health checkers, prepare data firstly")
+        release_name = self.instance_name
+        minio_ip_pod_pair = get_pod_ip_name_pairs("chaos-testing", f"release={release_name}, app=minio")
+        ms = MilvusSys()
+        minio_ip = list(minio_ip_pod_pair.keys())[0]
+        minio_port = "9000"
+        minio_endpoint = f"{minio_ip}:{minio_port}"
+        bucket_name = ms.index_nodes[0]["infos"]["system_configurations"]["minio_bucket_name"]
+        schema = cf.gen_default_collection_schema()
+        data = cf.gen_default_list_data_for_bulk_load(nb=nb)
+        fields_name = [field.name for field in schema.fields]
+        if not row_based:
+            data_dict = dict(zip(fields_name, data))
+        if row_based:
+            entities = []
+            for i in range(nb):
+                entity_value = [field_values[i] for field_values in data]
+                entity = dict(zip(fields_name, entity_value))
+                entities.append(entity)
+            data_dict = {"rows": entities}
+        file_name = "bulk_load_data_source.json"
+        files = [file_name]
+        #TODO: npy file type is not supported so far
+        log.info("generate bulk load file")
+        with open(file_name, "w") as f:
+            f.write(json.dumps(data_dict))
+        log.info("upload file to minio")
+        client = Minio(minio_endpoint, access_key="minioadmin", secret_key="minioadmin", secure=False)
+        client.fput_object(bucket_name, file_name, file_name)
+        self.health_checkers[Op.bulk_load].update(schema=schema, files=files, row_based=row_based)
+        log.info("prepare data for bulk load done")
+
+    def teardown(self):
+        chaos_res = CusResource(kind=self._chaos_config['kind'],
+                                group=constants.CHAOS_GROUP,
+                                version=constants.CHAOS_VERSION,
+                                namespace=constants.CHAOS_NAMESPACE)
+        meta_name = self._chaos_config.get('metadata', None).get('name', None)
+        chaos_res.delete(meta_name, raise_ex=False)
+        sleep(2)
+        log.info(f'Alive threads: {threading.enumerate()}')
+
+    @pytest.mark.tags(CaseLabel.L3)
+    def test_load_generator(self):
+        # start the monitor threads to check the milvus ops
+        log.info("*********************Chaos Test Start**********************")
+        log.info(connections.get_connection_addr('default'))
+        cc.start_monitor_threads(self.health_checkers)
+
+        log.info("start checkers")
+        sleep(30)
+        for k, v in self.health_checkers.items():
+            v.check_result()
+        sleep(600)