From 72c81c8ae4956f66f55d790bb6bfa871eca3f581 Mon Sep 17 00:00:00 2001 From: zhuwenxing Date: Wed, 3 Jan 2024 15:20:49 +0800 Subject: [PATCH] test: add multi-tenancy checker (#29635) add multi-tenancy checker Signed-off-by: zhuwenxing --- tests/python_client/chaos/checker.py | 3 - tests/python_client/chaos/conftest.py | 6 + ..._concurrent_operation_for_multi_tenancy.py | 137 ++++++++++++++++++ tests/python_client/conftest.py | 6 + 4 files changed, 149 insertions(+), 3 deletions(-) create mode 100644 tests/python_client/chaos/testcases/test_concurrent_operation_for_multi_tenancy.py diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py index 26c63d4f1a..3a0c7e5304 100644 --- a/tests/python_client/chaos/checker.py +++ b/tests/python_client/chaos/checker.py @@ -724,9 +724,6 @@ class InsertChecker(Checker): timeout=timeout, enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) - if result: - # TODO: persist data to file - self.inserted_data.extend(ts_data) return res, result @exception_handler() diff --git a/tests/python_client/chaos/conftest.py b/tests/python_client/chaos/conftest.py index 2f75d64e8d..6d8dd802f4 100644 --- a/tests/python_client/chaos/conftest.py +++ b/tests/python_client/chaos/conftest.py @@ -12,6 +12,7 @@ def pytest_addoption(parser): parser.addoption("--chaos_interval", action="store", default="2m", help="chaos_interval") parser.addoption("--is_check", action="store", type=bool, default=False, help="is_check") parser.addoption("--wait_signal", action="store", type=bool, default=True, help="wait_signal") + parser.addoption("--collection_num", action="store", default="1", help="collection_num") @pytest.fixture @@ -44,6 +45,11 @@ def target_number(request): return request.config.getoption("--target_number") +@pytest.fixture +def collection_num(request): + return request.config.getoption("--collection_num") + + @pytest.fixture def chaos_duration(request): return request.config.getoption("--chaos_duration") diff --git a/tests/python_client/chaos/testcases/test_concurrent_operation_for_multi_tenancy.py b/tests/python_client/chaos/testcases/test_concurrent_operation_for_multi_tenancy.py new file mode 100644 index 0000000000..075930d904 --- /dev/null +++ b/tests/python_client/chaos/testcases/test_concurrent_operation_for_multi_tenancy.py @@ -0,0 +1,137 @@ +import time +import pytest +import threading +import json +from time import sleep +from pymilvus import connections, db +from chaos.checker import (InsertChecker, + UpsertChecker, + SearchChecker, + QueryChecker, + DeleteChecker, + Op, + ResultAnalyzer + ) +from utils.util_log import test_log as log +from chaos import chaos_commons as cc +from common.common_type import CaseLabel +from chaos import constants + + +def get_all_collections(): + try: + with open("/tmp/ci_logs/all_collections.json", "r") as f: + data = json.load(f) + all_collections = data["all"] + except Exception as e: + log.warn(f"get_all_collections error: {e}") + return [None] + return all_collections + + +class TestBase: + expect_create = constants.SUCC + expect_insert = constants.SUCC + expect_flush = constants.SUCC + expect_compact = constants.SUCC + expect_search = constants.SUCC + expect_query = constants.SUCC + host = '127.0.0.1' + port = 19530 + _chaos_config = None + health_checkers = {} + + +class TestOperations(TestBase): + + @pytest.fixture(scope="function", autouse=True) + def connection(self, host, port, user, password, db_name, milvus_ns): + if user and password: + log.info(f"connect to {host}:{port} with user {user} and password {password}") + connections.connect('default', uri=f"{host}:{port}", token=f"{user}:{password}") + else: + connections.connect('default', host=host, port=port) + if connections.has_connection("default") is False: + raise Exception("no connections") + all_dbs = db.list_database() + log.info(f"all dbs: {all_dbs}") + if db_name not in all_dbs: + db.create_database(db_name) + db.using_database(db_name) + log.info(f"connect to milvus {host}:{port}, db {db_name} successfully") + self.host = host + self.port = port + self.user = user + self.password = password + self.milvus_ns = milvus_ns + + def init_health_checkers(self, collection_name=None): + c_name = collection_name + checkers = { + Op.insert: InsertChecker(collection_name=c_name), + Op.upsert: UpsertChecker(collection_name=c_name), + Op.search: SearchChecker(collection_name=c_name), + Op.query: QueryChecker(collection_name=c_name), + Op.delete: DeleteChecker(collection_name=c_name), + } + self.health_checkers = checkers + return checkers + + @pytest.fixture(scope="function", params=get_all_collections()) + def collection_name(self, request): + if request.param == [] or request.param == "": + pytest.skip("The collection name is invalid") + yield request.param + + @pytest.mark.tags(CaseLabel.L3) + def test_operations(self, request_duration, is_check, collection_name, collection_num, db_name): + # start the monitor threads to check the milvus ops + log.info("*********************Test Start**********************") + log.info(connections.get_connection_addr('default')) + all_checkers = [] + + def worker(c_name): + log.info(f"start checker for collection name: {c_name}") + op_checker = self.init_health_checkers(collection_name=c_name) + all_checkers.append(op_checker) + # insert data in init stage + try: + num_entities = op_checker[Op.insert].c_wrap.num_entities + if num_entities < 200000: + nb = 5000 + num_to_insert = 200000 - num_entities + for i in range(num_to_insert//nb): + op_checker[Op.insert].insert_data(nb=nb) + else: + log.info(f"collection {c_name} has enough data {num_entities}, skip insert data") + except Exception as e: + log.error(f"insert data error: {e}") + threads = [] + for i in range(collection_num): + c_name = collection_name if collection_name else f"DB_{db_name}_Collection_{i}_Checker" + thread = threading.Thread(target=worker, args=(c_name,)) + threads.append(thread) + thread.start() + for thread in threads: + thread.join() + + for checker in all_checkers: + cc.start_monitor_threads(checker) + + log.info("*********************Load Start**********************") + request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "") + if request_duration[-1] == "+": + request_duration = request_duration[:-1] + request_duration = eval(request_duration) + for i in range(10): + sleep(request_duration//10) + for checker in all_checkers: + for k, v in checker.items(): + v.check_result() + try: + ra = ResultAnalyzer() + ra.get_stage_success_rate() + ra.show_result_table() + except Exception as e: + log.error(f"get stage success rate error: {e}") + log.info("*********************Chaos Test Completed**********************") diff --git a/tests/python_client/conftest.py b/tests/python_client/conftest.py index 1119676fab..fecd1fe914 100644 --- a/tests/python_client/conftest.py +++ b/tests/python_client/conftest.py @@ -24,6 +24,7 @@ def pytest_addoption(parser): parser.addoption("--port", action="store", default=19530, help="service's port") parser.addoption("--user", action="store", default="", help="user name for connection") parser.addoption("--password", action="store", default="", help="password for connection") + parser.addoption("--db_name", action="store", default="default", help="database name for connection") parser.addoption("--secure", type=bool, action="store", default=False, help="secure for connection") parser.addoption("--milvus_ns", action="store", default="chaos-testing", help="milvus_ns") parser.addoption("--http_port", action="store", default=19121, help="http's port") @@ -75,6 +76,11 @@ def password(request): return request.config.getoption("--password") +@pytest.fixture +def db_name(request): + return request.config.getoption("--db_name") + + @pytest.fixture def secure(request): return request.config.getoption("--secure")