[test]Add RPO and RTO metric for rolling update test (#25612)
Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
parent f2b14b43de
commit b70da0859a
@@ -40,6 +40,7 @@ def trace(fmt=DEFAULT_FMT, prefix='chaos-test', flag=True):
         @functools.wraps(func)
         def inner_wrapper(self, *args, **kwargs):
             start_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
+            start_time_ts = time.time()
             t0 = time.perf_counter()
             res, result = func(self, *args, **kwargs)
             elapsed = time.perf_counter() - t0
@@ -56,12 +57,13 @@ def trace(fmt=DEFAULT_FMT, prefix='chaos-test', flag=True):
                     self.average_time = (
                         elapsed + self.average_time * self._succ) / (self._succ + 1)
                     self._succ += 1
+                    # add first success record if there is no success record before
                     if len(self.fail_records) > 0 and self.fail_records[-1][0] == "failure" and \
                             self._succ + self._fail == self.fail_records[-1][1] + 1:
-                        self.fail_records.append(("success", self._succ + self._fail, start_time))
+                        self.fail_records.append(("success", self._succ + self._fail, start_time, start_time_ts))
                 else:
                     self._fail += 1
-                    self.fail_records.append(("failure", self._succ + self._fail, start_time))
+                    self.fail_records.append(("failure", self._succ + self._fail, start_time, start_time_ts))
             return res, result
         return inner_wrapper
     return decorate
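The change above does two things: each record appended to self.fail_records now carries an epoch timestamp (start_time_ts) alongside the ISO string, and a single "success" record is added as soon as the first operation after a failure streak succeeds, marking the recovery point. A small self-contained sketch of that bookkeeping, assuming the same ("status", op_count, iso_time, epoch_time) tuple layout; the record() helper and the sample result sequence are illustrative only:

# Illustrative sketch (not part of the patch): how the success/failure bookkeeping
# above behaves for a sequence of operation results.
import time
from datetime import datetime

fail_records, succ, fail = [], 0, 0

def record(result):
    global succ, fail
    iso_ts = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
    epoch_ts = time.time()
    if result:
        succ += 1
        # only the first success right after a failure streak is recorded,
        # because that is the point where the operation is considered recovered
        if fail_records and fail_records[-1][0] == "failure" and \
                succ + fail == fail_records[-1][1] + 1:
            fail_records.append(("success", succ + fail, iso_ts, epoch_ts))
    else:
        fail += 1
        fail_records.append(("failure", succ + fail, iso_ts, epoch_ts))

for ok in [True, False, False, True, True]:
    record(ok)
print([r[0] for r in fail_records])  # ['failure', 'failure', 'success']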
@@ -93,6 +95,7 @@ class Checker:
     """
 
     def __init__(self, collection_name=None, shards_num=2, dim=ct.default_dim):
+        self.recovery_time = 0
         self._succ = 0
         self._fail = 0
         self.fail_records = []
@@ -139,12 +142,28 @@ class Checker:
         self._keep_running = False
         self.reset()

+    def pause(self):
+        self._keep_running = False
+        time.sleep(10)
+
     def reset(self):
         self._succ = 0
         self._fail = 0
         self.rsp_times = []
         self.average_time = 0

+    def get_rto(self):
+        if len(self.fail_records) == 0:
+            return 0
+        end = self.fail_records[-1][3]
+        start = self.fail_records[0][3]
+        recovery_time = end - start  # second
+        self.recovery_time = recovery_time
+        checker_name = self.__class__.__name__
+        log.info(f"{checker_name} recovery time is {self.recovery_time}, start at {self.fail_records[0][2]}, "
+                 f"end at {self.fail_records[-1][2]}")
+        return recovery_time
+

 class SearchChecker(Checker):
     """check search operations in a dependent thread"""
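With those records in place, get_rto() above reduces fail_records to the gap, in seconds, between the epoch timestamp of the first recorded failure and that of the last record; since the last record after an outage is the single "success" entry appended by the trace decorator, the result approximates how long the operation was unavailable. A tiny numeric sketch with hand-written records (the values are illustrative):

# Illustrative sketch: the reduction performed by get_rto() on hand-built records.
fail_records = [
    ("failure", 10, "2023-07-13T08:00:00.000000Z", 100.0),  # first failed operation
    ("failure", 11, "2023-07-13T08:00:05.000000Z", 105.0),
    ("success", 12, "2023-07-13T08:00:12.500000Z", 112.5),  # first success after the streak
]
rto = fail_records[-1][3] - fail_records[0][3]
print(rto)  # 12.5 -> the operation was effectively down for about 12.5 s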
@@ -269,13 +288,28 @@ class InsertChecker(Checker):
         super().__init__(collection_name=collection_name, shards_num=shards_num)
         self._flush = flush
         self.initial_entities = self.c_wrap.num_entities
+        self.inserted_data = []
+        self.scale = 1*10**6
+        self.start_time_stamp = int(time.time()*self.scale)  # us
+        self.term_expr = f'{ct.default_int64_field_name} >= {self.start_time_stamp}'

     @trace()
     def insert(self):
-        res, result = self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS),
+        data = cf.gen_default_list_data(nb=constants.DELTA_PER_INS)
+        ts_data = []
+        for i in range(constants.DELTA_PER_INS):
+            time.sleep(0.001)
+            offset_ts = int(time.time()*self.scale)
+            ts_data.append(offset_ts)
+
+        data[0] = ts_data  # set timestamp (ms) as int64
+        log.debug(f"insert data: {ts_data}")
+        res, result = self.c_wrap.insert(data=data,
                                          timeout=timeout,
                                          enable_traceback=enable_traceback,
                                          check_task=CheckTasks.check_nothing)
+        if result:
+            self.inserted_data.extend(ts_data)
         return res, result

     @exception_handler()
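The reworked insert() above fills the first column of every generated batch with microsecond-resolution epoch timestamps (self.scale = 1*10**6) and records each acknowledged key in self.inserted_data; those keys are what the data-completeness check in the next hunk queries back from the server. A minimal sketch of the key generation, with an illustrative batch size standing in for constants.DELTA_PER_INS:

import time

SCALE = 1 * 10**6   # microseconds, matching self.scale in InsertChecker
BATCH_SIZE = 10     # illustrative; the checker uses constants.DELTA_PER_INS

ts_keys = []
for _ in range(BATCH_SIZE):
    time.sleep(0.001)                        # 1 ms apart, so keys are strictly increasing
    ts_keys.append(int(time.time() * SCALE))

# each key doubles as the int64 field value and as a record of when the row was written,
# so a range query on that field later selects exactly the rows inserted in a time window
assert ts_keys == sorted(set(ts_keys))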
@@ -288,6 +322,35 @@ class InsertChecker(Checker):
             self.run_task()
             sleep(constants.WAIT_PER_OP / 10)

+    def verify_data_completeness(self):
+        try:
+            self.c_wrap.create_index(ct.default_float_vec_field_name,
+                                     constants.DEFAULT_INDEX_PARAM,
+                                     index_name=cf.gen_unique_str('index_'),
+                                     timeout=timeout,
+                                     enable_traceback=enable_traceback,
+                                     check_task=CheckTasks.check_nothing)
+        except Exception as e:
+            log.error(f"create index error: {e}")
+        self.c_wrap.load()
+        end_time_stamp = int(time.time()*self.scale)
+        self.term_expr = f'{ct.default_int64_field_name} >= {self.start_time_stamp} and ' \
+                         f'{ct.default_int64_field_name} <= {end_time_stamp}'
+        data_in_client = []
+        for d in self.inserted_data:
+            if self.start_time_stamp <= d <= end_time_stamp:
+                data_in_client.append(d)
+        res, result = self.c_wrap.query(self.term_expr, timeout=timeout,
+                                        output_fields=[f'{ct.default_int64_field_name}'],
+                                        limit=len(data_in_client) * 2,
+                                        check_task=CheckTasks.check_nothing)
+
+        data_in_server = []
+        for r in res:
+            d = r[f"{ct.default_int64_field_name}"]
+            data_in_server.append(d)
+        assert set(data_in_server) == set(data_in_client)
+

 class CreateChecker(Checker):
     """check create operations in a dependent thread"""
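verify_data_completeness() above builds a range expression over those timestamp keys, queries everything the server holds between start_time_stamp and end_time_stamp, and asserts that the returned set equals the set recorded on the client, i.e. that no acknowledged insert was lost during the rolling update (an RPO of zero for acknowledged writes). If the comparison ever needs to report which rows disappeared, a set difference gives that directly; a hedged sketch with hand-built key sets (the values are illustrative):

# Illustrative sketch: comparing client-side keys with what the server returned.
data_in_client = {1689235200000001, 1689235200000002, 1689235200000003}
data_in_server = {1689235200000001, 1689235200000003}

missing = data_in_client - data_in_server     # acknowledged by the client but not queryable
unexpected = data_in_server - data_in_client  # returned by the server but never recorded

print(f"missing: {sorted(missing)}, unexpected: {sorted(unexpected)}")
# missing: [1689235200000002], unexpected: []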
@@ -0,0 +1,102 @@
+import pytest
+from time import sleep
+from pymilvus import connections
+from chaos.checker import (CreateChecker,
+                           InsertChecker,
+                           FlushChecker,
+                           SearchChecker,
+                           QueryChecker,
+                           IndexChecker,
+                           DeleteChecker,
+                           DropChecker,
+                           Op)
+from utils.util_log import test_log as log
+from chaos import chaos_commons as cc
+from common.common_type import CaseLabel
+from chaos.chaos_commons import assert_statistic
+from chaos import constants
+from delayed_assert import assert_expectations
+
+
+class TestBase:
+    expect_create = constants.SUCC
+    expect_insert = constants.SUCC
+    expect_flush = constants.SUCC
+    expect_index = constants.SUCC
+    expect_search = constants.SUCC
+    expect_query = constants.SUCC
+    host = '127.0.0.1'
+    port = 19530
+    _chaos_config = None
+    health_checkers = {}
+
+
+class TestOperations(TestBase):
+
+    @pytest.fixture(scope="function", autouse=True)
+    def connection(self, host, port, user, password):
+        if user and password:
+            # log.info(f"connect to {host}:{port} with user {user} and password {password}")
+            connections.connect('default', host=host, port=port, user=user, password=password, secure=True)
+        else:
+            connections.connect('default', host=host, port=port)
+        if connections.has_connection("default") is False:
+            raise Exception("no connections")
+        log.info("connect to milvus successfully")
+        self.host = host
+        self.port = port
+        self.user = user
+        self.password = password
+
+    def init_health_checkers(self, collection_name=None):
+        c_name = collection_name
+        checkers = {
+            Op.create: CreateChecker(collection_name=c_name),
+            Op.insert: InsertChecker(collection_name=c_name),
+            Op.flush: FlushChecker(collection_name=c_name),
+            Op.index: IndexChecker(collection_name=c_name),
+            Op.search: SearchChecker(collection_name=c_name),
+            Op.query: QueryChecker(collection_name=c_name),
+            Op.delete: DeleteChecker(collection_name=c_name),
+            Op.drop: DropChecker(collection_name=c_name)
+        }
+        self.health_checkers = checkers
+
+    @pytest.mark.tags(CaseLabel.L3)
+    def test_operations(self, request_duration, is_check):
+        # start the monitor threads to check the milvus ops
+        log.info("*********************Test Start**********************")
+        log.info(connections.get_connection_addr('default'))
+        c_name = None
+        self.init_health_checkers(collection_name=c_name)
+        cc.start_monitor_threads(self.health_checkers)
+        log.info("*********************Load Start**********************")
+        # wait request_duration
+        request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
+        if request_duration[-1] == "+":
+            request_duration = request_duration[:-1]
+        request_duration = eval(request_duration)
+        for i in range(10):
+            sleep(request_duration // 10)
+            for k, v in self.health_checkers.items():
+                v.check_result()
+        for k, v in self.health_checkers.items():
+            v.pause()
+        for k, v in self.health_checkers.items():
+            v.check_result()
+            log.info(f"{k} rto: {v.get_rto()}")
+        if is_check:
+            assert_statistic(self.health_checkers, succ_rate_threshold=0.98)
+            assert_expectations()
+        # get each checker's rto
+        for k, v in self.health_checkers.items():
+            log.info(f"{k} rto: {v.get_rto()}")
+            rto = v.get_rto()
+            assert rto < 30, f"expect 30s but get {rto}s"  # rto should be less than 30s
+
+        if Op.insert in self.health_checkers:
+            # verify that there is no insert data loss
+            log.info("*********************Verify Data Completeness**********************")
+            self.health_checkers[Op.insert].verify_data_completeness()
+
+        log.info("*********************Chaos Test Completed**********************")
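In the new test above, request_duration arrives as a string such as "1h30m10s"; the replace()/eval() lines turn it into the arithmetic expression 1*3600+30*60+10, the test then sleeps through it in ten slices while polling every checker, pauses the checkers, and finally asserts that each checker's RTO stays below 30 s and that no inserted data was lost. The duration conversion is equivalent to the small non-eval parser sketched here (the function name is illustrative, assuming the same h/m/s suffix format):

import re

def duration_to_seconds(spec: str) -> int:
    # "1h30m10s" -> 5410, "10m" -> 600, "90s" -> 90; mirrors the replace()/eval() conversion
    units = {"h": 3600, "m": 60, "s": 1}
    return sum(int(value) * units[unit]
               for value, unit in re.findall(r"(\d+)([hms])", spec))

assert duration_to_seconds("1h30m10s") == 1 * 3600 + 30 * 60 + 10
assert duration_to_seconds("10m") == 600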
@@ -12,6 +12,12 @@ class Error:
         self.code = getattr(error, 'code', -1)
         self.message = getattr(error, 'message', str(error))

+    def __str__(self):
+        return f"Error(code={self.code}, message={self.message})"
+
+    def __repr__(self):
+        return f"Error(code={self.code}, message={self.message})"
+

 log_row_length = 300
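Defining __str__ and __repr__ on Error means that when a checker logs or collects failed responses, each entry renders as a readable code/message pair instead of a bare object address. A small self-contained usage sketch (the class below simply mirrors the fields shown above):

class Error:
    def __init__(self, error):
        self.code = getattr(error, 'code', -1)
        self.message = getattr(error, 'message', str(error))

    def __str__(self):
        return f"Error(code={self.code}, message={self.message})"

    def __repr__(self):
        return f"Error(code={self.code}, message={self.message})"

err = Error(Exception("collection not found"))
print(err)    # Error(code=-1, message=collection not found)
print([err])  # [Error(code=-1, message=collection not found)]  (repr is used inside containers)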