diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py index 26c63d4f1a..64be4b9d4c 100644 --- a/tests/python_client/chaos/checker.py +++ b/tests/python_client/chaos/checker.py @@ -1,7 +1,7 @@ import pytest import unittest from enum import Enum -from random import randint +import random import time import threading import os @@ -282,14 +282,22 @@ def exception_handler(): def wrapper(func): @functools.wraps(func) def inner_wrapper(self, *args, **kwargs): + class_name = None + function_name = None try: + function_name = func.__name__ + class_name = getattr(self, '__class__', None).__name__ if self else None res, result = func(self, *args, **kwargs) return res, result except Exception as e: log_row_length = 300 e_str = str(e) - log_e = e_str[0:log_row_length] + \ - '......' if len(e_str) > log_row_length else e_str + log_e = e_str[0:log_row_length] + '......' if len(e_str) > log_row_length else e_str + if class_name: + log_message = f"Error in {class_name}.{function_name}: {log_e}" + else: + log_message = f"Error in {function_name}: {log_e}" + log.error(log_message) log.error(log_e) return Error(e), False @@ -306,7 +314,7 @@ class Checker: """ def __init__(self, collection_name=None, partition_name=None, shards_num=2, dim=ct.default_dim, insert_data=True, - schema=None): + schema=None, replica_number=1, **kwargs): self.recovery_time = 0 self._succ = 0 self._fail = 0 @@ -337,26 +345,45 @@ class Checker: shards_num=shards_num, timeout=timeout, enable_traceback=enable_traceback) + self.index_name = "vec_index" + self.c_wrap.create_index(self.float_vector_field_name, + constants.DEFAULT_INDEX_PARAM, + index_name=self.index_name, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing) + self.replica_number=replica_number + self.c_wrap.load(replica_number=self.replica_number) + self.p_wrap.init_partition(self.c_name, self.p_name) if insert_data: log.info(f"collection {c_name} created, start to insert data") t0 = 
time.perf_counter() self.c_wrap.insert( - data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=schema, start=0), + data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=schema), partition_name=self.p_name, timeout=timeout, enable_traceback=enable_traceback) log.info(f"insert data for collection {c_name} cost {time.perf_counter() - t0}s") self.initial_entities = self.c_wrap.num_entities # do as a flush + self.scale = 100000 # timestamp scale to make time.time() as int64 - def insert_data(self, nb=constants.ENTITIES_FOR_SEARCH, partition_name=None): + def insert_data(self, nb=constants.DELTA_PER_INS, partition_name=None): partition_name = self.p_name if partition_name is None else partition_name - self.c_wrap.insert( - data=cf.get_column_data_by_schema(nb=nb, schema=self.schema, start=0), - partition_name=partition_name, - timeout=timeout, - enable_traceback=enable_traceback) + data = cf.get_column_data_by_schema(nb=nb, schema=self.schema) + ts_data = [] + for i in range(nb): + time.sleep(0.001) + offset_ts = int(time.time() * self.scale) + ts_data.append(offset_ts) + data[0] = ts_data # set timestamp (ms) as int64 + log.debug(f"insert data: {ts_data}") + res, result = self.c_wrap.insert(data=data, + partition_name=partition_name, + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing) + return res, result def total(self): return self._succ + self._fail @@ -450,7 +477,7 @@ class CollectionLoadChecker(Checker): super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, - index_name=cf.gen_unique_str('index_'), + index_name=self.index_name, timeout=timeout, enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) @@ -483,7 +510,7 @@ class CollectionReleaseChecker(Checker): super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) 
self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, - index_name=cf.gen_unique_str('index_'), + index_name=self.index_name, timeout=timeout, enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) @@ -513,14 +540,16 @@ class PartitionLoadChecker(Checker): def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ): self.replica_number = replica_number if collection_name is None: - collection_name = cf.gen_unique_str("LoadChecker_") - super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) + collection_name = cf.gen_unique_str("PartitionLoadChecker_") + p_name = cf.gen_unique_str("PartitionLoadChecker_") + super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema, partition_name=p_name) self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, - index_name=cf.gen_unique_str('index_'), + index_name=self.index_name, timeout=timeout, enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) + self.c_wrap.release() @trace() def load_partition(self): @@ -546,14 +575,16 @@ class PartitionReleaseChecker(Checker): def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ): self.replica_number = replica_number if collection_name is None: - collection_name = cf.gen_unique_str("LoadChecker_") - super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) + collection_name = cf.gen_unique_str("PartitionReleaseChecker_") + p_name = cf.gen_unique_str("PartitionReleaseChecker_") + super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema, partition_name=p_name) self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, - index_name=cf.gen_unique_str('index_'), + index_name=self.index_name, timeout=timeout, enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) + 
self.c_wrap.release() self.p_wrap.load(replica_number=self.replica_number) @trace() @@ -583,7 +614,7 @@ class SearchChecker(Checker): super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, - index_name=cf.gen_unique_str('index_'), + index_name=self.index_name, timeout=timeout, enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) @@ -743,7 +774,7 @@ class InsertChecker(Checker): try: self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, - index_name=cf.gen_unique_str('index_'), + index_name=self.index_name, timeout=timeout, enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) @@ -774,13 +805,14 @@ class UpsertChecker(Checker): def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None): if collection_name is None: - collection_name = cf.gen_unique_str("InsertChecker_") + collection_name = cf.gen_unique_str("UpsertChecker_") super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) + self.data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema) @trace() def upsert_entities(self): - data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema) - res, result = self.c_wrap.upsert(data=data, + + res, result = self.c_wrap.upsert(data=self.data, timeout=timeout, enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) @@ -788,13 +820,20 @@ class UpsertChecker(Checker): @exception_handler() def run_task(self): + # half of the data is upsert, the other half is insert + rows = len(self.data[0]) + pk_old = self.data[0][:rows // 2] + self.data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema) + pk_new = self.data[0][rows // 2:] + pk_update = pk_old + pk_new + self.data[0] = pk_update res, result = self.upsert_entities() return res, result def 
keep_running(self): while self._keep_running: self.run_task() - sleep(constants.WAIT_PER_OP / 10) + sleep(constants.WAIT_PER_OP) class CollectionCreateChecker(Checker): @@ -818,8 +857,10 @@ class CollectionCreateChecker(Checker): @exception_handler() def run_task(self): res, result = self.init_collection() - if result: - self.c_wrap.drop(timeout=timeout) + # if result: + # # 50% chance to drop collection + # if random.randint(0, 1) == 0: + # self.c_wrap.drop(timeout=timeout) return res, result def keep_running(self): @@ -881,6 +922,17 @@ class PartitionCreateChecker(Checker): if collection_name is None: collection_name = cf.gen_unique_str("PartitionCreateChecker_") super().__init__(collection_name=collection_name, schema=schema, partition_name=partition_name) + c_name = cf.gen_unique_str("PartitionCreateChecker_") + self.c_wrap.init_collection(name=c_name, schema=self.schema) + self.c_name = c_name + log.info(f"collection {c_name} created") + self.p_wrap.init_partition(collection=self.c_name, + name=cf.gen_unique_str("PartitionCreateChecker_"), + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing + ) + log.info(f"partition: {self.p_wrap}") @trace() def create_partition(self): @@ -895,8 +947,6 @@ class PartitionCreateChecker(Checker): @exception_handler() def run_task(self): res, result = self.create_partition() - if result: - self.p_wrap.drop(timeout=timeout) return res, result def keep_running(self): @@ -912,12 +962,17 @@ class PartitionDropChecker(Checker): if collection_name is None: collection_name = cf.gen_unique_str("PartitionDropChecker_") super().__init__(collection_name=collection_name, schema=schema, partition_name=partition_name) + c_name = cf.gen_unique_str("PartitionDropChecker_") + self.c_wrap.init_collection(name=c_name, schema=self.schema) + self.c_name = c_name + log.info(f"collection {c_name} created") self.p_wrap.init_partition(collection=self.c_name, name=cf.gen_unique_str("PartitionDropChecker_"),
timeout=timeout, enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing ) + log.info(f"partition: {self.p_wrap}") @trace() def drop_partition(self): @@ -928,12 +983,14 @@ class PartitionDropChecker(Checker): def run_task(self): res, result = self.drop_partition() if result: - self.p_wrap.init_partition(collection=self.c_name, - name=cf.gen_unique_str("PartitionDropChecker_"), - timeout=timeout, - enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing - ) + # create two partition then drop one + for i in range(2): + self.p_wrap.init_partition(collection=self.c_name, + name=cf.gen_unique_str("PartitionDropChecker_"), + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing + ) return res, result def keep_running(self): @@ -1007,7 +1064,6 @@ class IndexCreateChecker(Checker): if collection_name is None: collection_name = cf.gen_unique_str("IndexChecker_") super().__init__(collection_name=collection_name, schema=schema) - self.index_name = cf.gen_unique_str('index_') for i in range(5): self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema), timeout=timeout, enable_traceback=enable_traceback) @@ -1025,9 +1081,11 @@ class IndexCreateChecker(Checker): @exception_handler() def run_task(self): + c_name = cf.gen_unique_str("IndexCreateChecker_") + self.c_wrap.init_collection(name=c_name, schema=self.schema) res, result = self.create_index() if result: - self.c_wrap.drop_index(timeout=timeout) + self.c_wrap.drop_index(timeout=timeout, index_name=self.index_name) return res, result def keep_running(self): @@ -1043,17 +1101,11 @@ class IndexDropChecker(Checker): if collection_name is None: collection_name = cf.gen_unique_str("IndexChecker_") super().__init__(collection_name=collection_name, schema=schema) - self.index_name = cf.gen_unique_str('index_') for i in range(5): self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, 
schema=self.schema), timeout=timeout, enable_traceback=enable_traceback) # do as a flush before indexing log.debug(f"Index ready entities: {self.c_wrap.num_entities}") - self.c_wrap.create_index(self.float_vector_field_name, - constants.DEFAULT_INDEX_PARAM, - index_name=self.index_name, - enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing) @trace() def drop_index(self): @@ -1064,6 +1116,7 @@ class IndexDropChecker(Checker): def run_task(self): res, result = self.drop_index() if result: + self.c_wrap.init_collection(name=cf.gen_unique_str("IndexDropChecker_"), schema=self.schema) self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, index_name=self.index_name, @@ -1073,6 +1126,12 @@ class IndexDropChecker(Checker): def keep_running(self): while self._keep_running: + self.c_wrap.init_collection(name=cf.gen_unique_str("IndexDropChecker_"), schema=self.schema) + self.c_wrap.create_index(self.float_vector_field_name, + constants.DEFAULT_INDEX_PARAM, + index_name=self.index_name, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing) self.run_task() sleep(constants.WAIT_PER_OP * 6) @@ -1086,8 +1145,7 @@ class QueryChecker(Checker): super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) res, result = self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, - index_name=cf.gen_unique_str( - 'index_'), + index_name=self.index_name, timeout=timeout, enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) @@ -1105,7 +1163,7 @@ class QueryChecker(Checker): def run_task(self): int_values = [] for _ in range(5): - int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH)) + int_values.append(random.randint(0, constants.ENTITIES_FOR_SEARCH)) self.term_expr = f'{self.int64_field_name} in {int_values}' res, result = self.query() return res, result @@ -1125,28 +1183,44 @@ class DeleteChecker(Checker): 
super().__init__(collection_name=collection_name, schema=schema) res, result = self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, - index_name=cf.gen_unique_str( - 'index_'), + index_name=self.index_name, timeout=timeout, enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) self.c_wrap.load() # load before query self.insert_data() - term_expr = f'{self.int64_field_name} > 0' - res, _ = self.c_wrap.query(term_expr, output_fields=[ - self.int64_field_name]) + query_expr = f'{self.int64_field_name} > 0' + res, _ = self.c_wrap.query(query_expr, + output_fields=[self.int64_field_name], + partition_name=self.p_name) self.ids = [r[self.int64_field_name] for r in res] - self.expr = None + self.query_expr = query_expr + delete_ids = self.ids[:len(self.ids) // 2] # delete half of ids + self.delete_expr = f'{self.int64_field_name} in {delete_ids}' + + def update_delete_expr(self): + res, _ = self.c_wrap.query(self.query_expr, + output_fields=[self.int64_field_name], + partition_name=self.p_name) + all_ids = [r[self.int64_field_name] for r in res] + if len(all_ids) < 100: + # insert data to make sure there are enough ids to delete + self.insert_data(nb=10000) + res, _ = self.c_wrap.query(self.query_expr, + output_fields=[self.int64_field_name], + partition_name=self.p_name) + all_ids = [r[self.int64_field_name] for r in res] + delete_ids = all_ids[:len(all_ids) // 2] # delete half of ids + self.delete_expr = f'{self.int64_field_name} in {delete_ids}' @trace() def delete_entities(self): - res, result = self.c_wrap.delete(expr=self.expr, timeout=timeout) + res, result = self.c_wrap.delete(expr=self.delete_expr, timeout=timeout, partition_name=self.p_name) return res, result @exception_handler() def run_task(self): - delete_ids = self.ids.pop() - self.expr = f'{self.int64_field_name} in {[delete_ids]}' + self.update_delete_expr() res, result = self.delete_entities() return res, result @@ -1166,8 +1240,7 @@ class 
CompactChecker(Checker): self.ut = ApiUtilityWrapper() res, result = self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, - index_name=cf.gen_unique_str( - 'index_'), + index_name=self.index_name, timeout=timeout, enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) @@ -1201,8 +1274,7 @@ class LoadBalanceChecker(Checker): self.utility_wrap = ApiUtilityWrapper() res, result = self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, - index_name=cf.gen_unique_str( - 'index_'), + index_name=self.index_name, timeout=timeout, enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) diff --git a/tests/python_client/chaos/constants.py b/tests/python_client/chaos/constants.py index 46509fde4e..5c69a8b71b 100644 --- a/tests/python_client/chaos/constants.py +++ b/tests/python_client/chaos/constants.py @@ -12,7 +12,7 @@ CHAOS_GROUP = 'chaos-mesh.org' # chaos mesh group CHAOS_VERSION = 'v1alpha1' # chaos mesh version SUCC = 'succ' FAIL = 'fail' -DELTA_PER_INS = 10 # entities per insert +DELTA_PER_INS = 3000 # entities per insert ENTITIES_FOR_SEARCH = 3000 # entities for search_collection ENTITIES_FOR_BULKINSERT = 1000000 # entities for bulk insert CHAOS_CONFIG_ENV = 'CHAOS_CONFIG_PATH' # env variables for chaos path diff --git a/tests/python_client/chaos/testcases/test_all_checker_operation.py b/tests/python_client/chaos/testcases/test_all_checker_operation.py index 1c00febbe5..a9087fe361 100644 --- a/tests/python_client/chaos/testcases/test_all_checker_operation.py +++ b/tests/python_client/chaos/testcases/test_all_checker_operation.py @@ -2,7 +2,7 @@ import time import pytest from time import sleep -from pymilvus import connections +from pymilvus import connections, db from chaos.checker import ( DatabaseCreateChecker, DatabaseDropChecker, @@ -52,7 +52,7 @@ class TestBase: class TestOperations(TestBase): @pytest.fixture(scope="function", autouse=True) - def connection(self, host, 
port, user, password, milvus_ns): + def connection(self, host, port, user, password, milvus_ns, database_name): if user and password: # log.info(f"connect to {host}:{port} with user {user} and password {password}") connections.connect('default', host=host, port=port, user=user, password=password, secure=True) @@ -60,7 +60,11 @@ class TestOperations(TestBase): connections.connect('default', host=host, port=port) if connections.has_connection("default") is False: raise Exception("no connections") - log.info("connect to milvus successfully") + all_dbs = db.list_database() + if database_name not in all_dbs: + db.create_database(database_name) + db.using_database(database_name) + log.info(f"connect to milvus {host}:{port}, db {database_name} successfully") self.host = host self.port = port self.user = user diff --git a/tests/python_client/chaos/testcases/test_concurrent_operation.py b/tests/python_client/chaos/testcases/test_concurrent_operation.py index c582c5a803..8e87b20911 100644 --- a/tests/python_client/chaos/testcases/test_concurrent_operation.py +++ b/tests/python_client/chaos/testcases/test_concurrent_operation.py @@ -77,6 +77,7 @@ class TestOperations(TestBase): Op.query: QueryChecker(collection_name=c_name), Op.delete: DeleteChecker(collection_name=c_name), } + log.info(f"init_health_checkers: {checkers}") self.health_checkers = checkers @pytest.fixture(scope="function", params=get_all_collections()) diff --git a/tests/python_client/conftest.py b/tests/python_client/conftest.py index 1119676fab..6f27d64285 100644 --- a/tests/python_client/conftest.py +++ b/tests/python_client/conftest.py @@ -30,6 +30,7 @@ def pytest_addoption(parser): parser.addoption("--handler", action="store", default="GRPC", help="handler of request") parser.addoption("--tag", action="store", default="all", help="only run tests matching the tag.") parser.addoption('--dry_run', action='store_true', default=False, help="") + parser.addoption('--database_name', action='store', 
default="default", help="name of database") parser.addoption('--partition_name', action='store', default="partition_name", help="name of partition") parser.addoption('--connect_name', action='store', default="connect_name", help="name of connect") parser.addoption('--descriptions', action='store', default="partition_des", help="descriptions of partition") @@ -110,6 +111,11 @@ def connect_name(request): return request.config.getoption("--connect_name") +@pytest.fixture +def database_name(request): + return request.config.getoption("--database_name") + + @pytest.fixture def partition_name(request): return request.config.getoption("--partition_name") diff --git a/tests/python_client/requirements.txt b/tests/python_client/requirements.txt index 4dd2410388..8593cd7521 100644 --- a/tests/python_client/requirements.txt +++ b/tests/python_client/requirements.txt @@ -47,6 +47,7 @@ pandas==1.5.3 tenacity==8.1.0 # for standby test etcd-sdk-python==0.0.4 +deepdiff==6.7.1 # for test result analyzer prettytable==3.8.0 diff --git a/tests/python_client/testcases/test_concurrent.py b/tests/python_client/testcases/test_concurrent.py index 50da0f5b06..fdd9d98b5e 100644 --- a/tests/python_client/testcases/test_concurrent.py +++ b/tests/python_client/testcases/test_concurrent.py @@ -4,6 +4,7 @@ import json from time import sleep from pymilvus import connections from chaos.checker import (InsertChecker, + UpsertChecker, SearchChecker, QueryChecker, DeleteChecker, @@ -63,6 +64,7 @@ class TestOperations(TestBase): c_name = collection_name checkers = { Op.insert: InsertChecker(collection_name=c_name), + Op.upsert: UpsertChecker(collection_name=c_name), Op.search: SearchChecker(collection_name=c_name), Op.query: QueryChecker(collection_name=c_name), Op.delete: DeleteChecker(collection_name=c_name),