[skip e2e]Refine checker of chaos test (#17925)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
zhuwenxing 2022-06-30 09:00:18 +08:00 committed by GitHub
parent ef6859a4d6
commit d29499d194
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 506 additions and 334 deletions

View File

@ -1,9 +1,9 @@
from enum import Enum
from random import randint
import time
import random
from datetime import datetime
import functools
from time import sleep
from delayed_assert import expect
from base.collection_wrapper import ApiCollectionWrapper
from base.utility_wrapper import ApiUtilityWrapper
from common import common_func as cf
@ -12,26 +12,73 @@ from chaos import constants
from common.common_type import CheckTasks
from utils.util_log import test_log as log
from utils.api_request import Error
class Op(Enum):
    """Operation kinds a chaos checker can exercise."""
    create = 'create'
    insert = 'insert'
    flush = 'flush'
    index = 'index'
    search = 'search'
    query = 'query'
    delete = 'delete'
    compact = 'compact'
    drop = 'drop'
    load_balance = 'load_balance'
    bulk_load = 'bulk_load'
    unknown = 'unknown'
# Per-operation timeout (seconds) and global traceback switch for wrapped calls.
timeout = 40
enable_traceback = False
# One-line log template rendered by trace(); field names must match the
# locals inside trace's inner_wrapper.
DEFAULT_FMT = '[start time:{start_time}][time cost:{elapsed:0.8f}s][operation_name:{operation_name}][collection name:{collection_name}] -> {result!r}'
def trace(fmt=DEFAULT_FMT, prefix='chaos-test', flag=True):
    """Decorator factory for checker operations.

    Times the wrapped call, optionally logs one formatted line per call
    (when `flag` is True), and updates the checker's success/failure
    statistics based on the (res, result) pair the operation returns.
    """
    def decorate(func):
        @functools.wraps(func)
        def inner_wrapper(self, *args, **kwargs):
            start_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
            t0 = time.perf_counter()
            res, result = func(self, *args, **kwargs)
            elapsed = time.perf_counter() - t0
            # NOTE(review): end_time is computed but not referenced by
            # DEFAULT_FMT; kept for custom fmt strings that may use it.
            end_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
            if flag:
                collection_name = self.c_wrap.name
                operation_name = func.__name__
                # fmt pulls start_time/elapsed/operation_name/collection_name/
                # result directly out of locals() -- renaming any of these
                # locals breaks the log line.
                log_str = f"[{prefix}]" + fmt.format(**locals())
                # TODO: add report function in this place, like uploading to influxdb
                # it is better a async way to do this, in case of blocking the request processing
                log.debug(log_str)
            if result:
                self.rsp_times.append(elapsed)
                # running average over successful operations only
                self.average_time = (
                    elapsed + self.average_time * self._succ) / (self._succ + 1)
                self._succ += 1
            else:
                self._fail += 1
            return res, result
        return inner_wrapper
    return decorate
def exception_handler():
    """Decorator factory: convert any exception raised by a checker task
    into a (Error(e), False) result, logging a truncated message."""
    def wrapper(func):
        @functools.wraps(func)
        def inner_wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except Exception as e:
                log_row_length = 300
                e_str = str(e)
                if len(e_str) > log_row_length:
                    log_e = e_str[0:log_row_length] + '......'
                else:
                    log_e = e_str
                log.error(log_e)
                return Error(e), False
        return inner_wrapper
    return wrapper
class Checker:
@ -44,24 +91,21 @@ class Checker:
def __init__(self, collection_name=None, shards_num=2):
self._succ = 0
self._fail = 0
self._keep_running = True
self.rsp_times = []
self.average_time = 0
self.c_wrap = ApiCollectionWrapper()
c_name = collection_name if collection_name is not None else cf.gen_unique_str("Checker_")
self.c_wrap.init_collection(
name=c_name,
schema=cf.gen_default_collection_schema(),
shards_num=shards_num,
timeout=timeout,
enable_traceback=enable_traceback,
)
self.c_wrap.insert(
data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH),
timeout=timeout,
enable_traceback=enable_traceback,
)
c_name = collection_name if collection_name is not None else cf.gen_unique_str(
'Checker_')
self.c_wrap.init_collection(name=c_name,
schema=cf.gen_default_collection_schema(),
shards_num=shards_num,
timeout=timeout,
enable_traceback=enable_traceback)
self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH),
timeout=timeout,
enable_traceback=enable_traceback)
self.initial_entities = self.c_wrap.num_entities # do as a flush
self.c_wrap.release()
def total(self):
return self._succ + self._fail
@ -73,12 +117,18 @@ class Checker:
succ_rate = self.succ_rate()
total = self.total()
rsp_times = self.rsp_times
average_time = 0 if len(rsp_times) == 0 else sum(rsp_times) / len(rsp_times)
average_time = 0 if len(rsp_times) == 0 else sum(
rsp_times) / len(rsp_times)
max_time = 0 if len(rsp_times) == 0 else max(rsp_times)
min_time = 0 if len(rsp_times) == 0 else min(rsp_times)
checkers_result = f"succ_rate: {succ_rate:.2f}, total: {total:03d}, average_time: {average_time:.4f}, max_time: {max_time:.4f}, min_time: {min_time:.4f}"
checker_name = self.__class__.__name__
checkers_result = f"{checker_name}, succ_rate: {succ_rate:.2f}, total: {total:03d}, average_time: {average_time:.4f}, max_time: {max_time:.4f}, min_time: {min_time:.4f}"
log.info(checkers_result)
return checkers_result
    def terminate(self):
        # Signal every keep_running loop (they poll self._keep_running)
        # to exit after its current iteration.
        self._keep_running = False
def reset(self):
self._succ = 0
self._fail = 0
@ -93,33 +143,29 @@ class SearchChecker(Checker):
if collection_name is None:
collection_name = cf.gen_unique_str("SearchChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num)
self.c_wrap.load(replica_number=replica_number) # do load before search
# do load before search
self.c_wrap.load(replica_number=replica_number)
    @trace()
    def search(self):
        """Run one vector search against the checker's collection.

        Returns the (response, ok) pair from the wrapped search call;
        check_nothing defers pass/fail judgement to the caller.
        """
        res, result = self.c_wrap.search(
            data=cf.gen_vectors(5, ct.default_dim),
            anns_field=ct.default_float_vec_field_name,
            param={"nprobe": 32},
            limit=1,
            timeout=timeout,
            check_task=CheckTasks.check_nothing
        )
        return res, result
    @exception_handler()
    def run_task(self):
        # one unit of work for keep_running; exceptions are converted to
        # (Error, False) by exception_handler
        res, result = self.search()
        return res, result
def keep_running(self):
while True:
search_vec = cf.gen_vectors(5, ct.default_dim)
t0 = time.time()
_, result = self.c_wrap.search(
data=search_vec,
anns_field=ct.default_float_vec_field_name,
param={"nprobe": 32},
limit=1,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing,
)
t1 = time.time()
if result:
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(
f"search success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
else:
self._fail += 1
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP / 10)
@ -127,11 +173,6 @@ class InsertFlushChecker(Checker):
"""check Insert and flush operations in a dependent thread"""
def __init__(self, collection_name=None, flush=False, shards_num=2):
if collection_name is None:
if flush:
collection_name = cf.gen_unique_str("FlushChecker_")
else:
collection_name = cf.gen_unique_str("InsertChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num)
self._flush = flush
self.initial_entities = self.c_wrap.num_entities
@ -139,23 +180,18 @@ class InsertFlushChecker(Checker):
def keep_running(self):
while True:
t0 = time.time()
_, insert_result = self.c_wrap.insert(
data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing,
)
_, insert_result = \
self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
t1 = time.time()
if not self._flush:
if insert_result:
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
self._succ += 1
log.debug(
f"insert success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
log.debug(f"insert success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}")
else:
self._fail += 1
sleep(constants.WAIT_PER_OP / 10)
@ -166,18 +202,78 @@ class InsertFlushChecker(Checker):
t1 = time.time()
if num_entities == (self.initial_entities + constants.DELTA_PER_INS):
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
self._succ += 1
log.debug(
f"flush success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
log.debug(f"flush success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}")
self.initial_entities += constants.DELTA_PER_INS
else:
self._fail += 1
class FlushChecker(Checker):
    """check flush operations in a dependent thread"""

    def __init__(self, collection_name=None, flush=False, shards_num=2):
        if collection_name is None:
            # was "InsertChecker_": copy-paste slip from InsertChecker --
            # use this checker's own prefix so collections are identifiable
            collection_name = cf.gen_unique_str("FlushChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num)
        self._flush = flush
        self.initial_entities = self.c_wrap.num_entities

    @trace()
    def flush(self):
        """Verify the previous insert batch became visible after a flush.

        Reading num_entities triggers the flush; success means the count
        advanced by exactly one DELTA_PER_INS batch.
        """
        num_entities = self.c_wrap.num_entities
        if num_entities == (self.initial_entities + constants.DELTA_PER_INS):
            result = True
            self.initial_entities += constants.DELTA_PER_INS
        else:
            result = False
        return num_entities, result

    @exception_handler()
    def run_task(self):
        # insert one batch, then check it is flushed
        _, result = self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS),
                                       timeout=timeout,
                                       enable_traceback=enable_traceback,
                                       check_task=CheckTasks.check_nothing)
        res, result = self.flush()
        return res, result

    def keep_running(self):
        # loop insert+flush rounds until terminate() is called
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)
class InsertChecker(Checker):
    """check insert operations in a dependent thread"""

    def __init__(self, collection_name=None, flush=False, shards_num=2):
        # `flush` is accepted for interface parity with FlushChecker;
        # it is stored but not used by this checker's tasks.
        if collection_name is None:
            collection_name = cf.gen_unique_str("InsertChecker_")
        super().__init__(collection_name=collection_name, shards_num=shards_num)
        self._flush = flush
        self.initial_entities = self.c_wrap.num_entities

    @trace()
    def insert(self):
        """Insert one default data batch; return the (response, ok) pair."""
        res, result = self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS),
                                         timeout=timeout,
                                         enable_traceback=enable_traceback,
                                         check_task=CheckTasks.check_nothing)
        return res, result

    @exception_handler()
    def run_task(self):
        res, result = self.insert()
        return res, result

    def keep_running(self):
        # loop insert tasks until terminate() is called
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)
class CreateChecker(Checker):
"""check create operations in a dependent thread"""
@ -186,30 +282,26 @@ class CreateChecker(Checker):
collection_name = cf.gen_unique_str("CreateChecker_")
super().__init__(collection_name=collection_name)
def keep_running(self):
while True:
t0 = time.time()
_, result = self.c_wrap.init_collection(
name=cf.gen_unique_str("CreateChecker_"),
schema=cf.gen_default_collection_schema(),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing,
)
t1 = time.time()
if result:
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(
f"create success, time: {t1 - t0:.4f}, average_time: {self.average_time:4f}"
)
self.c_wrap.drop(timeout=timeout)
    @trace()
    def init_collection(self):
        """Create a brand-new collection with the default schema."""
        res, result = self.c_wrap.init_collection(
            name=cf.gen_unique_str("CreateChecker_"),
            schema=cf.gen_default_collection_schema(),
            timeout=timeout,
            enable_traceback=enable_traceback,
            check_task=CheckTasks.check_nothing)
        return res, result
else:
self._fail += 1
    @exception_handler()
    def run_task(self):
        # create a collection; drop it again on success so collections
        # don't accumulate across rounds
        res, result = self.init_collection()
        if result:
            self.c_wrap.drop(timeout=timeout)
        return res, result
    def keep_running(self):
        # loop create/drop tasks until terminate() is called
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)
@ -220,39 +312,33 @@ class IndexChecker(Checker):
if collection_name is None:
collection_name = cf.gen_unique_str("IndexChecker_")
super().__init__(collection_name=collection_name)
self.c_wrap.insert(
data=cf.gen_default_list_data(nb=5 * constants.ENTITIES_FOR_SEARCH),
timeout=timeout,
enable_traceback=enable_traceback,
)
log.debug(
f"Index ready entities: {self.c_wrap.num_entities}"
) # do as a flush before indexing
self.c_wrap.insert(data=cf.gen_default_list_data(nb=5 * constants.ENTITIES_FOR_SEARCH),
timeout=timeout, enable_traceback=enable_traceback)
# do as a flush before indexing
log.debug(f"Index ready entities: {self.c_wrap.num_entities}")
    @trace()
    def create_index(self):
        """Build the default index on the float-vector field."""
        res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
                                               constants.DEFAULT_INDEX_PARAM,
                                               name=cf.gen_unique_str(
                                                   'index_'),
                                               timeout=timeout,
                                               enable_traceback=enable_traceback,
                                               check_task=CheckTasks.check_nothing)
        return res, result
    @exception_handler()
    def run_task(self):
        # build an index; drop it on success so the next round can rebuild
        res, result = self.create_index()
        if result:
            self.c_wrap.drop_index(timeout=timeout)
        return res, result
def keep_running(self):
while True:
t0 = time.time()
_, result = self.c_wrap.create_index(
ct.default_float_vec_field_name,
constants.DEFAULT_INDEX_PARAM,
name=cf.gen_unique_str("index_"),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing,
)
t1 = time.time()
if result:
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(
f"index success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
self.c_wrap.drop_index(timeout=timeout)
else:
self._fail += 1
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP / 10)
class QueryChecker(Checker):
@ -263,32 +349,53 @@ class QueryChecker(Checker):
collection_name = cf.gen_unique_str("QueryChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num)
self.c_wrap.load(replica_number=replica_number) # do load before query
self.term_expr = None
    @trace()
    def query(self):
        """Run one query using the term expression prepared by run_task."""
        res, result = self.c_wrap.query(self.term_expr, timeout=timeout,
                                        check_task=CheckTasks.check_nothing)
        return res, result
@exception_handler()
def run_task(self):
int_values = []
for _ in range(5):
int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH))
self.term_expr = f'{ct.default_int64_field_name} in {int_values}'
res, result= self.query()
return res, result
def keep_running(self):
while True:
int_values = []
for _ in range(5):
int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH))
term_expr = f"{ct.default_int64_field_name} in {int_values}"
t0 = time.time()
_, result = self.c_wrap.query(
term_expr,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing,
)
t1 = time.time()
if result:
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(
f"query success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
else:
self._fail += 1
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP / 10)
class LoadChecker(Checker):
    """check load operations in a dependent thread"""

    def __init__(self, collection_name=None, replica_number=1):
        if collection_name is None:
            # was "DeleteChecker_": copy-paste slip from DeleteChecker --
            # use this checker's own prefix so collections are identifiable
            collection_name = cf.gen_unique_str("LoadChecker_")
        super().__init__(collection_name=collection_name)
        self.replica_number = replica_number

    @trace()
    def load(self):
        """Load the collection with the configured replica number."""
        res, result = self.c_wrap.load(replica_number=self.replica_number, timeout=timeout)
        return res, result

    @exception_handler()
    def run_task(self):
        # load the collection; release on success so the next round
        # performs a real load again
        res, result = self.load()
        if result:
            self.c_wrap.release()
        return res, result

    def keep_running(self):
        # loop load/release tasks until terminate() is called
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)
@ -300,30 +407,27 @@ class DeleteChecker(Checker):
collection_name = cf.gen_unique_str("DeleteChecker_")
super().__init__(collection_name=collection_name)
self.c_wrap.load() # load before query
term_expr = f'{ct.default_int64_field_name} > 0'
res, _ = self.c_wrap.query(term_expr, output_fields=[
ct.default_int64_field_name])
self.ids = [r[ct.default_int64_field_name] for r in res]
self.expr = None
    @trace()
    def delete(self):
        """Delete entities matching the expression prepared by run_task."""
        res, result = self.c_wrap.delete(expr=self.expr, timeout=timeout)
        return res, result
    @exception_handler()
    def run_task(self):
        # NOTE(review): pops one pk per round; once self.ids is exhausted
        # pop() raises IndexError, which exception_handler records as a
        # failed op -- confirm the id pool outlasts the chaos window
        delete_ids = self.ids.pop()
        self.expr = f'{ct.default_int64_field_name} in {[delete_ids]}'
        res, result = self.delete()
        return res, result
def keep_running(self):
while True:
term_expr = f"{ct.default_int64_field_name} > 0"
res, _ = self.c_wrap.query(
term_expr, output_fields=[ct.default_int64_field_name]
)
ids = [r[ct.default_int64_field_name] for r in res]
delete_ids = random.sample(ids, 2)
expr = f"{ct.default_int64_field_name} in {delete_ids}"
t0 = time.time()
_, result = self.c_wrap.delete(expr=expr, timeout=timeout)
tt = time.time() - t0
if result:
self.rsp_times.append(tt)
self.average_time = (tt + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(
f"delete success, time: {tt:.4f}, average_time: {self.average_time:.4f}"
)
else:
self._fail += 1
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP / 10)
@ -335,28 +439,23 @@ class CompactChecker(Checker):
collection_name = cf.gen_unique_str("CompactChecker_")
super().__init__(collection_name=collection_name)
self.ut = ApiUtilityWrapper()
self.c_wrap.load(enable_traceback=enable_traceback) # load before compact
self.c_wrap.load() # load before compact
    @trace()
    def compact(self):
        """Trigger compaction and block until it completes.

        The returned ok flag reflects the compact request itself; the
        wait/plan calls afterwards surface completion state via logs.
        """
        res, result = self.c_wrap.compact(timeout=timeout)
        self.c_wrap.wait_for_compaction_completed()
        self.c_wrap.get_compaction_plans()
        return res, result
    @exception_handler()
    def run_task(self):
        # one compaction round per invocation
        res, result = self.compact()
        return res, result
def keep_running(self):
while True:
seg_info = self.ut.get_query_segment_info(self.c_wrap.name)
t0 = time.time()
res, result = self.c_wrap.compact(timeout=timeout)
print(f"compact done: res {res}")
self.c_wrap.wait_for_compaction_completed()
self.c_wrap.get_compaction_plans()
t1 = time.time()
if result:
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(
f"compact success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
else:
self._fail += 1
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP / 10)
@ -367,24 +466,25 @@ class DropChecker(Checker):
if collection_name is None:
collection_name = cf.gen_unique_str("DropChecker_")
super().__init__(collection_name=collection_name)
# self.c_wrap.load(enable_traceback=enable_traceback) # load before compact
    @trace()
    def drop(self):
        """Drop the checker's current collection."""
        res, result = self.c_wrap.drop()
        return res, result
def run_task(self):
res, result = self.drop()
return res, result
def keep_running(self):
while True:
t0 = time.time()
_, result = self.c_wrap.drop()
t1 = time.time()
while self._keep_running:
res, result = self.run_task()
if result:
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(
f"drop success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
else:
self._fail += 1
self.c_wrap.init_collection(
name=cf.gen_unique_str("CreateChecker_"),
schema=cf.gen_default_collection_schema(),
timeout=timeout,
check_task=CheckTasks.check_nothing)
sleep(constants.WAIT_PER_OP / 10)
@ -396,77 +496,58 @@ class LoadBalanceChecker(Checker):
collection_name = cf.gen_unique_str("LoadBalanceChecker_")
super().__init__(collection_name=collection_name)
self.utility_wrap = ApiUtilityWrapper()
self.c_wrap.load(enable_traceback=enable_traceback)
self.c_wrap.load()
self.sealed_segment_ids = None
self.dst_node_ids = None
self.src_node_id = None
    @trace()
    def load_balance(self):
        """Move the prepared sealed segments from src node to dst nodes."""
        res, result = self.utility_wrap.load_balance(
            self.c_wrap.name, self.src_node_id, self.dst_node_ids, self.sealed_segment_ids)
        return res, result
    def prepare(self):
        """prepare load balance params"""
        res, _ = self.c_wrap.get_replicas()
        # find a group which has multi nodes
        group_nodes = []
        for g in res.groups:
            if len(g.group_nodes) >= 2:
                group_nodes = list(g.group_nodes)
                break
        # NOTE(review): if no group has >= 2 nodes, group_nodes stays empty
        # and the next line raises IndexError (caught by run_task's
        # exception_handler and counted as a failure) -- confirm intended
        self.src_node_id = group_nodes[0]
        self.dst_node_ids = group_nodes[1:]
        res, _ = self.utility_wrap.get_query_segment_info(self.c_wrap.name)
        segment_distribution = cf.get_segment_distribution(res)
        self.sealed_segment_ids = segment_distribution[self.src_node_id]["sealed"]
    @exception_handler()
    def run_task(self):
        # compute fresh balance parameters, then issue the load-balance call
        self.prepare()
        res, result = self.load_balance()
        return res, result
def keep_running(self):
while True:
c_name = self.c_wrap.name
res, _ = self.c_wrap.get_replicas()
# prepare load balance params
# find a group which has multi nodes
group_nodes = []
for g in res.groups:
if len(g.group_nodes) >= 2:
group_nodes = list(g.group_nodes)
break
src_node_id = group_nodes[0]
dst_node_ids = group_nodes[1:]
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
# load balance
t0 = time.time()
_, result = self.utility_wrap.load_balance(
c_name, src_node_id, dst_node_ids, sealed_segment_ids
)
t1 = time.time()
# get segments distribution after load balance
time.sleep(3)
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
sealed_segment_ids_after_load_banalce = segment_distribution[src_node_id][
"sealed"
]
check_1 = (
len(
set(sealed_segment_ids) & set(sealed_segment_ids_after_load_banalce)
)
== 0
)
des_sealed_segment_ids = []
for des_node_id in dst_node_ids:
des_sealed_segment_ids += segment_distribution[des_node_id]["sealed"]
# assert sealed_segment_ids is subset of des_sealed_segment_ids
check_2 = set(sealed_segment_ids).issubset(set(des_sealed_segment_ids))
if result and (check_1 and check_2):
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(
f"load balance success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
else:
self._fail += 1
sleep(10)
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP / 10)
class BulkLoadChecker(Checker):
    """check bulk load operations in a dependent thread"""

    def __init__(self, collection_name=None, files=None):
        if collection_name is None:
            collection_name = cf.gen_unique_str("BulkLoadChecker_")
        super().__init__(collection_name=collection_name)
        self.utility_wrap = ApiUtilityWrapper()
        self.schema = cf.gen_default_collection_schema()
        # None sentinel instead of a mutable `files=[]` default: a shared
        # default list would be mutated across every checker instance
        self.files = files if files is not None else []
        self.row_based = True
        # when True, previously failed imports are re-checked first
        self.recheck_failed_task = False
        self.failed_tasks = []
        # target collection for the current round; set by run_task
        self.c_name = None
def update(self, files=None, schema=None, row_based=None):
if files is not None:
@ -476,69 +557,29 @@ class BulkLoadChecker(Checker):
if row_based is not None:
self.row_based = row_based
def keep_running(self):
while True:
if self.recheck_failed_task and self.failed_tasks:
c_name = self.failed_tasks.pop(0)
log.info(f"check failed task: {c_name}")
else:
c_name = cf.gen_unique_str("BulkLoadChecker_")
self.c_wrap.init_collection(name=c_name, schema=self.schema)
if self.flush:
t0 = time.time()
pre_entities_num = self.c_wrap.num_entities
tt = time.time() - t0
log.info(f"flush before bulk load, cost time: {tt:.4f}")
# import data
t0 = time.time()
task_ids, res_1 = self.utility_wrap.bulk_load(
collection_name=c_name, row_based=self.row_based, files=self.files
)
log.info(f"bulk load task ids:{task_ids}")
completed, res_2 = self.utility_wrap.wait_for_bulk_load_tasks_completed(
task_ids=task_ids, timeout=30
)
tt = time.time() - t0
# added_num = sum(res_2[task_id].row_count for task_id in task_ids)
if completed:
self.rsp_times.append(tt)
self.average_time = (tt + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.info(
f"bulk load success for collection {c_name}, time: {tt:.4f}, average_time: {self.average_time:4f}"
)
if self.flush:
t0 = time.time()
cur_entities_num = self.c_wrap.num_entities
tt = time.time() - t0
log.info(f"flush after bulk load, cost time: {tt:.4f}")
else:
self._fail += 1
# if the task failed, store the failed collection name for further checking after chaos
self.failed_tasks.append(c_name)
log.info(
f"bulk load failed for collection {c_name} time: {tt:.4f}, average_time: {self.average_time:4f}"
)
sleep(constants.WAIT_PER_OP / 10)
    @trace()
    def bulk_load(self):
        """Submit a bulk-load job and wait up to 30s for its tasks.

        Returns (task_ids, completed); `completed` is True only if every
        task finished in time. The intermediate `result` values from the
        two utility calls are intentionally discarded.
        """
        task_ids, result = self.utility_wrap.bulk_load(collection_name=self.c_name,
                                                       row_based=self.row_based,
                                                       files=self.files)
        completed, result = self.utility_wrap.wait_for_bulk_load_tasks_completed(task_ids=task_ids, timeout=30)
        return task_ids, completed
def assert_statistic(checkers, expectations={}):
for k in checkers.keys():
# expect succ if no expectations
succ_rate = checkers[k].succ_rate()
total = checkers[k].total()
checker_result = k.check_result()
if expectations.get(k, "") == constants.FAIL:
log.info(f"Expect Fail: {str(k)} {checker_result}")
expect(
succ_rate < 0.49 or total < 2, f"Expect Fail: {str(k)} {checker_result}"
)
@exception_handler()
def run_task(self):
if self.recheck_failed_task and self.failed_tasks:
self.c_name = self.failed_tasks.pop(0)
log.debug(f"check failed task: {self.c_name}")
else:
log.info(f"Expect Succ: {str(k)} {checker_result}")
expect(
succ_rate > 0.90 or total > 2, f"Expect Succ: {str(k)} {checker_result}"
)
self.c_name = cf.gen_unique_str("BulkLoadChecker_")
self.c_wrap.init_collection(name=self.c_name, schema=self.schema)
# import data
task_ids, completed = self.bulk_load()
if not completed:
self.failed_tasks.append(self.c_name)
return task_ids, completed
    def keep_running(self):
        # loop bulk-load tasks until terminate() is called
        while self._keep_running:
            self.run_task()
            sleep(constants.WAIT_PER_OP / 10)

View File

@ -0,0 +1,131 @@
import threading
import pytest
import json
from time import sleep
from minio import Minio
from pymilvus import connections
from chaos.checker import (CreateChecker,
InsertChecker,
FlushChecker,
SearchChecker,
QueryChecker,
IndexChecker,
DeleteChecker,
CompactChecker,
DropChecker,
LoadBalanceChecker,
BulkLoadChecker,
Op)
from common.cus_resource_opts import CustomResourceOperations as CusResource
from common.milvus_sys import MilvusSys
from utils.util_log import test_log as log
from utils.util_k8s import get_pod_ip_name_pairs, get_milvus_instance_name
from chaos import chaos_commons as cc
from common.common_type import CaseLabel
from common import common_func as cf
from chaos import constants
class TestChaosBase:
    """Shared defaults for chaos test classes."""
    # expected outcome (constants.SUCC / constants.FAIL) per operation
    # after chaos injection; subclasses or test setup may override
    expect_create = constants.SUCC
    expect_insert = constants.SUCC
    expect_flush = constants.SUCC
    expect_index = constants.SUCC
    expect_search = constants.SUCC
    expect_query = constants.SUCC
    expect_delete = constants.SUCC
    # connection defaults; overwritten by the `connection` fixture
    host = '127.0.0.1'
    port = 19530
    _chaos_config = None
    health_checkers = {}
class TestChaos(TestChaosBase):
    """Load-generator harness: runs health checkers against a Milvus instance."""

    @pytest.fixture(scope="function", autouse=True)
    def connection(self, host, port):
        """Connect to Milvus and resolve the instance name for this run."""
        connections.connect("default", host=host, port=port)
        if connections.has_connection("default") is False:
            raise Exception("no connections")
        self.host = host
        self.port = port
        self.instance_name = get_milvus_instance_name(constants.CHAOS_NAMESPACE, host)

    @pytest.fixture(scope="function", autouse=True)
    def init_health_checkers(self):
        """Instantiate the checkers under test (only load_balance enabled here)."""
        c_name = cf.gen_unique_str("Checker_")
        checkers = {
            # Op.create: CreateChecker(collection_name=c_name),
            # Op.insert: InsertChecker(collection_name=c_name),
            # Op.flush: FlushChecker(collection_name=c_name),
            # Op.query: QueryChecker(collection_name=c_name),
            # Op.search: SearchChecker(collection_name=c_name),
            # Op.delete: DeleteChecker(collection_name=c_name),
            # Op.compact: CompactChecker(collection_name=c_name),
            # Op.index: IndexChecker(),
            # Op.drop: DropChecker(),
            # Op.bulk_load: BulkLoadChecker(),
            Op.load_balance: LoadBalanceChecker()
        }
        self.health_checkers = checkers
        self.prepare_bulk_load()

    def prepare_bulk_load(self, nb=30000, row_based=True):
        """Generate a bulk-load source file and upload it to the Milvus MinIO.

        No-op unless a bulk_load checker is registered. `nb` is the number
        of generated rows; `row_based` selects row- vs column-oriented JSON.
        """
        if Op.bulk_load not in self.health_checkers:
            log.info("bulk_load checker is not in health checkers, skip prepare bulk load")
            return
        log.info("bulk_load checker is in health checkers, prepare data firstly")
        release_name = self.instance_name
        # locate the MinIO pod of this Milvus release inside the chaos namespace
        minio_ip_pod_pair = get_pod_ip_name_pairs("chaos-testing", f"release={release_name}, app=minio")
        ms = MilvusSys()
        minio_ip = list(minio_ip_pod_pair.keys())[0]
        minio_port = "9000"
        minio_endpoint = f"{minio_ip}:{minio_port}"
        bucket_name = ms.index_nodes[0]["infos"]["system_configurations"]["minio_bucket_name"]
        schema = cf.gen_default_collection_schema()
        data = cf.gen_default_list_data_for_bulk_load(nb=nb)
        fields_name = [field.name for field in schema.fields]
        # column-based layout: one list of values per field
        if not row_based:
            data_dict = dict(zip(fields_name, data))
        # row-based layout: one {field: value} dict per entity under "rows"
        if row_based:
            entities = []
            for i in range(nb):
                entity_value = [field_values[i] for field_values in data]
                entity = dict(zip(fields_name, entity_value))
                entities.append(entity)
            data_dict = {"rows": entities}
        file_name = "bulk_load_data_source.json"
        files = [file_name]
        #TODO: npy file type is not supported so far
        log.info("generate bulk load file")
        with open(file_name, "w") as f:
            f.write(json.dumps(data_dict))
        log.info("upload file to minio")
        client = Minio(minio_endpoint, access_key="minioadmin", secret_key="minioadmin", secure=False)
        client.fput_object(bucket_name, file_name, file_name)
        self.health_checkers[Op.bulk_load].update(schema=schema, files=files, row_based=row_based)
        log.info("prepare data for bulk load done")

    def teardown(self):
        """Delete the injected chaos resource and report live threads.

        NOTE(review): _chaos_config defaults to None on TestChaosBase and is
        never set in this class -- this subscript would raise TypeError
        unless another fixture/test assigns it; confirm.
        """
        chaos_res = CusResource(kind=self._chaos_config['kind'],
                                group=constants.CHAOS_GROUP,
                                version=constants.CHAOS_VERSION,
                                namespace=constants.CHAOS_NAMESPACE)
        meta_name = self._chaos_config.get('metadata', None).get('name', None)
        chaos_res.delete(meta_name, raise_ex=False)
        sleep(2)
        log.info(f'Alive threads: {threading.enumerate()}')

    @pytest.mark.tags(CaseLabel.L3)
    def test_load_generator(self):
        """Start all checker threads and let them run for ~10 minutes."""
        # start the monitor threads to check the milvus ops
        log.info("*********************Chaos Test Start**********************")
        log.info(connections.get_connection_addr('default'))
        cc.start_monitor_threads(self.health_checkers)
        log.info("start checkers")
        sleep(30)
        for k, v in self.health_checkers.items():
            v.check_result()
        sleep(600)