[test]Update bulk insert test case and skip calc_distance testcases (#19855)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

zhuwenxing 2022-10-18 17:21:26 +08:00 committed by GitHub
parent b3f6b67977
commit 05fbbf2e33
15 changed files with 2735 additions and 3530 deletions

View File

@ -7,7 +7,7 @@
.idea
*.html
*.hdf5
*.npy
.python-version
__pycache__
.vscode

View File

@ -6,10 +6,9 @@ import sys
sys.path.append("..")
from check.func_check import ResponseChecker
from utils.api_request import api_request
from common.common_type import BulkLoadStates
from pymilvus import BulkInsertState
from pymilvus.orm.role import Role
from utils.util_log import test_log as log
TIMEOUT = 20
@ -19,68 +18,169 @@ class ApiUtilityWrapper:
ut = utility
role = None
def bulk_load(self, collection_name, partition_name="", row_based=True, files="", timeout=None,
def bulk_insert(self, collection_name, is_row_based=True, files="", partition_name=None, timeout=None,
using="default", check_task=None, check_items=None, **kwargs):
working_tasks = self.get_bulk_insert_working_list()
log.info(f"before bulk load, there are {len(working_tasks)} working tasks")
log.info(f"files to load: {files}")
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.bulk_load, collection_name, partition_name, row_based,
files, timeout, using], **kwargs)
res, is_succ = api_request([self.ut.bulk_insert, collection_name, is_row_based,
files, partition_name, timeout, using], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
collection_name=collection_name, using=using).run()
time.sleep(1)
working_tasks = self.get_bulk_insert_working_list()
log.info(f"after bulk load, there are {len(working_tasks)} working tasks")
return res, check_result
def get_bulk_load_state(self, task_id, timeout=None, using="default", check_task=None, check_items=None, **kwargs):
def get_bulk_insert_state(self, task_id, timeout=None, using="default", check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.get_bulk_load_state, task_id, timeout, using], **kwargs)
res, is_succ = api_request([self.ut.get_bulk_insert_state, task_id, timeout, using], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
task_id=task_id, using=using).run()
return res, check_result
def wait_for_bulk_load_tasks_completed(self, task_ids, target_state=BulkLoadStates.BulkLoadPersisted,
def list_bulk_insert_tasks(self, limit=0, collection_name=None, timeout=None, using="default", check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.list_bulk_insert_tasks, limit, collection_name, timeout, using], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
limit=limit, collection_name=collection_name, using=using).run()
return res, check_result
def get_bulk_insert_pending_list(self):
tasks = {}
for task in self.ut.list_bulk_insert_tasks():
if task.state == BulkInsertState.ImportPending:
tasks[task.task_id] = task
return tasks
def get_bulk_insert_working_list(self):
tasks = {}
for task in self.ut.list_bulk_insert_tasks():
if task.state in [BulkInsertState.ImportStarted]:
tasks[task.task_id] = task
return tasks
def list_all_bulk_insert_tasks(self, limit=0):
tasks, _ = self.list_bulk_insert_tasks(limit=limit)
pending = 0
started = 0
persisted = 0
completed = 0
failed = 0
failed_and_cleaned = 0
unknown = 0
for task in tasks:
print(task)
if task.state == BulkInsertState.ImportPending:
pending = pending + 1
elif task.state == BulkInsertState.ImportStarted:
started = started + 1
elif task.state == BulkInsertState.ImportPersisted:
persisted = persisted + 1
elif task.state == BulkInsertState.ImportCompleted:
completed = completed + 1
elif task.state == BulkInsertState.ImportFailed:
failed = failed + 1
elif task.state == BulkInsertState.ImportFailedAndCleaned:
failed_and_cleaned = failed_and_cleaned + 1
else:
unknown = unknown + 1
log.info("There are", len(tasks), "bulkload tasks.", pending, "pending,", started, "started,", persisted,
"persisted,", completed, "completed,", failed, "failed", failed_and_cleaned, "failed_and_cleaned",
unknown, "unknown")
def wait_for_bulk_insert_tasks_completed(self, task_ids, target_state=BulkInsertState.ImportCompleted,
timeout=None, using="default", **kwargs):
start = time.time()
successes = {}
fails = {}
tasks_state_distribution = {
"success": set(),
"failed": set(),
"in_progress": set()
}
tasks_state = {}
if timeout is not None:
task_timeout = timeout / len(task_ids)
task_timeout = timeout
else:
task_timeout = TIMEOUT
while (len(successes) + len(fails)) < len(task_ids):
in_progress = {}
time.sleep(0.1)
start = time.time()
end = time.time()
log.info(f"wait bulk load timeout is {task_timeout}")
pending_tasks = self.get_bulk_insert_pending_list()
log.info(f"before waiting, there are {len(pending_tasks)} pending tasks")
while len(tasks_state_distribution["success"])+len(tasks_state_distribution["failed"]) < len(task_ids) and end-start <= task_timeout:
time.sleep(2)
for task_id in task_ids:
if successes.get(task_id, None) is not None or fails.get(task_id, None) is not None:
if task_id in tasks_state_distribution["success"] or task_id in tasks_state_distribution["failed"]:
continue
else:
state, _ = self.get_bulk_load_state(task_id, task_timeout, using, **kwargs)
if target_state == BulkLoadStates.BulkLoadDataQueryable:
if state.data_queryable is True:
successes[task_id] = True
else:
in_progress[task_id] = False
elif target_state == BulkLoadStates.BulkLoadDataIndexed:
if state.data_indexed is True:
successes[task_id] = True
else:
in_progress[task_id] = False
else:
if state.state_name == target_state:
successes[task_id] = state
elif state.state_name == BulkLoadStates.BulkLoadFailed:
fails[task_id] = state
else:
in_progress[task_id] = state
end = time.time()
if timeout is not None:
if end - start > timeout:
in_progress.update(fails)
in_progress.update(successes)
return False, in_progress
state, _ = self.get_bulk_insert_state(task_id, task_timeout, using, **kwargs)
tasks_state[task_id] = state
if len(fails) == 0:
return True, successes
if target_state == BulkInsertState.ImportPersisted:
if state.state in [BulkInsertState.ImportPersisted, BulkInsertState.ImportCompleted]:
if task_id in tasks_state_distribution["in_progress"]:
tasks_state_distribution["in_progress"].remove(task_id)
tasks_state_distribution["success"].add(task_id)
elif state.state in [BulkInsertState.ImportPending, BulkInsertState.ImportStarted]:
tasks_state_distribution["in_progress"].add(task_id)
else:
tasks_state_distribution["failed"].add(task_id)
if target_state == BulkInsertState.ImportCompleted:
if state.state in [BulkInsertState.ImportCompleted]:
if task_id in tasks_state_distribution["in_progress"]:
tasks_state_distribution["in_progress"].remove(task_id)
tasks_state_distribution["success"].add(task_id)
elif state.state in [BulkInsertState.ImportPending, BulkInsertState.ImportStarted, BulkInsertState.ImportPersisted]:
tasks_state_distribution["in_progress"].add(task_id)
else:
tasks_state_distribution["failed"].add(task_id)
end = time.time()
pending_tasks = self.get_bulk_insert_pending_list()
log.info(f"after waiting, there are {len(pending_tasks)} pending tasks")
log.info(f"task state distribution: {tasks_state_distribution}")
log.debug(tasks_state)
if len(tasks_state_distribution["success"]) == len(task_ids):
log.info(f"wait for bulk load tasks completed successfully, cost time: {end-start}")
return True, tasks_state
else:
fails.update(successes)
return False, fails
log.info(f"wait for bulk load tasks completed failed, cost time: {end-start}")
return False, tasks_state
def wait_all_pending_tasks_finished(self):
task_states_map = {}
all_tasks, _ = self.list_bulk_insert_tasks()
# log.info(f"all tasks: {all_tasks}")
for task in all_tasks:
if task.state in [BulkInsertState.ImportStarted, BulkInsertState.ImportPersisted]:
task_states_map[task.task_id] = task.state
log.info(f"current tasks states: {task_states_map}")
pending_tasks = self.get_bulk_insert_pending_list()
working_tasks = self.get_bulk_insert_working_list()
log.info(f"in the start, there are {len(working_tasks)} working tasks, {working_tasks} {len(pending_tasks)} pending tasks, {pending_tasks}")
time_cnt = 0
pending_task_ids = set()
while len(pending_tasks) > 0:
time.sleep(5)
time_cnt += 5
pending_tasks = self.get_bulk_insert_pending_list()
working_tasks = self.get_bulk_insert_working_list()
cur_pending_task_ids = []
for task_id in pending_tasks.keys():
cur_pending_task_ids.append(task_id)
pending_task_ids.add(task_id)
log.info(f"after {time_cnt}, there are {len(working_tasks)} working tasks, {len(pending_tasks)} pending tasks")
log.debug(f"total pending tasks: {pending_task_ids} current pending tasks: {cur_pending_task_ids}")
log.info(f"after {time_cnt}, all pending tasks are finished")
all_tasks, _ = self.list_bulk_insert_tasks()
for task in all_tasks:
if task.task_id in pending_task_ids:
log.info(f"task {task.task_id} state transfer from pending to {task.state_name}")
def get_query_segment_info(self, collection_name, timeout=None, using="default", check_task=None, check_items=None):
timeout = TIMEOUT if timeout is None else timeout
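For orientation, a minimal sketch of how a test might drive the renamed wrapper API end to end. The collection name, file name, and timeout below are hypothetical placeholders, the files are assumed to be already uploaded to the Milvus MinIO bucket, and the import path of ApiUtilityWrapper (defined above) is omitted.

    from pymilvus import BulkInsertState

    # ApiUtilityWrapper is the class shown above; import path omitted here
    utility_wrap = ApiUtilityWrapper()

    # submit a row-based bulk insert for files already uploaded to the bucket
    task_ids, _ = utility_wrap.bulk_insert(collection_name="demo_collection",
                                           is_row_based=True,
                                           files=["demo_rows_0.json"])

    # block until every task reaches ImportCompleted or the timeout expires
    completed, states = utility_wrap.wait_for_bulk_insert_tasks_completed(
        task_ids=task_ids,
        target_state=BulkInsertState.ImportCompleted,
        timeout=120)
    assert completed, f"bulk insert tasks did not complete: {states}"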

View File

@ -1,16 +1,14 @@
import time
import os
import pathlib
import numpy as np
import random
from sklearn import preprocessing
from common.common_func import gen_unique_str
from minio_comm import copy_files_to_minio
from utils.util_log import test_log as log
# TODO: remove hardcode with input configurations
minio = "minio_address:port" # minio service and port
bucket_name = "milvus-bulk-load" # bucket name of milvus is using
data_source = "/tmp/bulk_load_data"
data_source = "/tmp/bulk_insert_data"
BINARY = "binary"
FLOAT = "float"
@ -23,6 +21,7 @@ class DataField:
string_field = "string_scalar"
bool_field = "bool_scalar"
float_field = "float_scalar"
double_field = "double_scalar"
class DataErrorType:
@ -35,8 +34,8 @@ class DataErrorType:
str_on_vector_field = "str_on_vector_field"
def gen_file_prefix(row_based=True, auto_id=True, prefix=""):
if row_based:
def gen_file_prefix(is_row_based=True, auto_id=True, prefix=""):
if is_row_based:
if auto_id:
return f"{prefix}_row_auto"
else:
@ -244,8 +243,8 @@ def gen_column_base_json_file(col_file, str_pk, data_fields, float_vect,
f.write("\n")
def gen_vectors_in_numpy_file(dir, float_vector, rows, dim, force=False):
file_name = f"{DataField.vec_field}.npy"
def gen_vectors_in_numpy_file(dir, data_field, float_vector, rows, dim, force=False):
file_name = f"{data_field}.npy"
file = f'{dir}/{file_name}'
if not os.path.exists(file) or force:
@ -257,6 +256,23 @@ def gen_vectors_in_numpy_file(dir, float_vector, rows, dim, force=False):
else:
vectors = gen_binary_vectors(rows, (dim // 8))
arr = np.array(vectors)
# print(f"file_name: {file_name} data type: {arr.dtype}")
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
np.save(file, arr)
return file_name
def gen_string_in_numpy_file(dir, data_field, rows, start=0, force=False):
file_name = f"{data_field}.npy"
file = f"{dir}/{file_name}"
if not os.path.exists(file) or force:
# non vector columns
data = []
if rows > 0:
data = [gen_unique_str(str(i)) for i in range(start, rows+start)]
arr = np.array(data)
# print(f"file_name: {file_name} data type: {arr.dtype}")
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
np.save(file, arr)
return file_name
@ -267,19 +283,24 @@ def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False):
if not os.path.exists(file) or force:
# non vector columns
data = []
# arr = np.array([])
if rows > 0:
if data_field == DataField.float_field:
data = [random.random() for _ in range(rows)]
data = [np.float32(random.random()) for _ in range(rows)]
elif data_field == DataField.double_field:
data = [np.float64(random.random()) for _ in range(rows)]
elif data_field == DataField.pk_field:
data = [i for i in range(start, start + rows)]
elif data_field == DataField.int_field:
data = [random.randint(-999999, 9999999) for _ in range(rows)]
arr = np.array(data)
np.save(file, arr)
# print(f"file_name: {file_name} data type: {arr.dtype}")
arr = np.array(data)
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
np.save(file, arr)
return file_name
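As a quick sanity check on the dtype and shape that these generators now log, the written .npy column files can be reloaded and inspected. A small sketch, assuming files were generated for 100 rows under the /tmp/bulk_insert_data directory configured above:

    import numpy as np

    rows = 100                            # hypothetical row count used at generation time
    data_dir = "/tmp/bulk_insert_data"    # matches data_source above

    float_arr = np.load(f"{data_dir}/float_scalar.npy")
    assert float_arr.dtype == np.float32  # float_field values are written as np.float32
    assert float_arr.shape == (rows,)     # one value per row for a scalar column

    double_arr = np.load(f"{data_dir}/double_scalar.npy")
    assert double_arr.dtype == np.float64 # double_field values are written as np.float64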
def gen_file_name(row_based, rows, dim, auto_id, str_pk,
def gen_file_name(is_row_based, rows, dim, auto_id, str_pk,
float_vector, data_fields, file_num, file_type, err_type):
row_suffix = entity_suffix(rows)
field_suffix = ""
@ -297,7 +318,7 @@ def gen_file_name(row_based, rows, dim, auto_id, str_pk,
pk = ""
if str_pk:
pk = "str_pk_"
prefix = gen_file_prefix(row_based=row_based, auto_id=auto_id, prefix=err_type)
prefix = gen_file_prefix(is_row_based=is_row_based, auto_id=auto_id, prefix=err_type)
file_name = f"{prefix}_{pk}{vt}{field_suffix}{dim}d_{row_suffix}_{file_num}{file_type}"
return file_name
@ -312,7 +333,7 @@ def gen_subfolder(root, dim, rows, file_num):
return subfolder
def gen_json_files(row_based, rows, dim, auto_id, str_pk,
def gen_json_files(is_row_based, rows, dim, auto_id, str_pk,
float_vector, data_fields, file_nums, multi_folder,
file_type, err_type, force, **kwargs):
# gen json files
@ -322,7 +343,7 @@ def gen_json_files(row_based, rows, dim, auto_id, str_pk,
if not auto_id and DataField.pk_field not in data_fields:
data_fields.append(DataField.pk_field)
for i in range(file_nums):
file_name = gen_file_name(row_based=row_based, rows=rows, dim=dim,
file_name = gen_file_name(is_row_based=is_row_based, rows=rows, dim=dim,
auto_id=auto_id, str_pk=str_pk, float_vector=float_vector,
data_fields=data_fields, file_num=i, file_type=file_type, err_type=err_type)
file = f"{data_source}/{file_name}"
@ -330,7 +351,7 @@ def gen_json_files(row_based, rows, dim, auto_id, str_pk,
subfolder = gen_subfolder(root=data_source, dim=dim, rows=rows, file_num=i)
file = f"{data_source}/{subfolder}/{file_name}"
if not os.path.exists(file) or force:
if row_based:
if is_row_based:
gen_row_based_json_file(row_file=file, str_pk=str_pk, float_vect=float_vector,
data_fields=data_fields, rows=rows, dim=dim,
start_uid=start_uid, err_type=err_type, **kwargs)
@ -346,7 +367,7 @@ def gen_json_files(row_based, rows, dim, auto_id, str_pk,
return files
def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, force=False):
def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type="", force=False):
# gen numpy files
files = []
start_uid = 0
@ -354,8 +375,10 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, force=False
# gen the numpy file without subfolders if only one set of files
for data_field in data_fields:
if data_field == DataField.vec_field:
file_name = gen_vectors_in_numpy_file(dir=data_source, float_vector=float_vector,
file_name = gen_vectors_in_numpy_file(dir=data_source, data_field=data_field, float_vector=float_vector,
rows=rows, dim=dim, force=force)
elif data_field == DataField.string_field: # string field for numpy not supported yet at 2022-10-17
file_name = gen_string_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
else:
file_name = gen_int_or_float_in_numpy_file(dir=data_source, data_field=data_field,
rows=rows, force=force)
@ -365,8 +388,8 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, force=False
subfolder = gen_subfolder(root=data_source, dim=dim, rows=rows, file_num=i)
dir = f"{data_source}/{subfolder}"
for data_field in data_fields:
if data_field == DataField.vec_field:
file_name = gen_vectors_in_numpy_file(dir=dir, float_vector=float_vector, rows=rows, dim=dim, force=force)
if DataField.vec_field in data_field:
file_name = gen_vectors_in_numpy_file(dir=dir, data_field=data_field, float_vector=float_vector, rows=rows, dim=dim, force=force)
else:
file_name = gen_int_or_float_in_numpy_file(dir=dir, data_field=data_field, rows=rows, start=start_uid, force=force)
files.append(f"{subfolder}/{file_name}")
@ -374,15 +397,21 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, force=False
return files
def prepare_bulk_load_json_files(row_based=True, rows=100, dim=128,
def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket", is_row_based=True, rows=100, dim=128,
auto_id=True, str_pk=False, float_vector=True,
data_fields=[], file_nums=1, multi_folder=False,
file_type=".json", err_type="", force=False, **kwargs):
"""
Generate files in JSON format based on the given params and copy them to MinIO
:param row_based: indicate the file(s) to be generated is row based or not
:type row_based: boolean
:param minio_endpoint: the endpoint of the MinIO service
:type minio_endpoint: str
:param bucket_name: the name of the MinIO bucket used by Milvus
:type bucket_name: str
:param is_row_based: indicate the file(s) to be generated is row based or not
:type is_row_based: boolean
:param rows: the number of entities to be generated in the file(s)
:type rows: int
@ -427,16 +456,16 @@ def prepare_bulk_load_json_files(row_based=True, rows=100, dim=128,
:return list
file names list
"""
files = gen_json_files(row_based=row_based, rows=rows, dim=dim,
files = gen_json_files(is_row_based=is_row_based, rows=rows, dim=dim,
auto_id=auto_id, str_pk=str_pk, float_vector=float_vector,
data_fields=data_fields, file_nums=file_nums, multi_folder=multi_folder,
file_type=file_type, err_type=err_type, force=force, **kwargs)
copy_files_to_minio(host=minio, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
return files
def prepare_bulk_load_numpy_files(rows, dim, data_fields=[DataField.vec_field],
def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, data_fields=[DataField.vec_field],
float_vector=True, file_nums=1, force=False):
"""
Generate column-based files in numpy format based on the given params and copy them to MinIO
@ -471,6 +500,6 @@ def prepare_bulk_load_numpy_files(rows, dim, data_fields=[DataField.vec_field],
data_fields=data_fields,
file_nums=file_nums, force=force)
copy_files_to_minio(host=minio, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
return files
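Putting the two prepare_* helpers together, a hypothetical call site might look like the following; the MinIO endpoint and bucket are placeholders and must point at the MinIO instance used by the Milvus deployment under test.

    # hypothetical values; use the endpoint/bucket of the target deployment
    minio_endpoint = "127.0.0.1:9000"
    bucket_name = "milvus-bucket"

    json_files = prepare_bulk_insert_json_files(minio_endpoint=minio_endpoint,
                                                bucket_name=bucket_name,
                                                is_row_based=True, rows=1000, dim=128,
                                                data_fields=[DataField.int_field,
                                                             DataField.float_field,
                                                             DataField.vec_field],
                                                force=True)

    npy_files = prepare_bulk_insert_numpy_files(minio_endpoint=minio_endpoint,
                                                bucket_name=bucket_name,
                                                rows=1000, dim=128,
                                                data_fields=[DataField.vec_field,
                                                             DataField.int_field],
                                                force=True)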

File diff suppressed because it is too large.

File diff suppressed because it is too large.

View File

@ -1,5 +1,6 @@
import os
import threading
import time
import glob
from chaos import constants
from yaml import full_load
@ -68,8 +69,21 @@ def get_chaos_yamls():
return glob.glob(constants.TESTS_CONFIG_LOCATION + constants.ALL_CHAOS_YAMLS)
def reconnect(connections, alias='default'):
def reconnect(connections, alias='default', timeout=360):
"""trying to connect by connection alias"""
is_connected = False
start = time.time()
end = time.time()
while not is_connected and end - start < timeout:
try:
connections.connect(alias)
is_connected = True
except Exception as e:
log.debug(f"failed to connect, error: {str(e)}")
time.sleep(10)
end = time.time()
if not is_connected:
log.info(f"failed to reconnect after {timeout} seconds")
return connections.connect(alias)
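A small usage sketch for the bounded reconnect helper; the host and port are placeholders for the Milvus instance under test.

    from pymilvus import connections

    connections.add_connection(default={"host": "127.0.0.1", "port": "19530"})
    # keep retrying (10s between attempts) for up to 6 minutes before giving up
    reconnect(connections, alias="default", timeout=360)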

View File

@ -26,7 +26,7 @@ class Op(Enum):
compact = 'compact'
drop = 'drop'
load_balance = 'load_balance'
bulk_load = 'bulk_load'
bulk_insert = 'bulk_insert'
unknown = 'unknown'
@ -540,7 +540,7 @@ class LoadBalanceChecker(Checker):
sleep(constants.WAIT_PER_OP / 10)
class BulkLoadChecker(Checker):
class BulkInsertChecker(Checker):
"""check bulk load operations in a dependent thread"""
def __init__(self, collection_name=None, files=[]):
@ -550,25 +550,25 @@ class BulkLoadChecker(Checker):
self.utility_wrap = ApiUtilityWrapper()
self.schema = cf.gen_default_collection_schema()
self.files = files
self.row_based = True
self.is_row_based = True
self.recheck_failed_task = False
self.failed_tasks = []
self.c_name = None
def update(self, files=None, schema=None, row_based=None):
def update(self, files=None, schema=None, is_row_based=None):
if files is not None:
self.files = files
if schema is not None:
self.schema = schema
if row_based is not None:
self.row_based = row_based
if is_row_based is not None:
self.is_row_based = is_row_based
@trace()
def bulk_load(self):
task_ids, result = self.utility_wrap.bulk_load(collection_name=self.c_name,
row_based=self.row_based,
def bulk_insert(self):
task_ids, result = self.utility_wrap.bulk_insert(collection_name=self.c_name,
is_row_based=self.is_row_based,
files=self.files)
completed, result = self.utility_wrap.wait_for_bulk_load_tasks_completed(task_ids=task_ids, timeout=30)
completed, result = self.utility_wrap.wait_for_bulk_insert_tasks_completed(task_ids=task_ids, timeout=60)
return task_ids, completed
@exception_handler()
@ -580,7 +580,7 @@ class BulkLoadChecker(Checker):
self.c_name = cf.gen_unique_str("BulkLoadChecker_")
self.c_wrap.init_collection(name=self.c_name, schema=self.schema)
# import data
task_ids, completed = self.bulk_load()
task_ids, completed = self.bulk_insert()
if not completed:
self.failed_tasks.append(self.c_name)
return task_ids, completed
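For context, a sketch of how a chaos test might wire up the renamed checker before starting the monitor threads; the schema and file name mirror what the tests below prepare and are placeholders here.

    # hypothetical wiring; the JSON file is assumed to be uploaded to minio already
    checker = BulkInsertChecker()
    checker.update(schema=cf.gen_default_collection_schema(),
                   files=["bulk_insert_data_source.json"],
                   is_row_based=True)
    health_checkers = {Op.bulk_insert: checker}
    cc.start_monitor_threads(health_checkers)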

View File

@ -9,7 +9,7 @@ from time import sleep
from pathlib import Path
from minio import Minio
from pymilvus import connections
from chaos.checker import (InsertFlushChecker, SearchChecker, QueryChecker, BulkLoadChecker, Op)
from chaos.checker import (InsertFlushChecker, SearchChecker, QueryChecker, BulkInsertChecker, Op)
from common.cus_resource_opts import CustomResourceOperations as CusResource
from common.milvus_sys import MilvusSys
from utils.util_log import test_log as log
@ -19,8 +19,8 @@ from chaos import chaos_commons as cc
from common.common_type import CaseLabel
from common import common_func as cf
from chaos import constants
# from bulk_load.bulk_load_data import gen_file_name
from bulk_load.minio_comm import copy_files_to_minio
# from bulk_insert.bulk_insert_data import gen_file_name
from bulk_insert.minio_comm import copy_files_to_minio
from delayed_assert import expect, assert_expectations
@ -86,17 +86,17 @@ class TestChaos(TestChaosBase):
checkers = {
# Op.insert: InsertFlushChecker(collection_name=c_name),
# Op.search: SearchChecker(collection_name=c_name, replica_number=2),
Op.bulk_load: BulkLoadChecker()
Op.bulk_insert: BulkInsertChecker()
# Op.query: QueryChecker(collection_name=c_name, replica_number=2)
}
self.health_checkers = checkers
@pytest.fixture(scope="function", autouse=True)
def prepare_bulk_load(self, nb=1000, row_based=True):
if Op.bulk_load not in self.health_checkers:
log.info("bulk_load checker is not in health checkers, skip prepare bulk load")
def prepare_bulk_insert(self, nb=1000, is_row_based=True):
if Op.bulk_insert not in self.health_checkers:
log.info("bulk_insert checker is not in health checkers, skip prepare bulk load")
return
log.info("bulk_load checker is in health checkers, prepare data firstly")
log.info("bulk_insert checker is in health checkers, prepare data firstly")
release_name = self.instance_name
minio_ip_pod_pair = get_pod_ip_name_pairs("chaos-testing", f"release={release_name}, app=minio")
ms = MilvusSys()
@ -105,27 +105,27 @@ class TestChaos(TestChaosBase):
minio_endpoint = f"{minio_ip}:{minio_port}"
bucket_name = ms.index_nodes[0]["infos"]["system_configurations"]["minio_bucket_name"]
schema = cf.gen_default_collection_schema()
data = cf.gen_default_list_data_for_bulk_load(nb=nb)
data = cf.gen_default_list_data_for_bulk_insert(nb=nb)
fields_name = [field.name for field in schema.fields]
if not row_based:
if not is_row_based:
data_dict = dict(zip(fields_name, data))
if row_based:
if is_row_based:
entities = []
for i in range(nb):
entity_value = [field_values[i] for field_values in data]
entity = dict(zip(fields_name, entity_value))
entities.append(entity)
data_dict = {"rows": entities}
file_name = "bulk_load_data_source.json"
file_name = "/tmp/ci_logs/bulk_insert_data_source.json"
files = [file_name]
#TODO: npy file type is not supported so far
log.info("generate bulk load file")
with open(file_name, "w") as f:
f.write(json.dumps(data_dict))
f.write(json.dumps(data_dict, indent=4))
log.info("upload file to minio")
client = Minio(minio_endpoint, access_key="minioadmin", secret_key="minioadmin", secure=False)
client.fput_object(bucket_name, file_name, file_name)
self.health_checkers[Op.bulk_load].update(schema=schema, files=files, row_based=row_based)
self.health_checkers[Op.bulk_insert].update(schema=schema, files=files, is_row_based=is_row_based)
log.info("prepare data for bulk load done")
def teardown(self):
@ -139,24 +139,23 @@ class TestChaos(TestChaosBase):
log.info(f'Alive threads: {threading.enumerate()}')
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("target_component", ["minio"]) # "minio", "proxy", "rootcoord", "datacoord", "datanode", "etcd"
@pytest.mark.parametrize("chaos_type", ["pod_kill"]) # "pod_kill", "pod_failure"
def test_bulk_load(self, chaos_type, target_component):
def test_bulk_insert(self, chaos_type, target_component):
# start the monitor threads to check the milvus ops
log.info("*********************Chaos Test Start**********************")
log.info(connections.get_connection_addr('default'))
release_name = self.instance_name
cc.start_monitor_threads(self.health_checkers)
chaos_config = cc.gen_experiment_config(f"{str(Path(__file__).absolute().parent)}/chaos_objects/{chaos_type}/chaos_{target_component}_{chaos_type}.yaml")
chaos_config['metadata']['name'] = f"test-bulk-load-{int(time.time())}"
chaos_config = cc.gen_experiment_config(
f"{str(Path(__file__).absolute().parent)}/chaos_objects/{chaos_type.replace('-', '_')}/chaos_{target_component}_{chaos_type.replace('-', '_')}.yaml")
chaos_config['metadata']['name'] = f"test-{target_component}-{chaos_type.replace('_','-')}-{int(time.time())}"
kind = chaos_config['kind']
meta_name = chaos_config.get('metadata', None).get('name', None)
update_key_value(chaos_config, "release", release_name)
update_key_value(chaos_config, "app.kubernetes.io/instance", release_name)
self._chaos_config = chaos_config # cache the chaos config for tear down
log.info(f"chaos_config: {chaos_config}")
# wait 20s
sleep(constants.WAIT_PER_OP * 10)
# wait 120s
sleep(constants.WAIT_PER_OP * 12)
# assert statistic:all ops 100% succ
log.info("******1st assert before chaos: ")
assert_statistic(self.health_checkers)
@ -170,15 +169,17 @@ class TestChaos(TestChaosBase):
sleep(constants.WAIT_PER_OP * 10)
# reset counting
cc.reset_counting(self.health_checkers)
# wait 120s
sleep(constants.CHAOS_DURATION)
# wait 240s
sleep(constants.WAIT_PER_OP * 24)
log.info(f'Alive threads: {threading.enumerate()}')
# assert statistic
log.info("******2nd assert after chaos injected: ")
assert_statistic(self.health_checkers,
expectations={
Op.bulk_load: constants.FAIL,
})
for op, checker in self.health_checkers.items():
checker.check_result()
# assert_statistic(self.health_checkers,
# expectations={
# Op.bulk_insert: constants.FAIL,
# })
# delete chaos
chaos_res.delete(meta_name)
log.info("chaos deleted")
@ -191,13 +192,14 @@ class TestChaos(TestChaosBase):
log.info("all pods are ready")
# reconnect if needed
sleep(constants.WAIT_PER_OP * 2)
log.info("reconnect to milvus")
cc.reconnect(connections, alias='default')
# recheck failed tasks in third assert
self.health_checkers[Op.bulk_load].recheck_failed_task = True
self.health_checkers[Op.bulk_insert].recheck_failed_task = True
# reset counting again
cc.reset_counting(self.health_checkers)
# wait 50s (varies by feature)
sleep(constants.WAIT_PER_OP * 10)
# wait 240s (varies by feature)
sleep(constants.WAIT_PER_OP * 24)
# assert statistic: all ops success again
log.info("******3rd assert after chaos deleted: ")
assert_statistic(self.health_checkers)
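For reference, the two payload layouts that prepare_bulk_insert can emit look roughly like this; the field names follow the default test schema and the values (including the 2-dimensional vectors) are purely illustrative.

    # row-based layout: one JSON object per entity under a top-level "rows" key
    row_based_payload = {
        "rows": [
            {"int64": 0, "float": 0.0, "varchar": "0", "float_vector": [0.1, 0.2]},
            {"int64": 1, "float": 1.0, "varchar": "1", "float_vector": [0.3, 0.4]},
        ]
    }

    # column-based layout: one list per field, all lists the same length
    column_based_payload = {
        "int64": [0, 1],
        "float": [0.0, 1.0],
        "varchar": ["0", "1"],
        "float_vector": [[0.1, 0.2], [0.3, 0.4]],
    }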

View File

@ -14,7 +14,7 @@ from chaos.checker import (CreateChecker,
CompactChecker,
DropChecker,
LoadBalanceChecker,
BulkLoadChecker,
BulkInsertChecker,
Op)
from common.cus_resource_opts import CustomResourceOperations as CusResource
from common.milvus_sys import MilvusSys
@ -65,17 +65,17 @@ class TestChaos(TestChaosBase):
# Op.compact: CompactChecker(collection_name=c_name),
# Op.index: IndexChecker(),
# Op.drop: DropChecker(),
# Op.bulk_load: BulkLoadChecker(),
# Op.bulk_insert: BulkInsertChecker(),
Op.load_balance: LoadBalanceChecker()
}
self.health_checkers = checkers
self.prepare_bulk_load()
self.prepare_bulk_insert()
def prepare_bulk_load(self, nb=30000, row_based=True):
if Op.bulk_load not in self.health_checkers:
log.info("bulk_load checker is not in health checkers, skip prepare bulk load")
def prepare_bulk_insert(self, nb=30000, row_based=True):
if Op.bulk_insert not in self.health_checkers:
log.info("bulk_insert checker is not in health checkers, skip prepare bulk insert")
return
log.info("bulk_load checker is in health checkers, prepare data firstly")
log.info("bulk_insert checker is in health checkers, prepare data firstly")
release_name = self.instance_name
minio_ip_pod_pair = get_pod_ip_name_pairs("chaos-testing", f"release={release_name}, app=minio")
ms = MilvusSys()
@ -84,7 +84,7 @@ class TestChaos(TestChaosBase):
minio_endpoint = f"{minio_ip}:{minio_port}"
bucket_name = ms.index_nodes[0]["infos"]["system_configurations"]["minio_bucket_name"]
schema = cf.gen_default_collection_schema()
data = cf.gen_default_list_data_for_bulk_load(nb=nb)
data = cf.gen_default_list_data_for_bulk_insert(nb=nb)
fields_name = [field.name for field in schema.fields]
if not row_based:
data_dict = dict(zip(fields_name, data))
@ -95,17 +95,17 @@ class TestChaos(TestChaosBase):
entity = dict(zip(fields_name, entity_value))
entities.append(entity)
data_dict = {"rows": entities}
file_name = "bulk_load_data_source.json"
file_name = "bulk_insert_data_source.json"
files = [file_name]
#TODO: npy file type is not supported so far
log.info("generate bulk load file")
log.info("generate bulk insert file")
with open(file_name, "w") as f:
f.write(json.dumps(data_dict))
log.info("upload file to minio")
client = Minio(minio_endpoint, access_key="minioadmin", secret_key="minioadmin", secure=False)
client.fput_object(bucket_name, file_name, file_name)
self.health_checkers[Op.bulk_load].update(schema=schema, files=files, row_based=row_based)
log.info("prepare data for bulk load done")
self.health_checkers[Op.bulk_insert].update(schema=schema, files=files, is_row_based=row_based)
log.info("prepare data for bulk insert done")
def teardown(self):
chaos_res = CusResource(kind=self._chaos_config['kind'],

View File

@ -319,7 +319,7 @@ def gen_default_list_data(nb=ct.default_nb, dim=ct.default_dim):
return data
def gen_default_list_data_for_bulk_load(nb=ct.default_nb, dim=ct.default_dim):
def gen_default_list_data_for_bulk_insert(nb=ct.default_nb, dim=ct.default_dim):
int_values = [i for i in range(nb)]
float_values = [float(i) for i in range(nb)]
string_values = [str(i) for i in range(nb)]

View File

@ -1,5 +1,5 @@
import os
import datetime
class LogConfig:
def __init__(self):
@ -16,7 +16,8 @@ class LogConfig:
log_path = os.environ[var]
return str(log_path)
except Exception as e:
log_path = "/tmp/ci_logs/"
# now = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
log_path = f"/tmp/ci_logs"
print("[get_env_variable] failed to get environment variables : %s, use default path : %s" % (str(e), log_path))
return log_path

View File

@ -9,7 +9,7 @@ allure-pytest==2.7.0
pytest-print==0.2.1
pytest-level==0.1.1
pytest-xdist==2.5.0
pymilvus==2.2.0.dev45
pymilvus==2.2.0.dev49
pytest-rerunfailures==9.1.1
git+https://github.com/Projectplace/pytest-tags
ndg-httpsclient

File diff suppressed because it is too large.

View File

@ -308,6 +308,7 @@ class TestUtilityParams(TestcaseBase):
self.utility_wrap.drop_collection(c_name)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_left_vector_invalid_type(self, get_invalid_vector_dict):
"""
target: test calculated distance with invalid vectors
@ -324,6 +325,7 @@ class TestUtilityParams(TestcaseBase):
"is illegal".format(invalid_vector)})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_left_vector_invalid_value(self, get_invalid_vector_dict):
"""
target: test calculated distance with invalid vectors
@ -340,6 +342,7 @@ class TestUtilityParams(TestcaseBase):
"is illegal".format(invalid_vector)})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_right_vector_invalid_type(self, get_invalid_vector_dict):
"""
target: test calculated distance with invalid vectors
@ -358,6 +361,7 @@ class TestUtilityParams(TestcaseBase):
"is illegal".format(invalid_vector)})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_right_vector_invalid_value(self, get_invalid_vector_dict):
"""
target: test calculated distance with invalid vectors
@ -376,6 +380,7 @@ class TestUtilityParams(TestcaseBase):
"is illegal".format(invalid_vector)})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_invalid_metric_type(self, get_support_metric_field, get_invalid_metric_type):
"""
target: test calculated distance with invalid metric
@ -397,6 +402,7 @@ class TestUtilityParams(TestcaseBase):
"is illegal".format(metric)})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_invalid_metric_value(self, get_support_metric_field, get_invalid_metric_value):
"""
target: test calculated distance with invalid metric
@ -418,6 +424,7 @@ class TestUtilityParams(TestcaseBase):
"float vector".format(metric)})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_not_support_metric(self, get_support_metric_field, get_not_support_metric):
"""
target: test calculated distance with invalid metric
@ -439,6 +446,7 @@ class TestUtilityParams(TestcaseBase):
"float vector".format(metric)})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_invalid_using(self, get_support_metric_field):
"""
target: test calculated distance with invalid using
@ -459,6 +467,7 @@ class TestUtilityParams(TestcaseBase):
"err_msg": "should create connect"})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_not_match_dim(self):
"""
target: test calculated distance with invalid vectors
@ -478,6 +487,7 @@ class TestUtilityParams(TestcaseBase):
"vectors with different dimension"})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_collection_before_load(self, get_support_metric_field):
"""
target: test calculated distance when entities is not ready
@ -1010,6 +1020,7 @@ class TestUtilityBase(TestcaseBase):
sleep(1)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_default(self):
"""
target: test calculated distance with default params
@ -1030,6 +1041,7 @@ class TestUtilityBase(TestcaseBase):
"vectors_r": vectors_r})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_default_sqrt(self, metric_field, metric):
"""
target: test calculated distance with default param
@ -1052,6 +1064,7 @@ class TestUtilityBase(TestcaseBase):
"metric": metric})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_default_metric(self, sqrt):
"""
target: test calculated distance with default param
@ -1074,6 +1087,7 @@ class TestUtilityBase(TestcaseBase):
"sqrt": sqrt})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_binary_metric(self, metric_field, metric_binary):
"""
target: test calculate distance with binary vectors
@ -1099,6 +1113,7 @@ class TestUtilityBase(TestcaseBase):
"metric": metric_binary})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_from_collection_ids(self, metric_field, metric, sqrt):
"""
target: test calculated distance from collection entities
@ -1130,6 +1145,7 @@ class TestUtilityBase(TestcaseBase):
"sqrt": sqrt})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_from_collections(self, metric_field, metric, sqrt):
"""
target: test calculated distance between entities from collections
@ -1160,6 +1176,7 @@ class TestUtilityBase(TestcaseBase):
"sqrt": sqrt})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_left_vector_and_collection_ids(self, metric_field, metric, sqrt):
"""
target: test calculated distance from collection entities
@ -1190,6 +1207,7 @@ class TestUtilityBase(TestcaseBase):
"sqrt": sqrt})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_right_vector_and_collection_ids(self, metric_field, metric, sqrt):
"""
target: test calculated distance from collection entities
@ -1218,6 +1236,7 @@ class TestUtilityBase(TestcaseBase):
"sqrt": sqrt})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_from_partition_ids(self, metric_field, metric, sqrt):
"""
target: test calculated distance from one partition entities
@ -1252,6 +1271,7 @@ class TestUtilityBase(TestcaseBase):
"sqrt": sqrt})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_from_partitions(self, metric_field, metric, sqrt):
"""
target: test calculated distance between entities from partitions
@ -1281,6 +1301,7 @@ class TestUtilityBase(TestcaseBase):
"sqrt": sqrt})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_left_vectors_and_partition_ids(self, metric_field, metric, sqrt):
"""
target: test calculated distance between vectors and partition entities
@ -1314,6 +1335,7 @@ class TestUtilityBase(TestcaseBase):
"sqrt": sqrt})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_right_vectors_and_partition_ids(self, metric_field, metric, sqrt):
"""
target: test calculated distance between vectors and partition entities
@ -3340,7 +3362,7 @@ class TestUtilityRBAC(TestcaseBase):
collection_w.flush(check_task=CheckTasks.check_permission_deny)
default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
collection_w.query(default_term_expr, check_task=CheckTasks.check_permission_deny)
# self.utility_wrap.bulk_load(c_name, check_task=CheckTasks.check_permission_deny)
# self.utility_wrap.bulk_insert(c_name, check_task=CheckTasks.check_permission_deny)
# Global permission deny
self.init_collection_wrap(name=c_name_2, check_task=CheckTasks.check_permission_deny)
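Side note on the calc_distance skips added above: pytest also supports applying one skip marker per module or per class instead of per test; a sketch of that alternative (not what this change does):

    import pytest

    # module-level: skips every test collected from the file
    pytestmark = pytest.mark.skip(reason="calc_distance interface is no longer supported")

    # class-level: skips every test method in the decorated class
    @pytest.mark.skip(reason="calc_distance interface is no longer supported")
    class TestCalcDistance:
        ...

Because this test file mixes calc_distance cases with other utility and RBAC tests, the per-test markers used in the diff are the safer choice; the class-level form would only help if the calc_distance cases were grouped into their own class.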