From dbbe6557edb390ead548986d537e11a25eb80328 Mon Sep 17 00:00:00 2001
From: zhuwenxing
Date: Mon, 11 Jul 2022 17:08:25 +0800
Subject: [PATCH] [test]Refine deploy test (#18212)

Signed-off-by: zhuwenxing
---
 tests/python_client/base/client_base.py       |  11 +-
 .../python_client/base/collection_wrapper.py  |   3 -
 tests/python_client/check/func_check.py       |  20 ++
 tests/python_client/common/common_func.py     |  13 +-
 tests/python_client/common/common_type.py     |   1 +
 tests/python_client/deploy/__init__.py        |   0
 tests/python_client/deploy/base.py            |  10 +
 tests/python_client/deploy/common.py          |  63 ++++++
 tests/python_client/deploy/conftest.py        |  31 +++
 .../testcases/test_action_after_reinstall.py  | 202 ++++++++++++++++++
 .../testcases/test_action_before_reinstall.py | 125 +++++++++++
 .../deploy/testcases/test_action_reinstall.py | 176 +++++++++++++++
 .../test_all_collections_verification.py      | 198 +++++++++++++++++
 .../testcases/test_get_all_collections.py     |  37 ++++
 tests/python_client/testcases/test_search.py  |   4 +-
 tests/python_client/utils/api_request.py      |   6 +-
 tests/python_client/utils/wrapper.py          |  16 +-
 17 files changed, 893 insertions(+), 23 deletions(-)
 create mode 100644 tests/python_client/deploy/__init__.py
 create mode 100644 tests/python_client/deploy/base.py
 create mode 100644 tests/python_client/deploy/common.py
 create mode 100644 tests/python_client/deploy/conftest.py
 create mode 100644 tests/python_client/deploy/testcases/test_action_after_reinstall.py
 create mode 100644 tests/python_client/deploy/testcases/test_action_before_reinstall.py
 create mode 100644 tests/python_client/deploy/testcases/test_action_reinstall.py
 create mode 100644 tests/python_client/deploy/testcases/test_all_collections_verification.py
 create mode 100644 tests/python_client/deploy/testcases/test_get_all_collections.py
diff --git a/tests/python_client/base/client_base.py b/tests/python_client/base/client_base.py
index 39fcd38aae..4913ccb466 100644
--- a/tests/python_client/base/client_base.py
+++ b/tests/python_client/base/client_base.py
@@ -122,10 +122,10 @@ class TestcaseBase(Base):
                                                      **kwargs)
         return partition_wrap
 
-    def init_collection_general(self, prefix, insert_data=False, nb=ct.default_nb,
+    def init_collection_general(self, prefix="test", insert_data=False, nb=ct.default_nb,
                                 partition_num=0, is_binary=False, is_all_data_type=False,
                                 auto_id=False, dim=ct.default_dim, is_index=False,
-                                primary_field=ct.default_int64_field_name, is_flush=True):
+                                primary_field=ct.default_int64_field_name, is_flush=True, name=None, **kwargs):
         """
         target: create specified collections
         method: 1. create collections (binary/non-binary, default/all data type, auto_id or not)
@@ -138,6 +138,8 @@ class TestcaseBase(Base):
         log.info("Test case of search interface: initialize before test case")
         self._connect()
         collection_name = cf.gen_unique_str(prefix)
+        if name is not None:
+            collection_name = name
         vectors = []
         binary_raw_vectors = []
         insert_ids = []
@@ -149,8 +151,7 @@ class TestcaseBase(Base):
         if is_all_data_type:
             default_schema = cf.gen_collection_schema_all_datatype(auto_id=auto_id, dim=dim, primary_field=primary_field)
         log.info("init_collection_general: collection creation")
-        collection_w = self.init_collection_wrap(name=collection_name,
-                                                 schema=default_schema)
+        collection_w = self.init_collection_wrap(name=collection_name, schema=default_schema, **kwargs)
         # 2 add extra partitions if specified (default is 1 partition named "_default")
         if partition_num > 0:
             cf.gen_partitions(collection_w, partition_num)
@@ -161,8 +162,6 @@ class TestcaseBase(Base):
             if is_flush:
                 assert collection_w.is_empty is False
                 assert collection_w.num_entities == nb
-            log.info("insert_data: inserted data into collection %s (num_entities: %s)"
-                     % (collection_w.name, nb))
             # This condition will be removed after auto index feature
             if not is_index:
                 collection_w.load()
diff --git a/tests/python_client/base/collection_wrapper.py b/tests/python_client/base/collection_wrapper.py
index 76a57021a2..6b64dbac36 100644
--- a/tests/python_client/base/collection_wrapper.py
+++ b/tests/python_client/base/collection_wrapper.py
@@ -299,7 +299,6 @@ class ApiCollectionWrapper:
         check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
         return res, check_result
 
-    @trace()
     def get_compaction_state(self, timeout=None, check_task=None, check_items=None, **kwargs):
         timeout = TIMEOUT if timeout is None else timeout
         func_name = sys._getframe().f_code.co_name
@@ -307,7 +306,6 @@ class ApiCollectionWrapper:
         check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
         return res, check_result
 
-    @trace()
     def get_compaction_plans(self, timeout=None, check_task=None, check_items={}, **kwargs):
         timeout = TIMEOUT if timeout is None else timeout
         func_name = sys._getframe().f_code.co_name
@@ -315,7 +313,6 @@ class ApiCollectionWrapper:
         check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
         return res, check_result
 
-    @trace()
     def wait_for_compaction_completed(self, timeout=None, **kwargs):
         timeout = TIMEOUT * 3 if timeout is None else timeout
         res = self.collection.wait_for_compaction_completed(timeout, **kwargs)
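A quick illustration of what the new name/**kwargs parameters enable (a sketch, not part of the patch): the deploy tests pin a deterministic collection name before the reinstall and re-attach to the same collection afterwards, instead of generating a fresh gen_unique_str name each run. The name shown follows the convention used by the deploy testcases later in this patch.

# Inside a TestcaseBase subclass; "task_1_IVF_FLAT" is an illustrative name.
name = "task_1_IVF_FLAT"
# before reinstall: create and fill the collection under the pinned name
collection_w = self.init_collection_general(insert_data=True, nb=3000,
                                            is_flush=False, name=name)[0]
# after reinstall: the same call re-attaches to the existing collection
collection_w = self.init_collection_general(insert_data=False, name=name)[0]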
diff --git a/tests/python_client/check/func_check.py b/tests/python_client/check/func_check.py
index 90caab5f3d..46c7a763e7 100644
--- a/tests/python_client/check/func_check.py
+++ b/tests/python_client/check/func_check.py
@@ -62,6 +62,9 @@ class ResponseChecker:
         elif self.check_task == CheckTasks.check_query_empty:
             result = self.check_query_empty(self.response, self.func_name)
 
+        elif self.check_task == CheckTasks.check_query_not_empty:
+            result = self.check_query_not_empty(self.response, self.func_name)
+
         elif self.check_task == CheckTasks.check_distance:
             # Calculate distance interface that response check
             result = self.check_distance(self.response, self.func_name, self.check_items)
@@ -282,6 +285,23 @@ class ResponseChecker:
             raise Exception("The query result to check isn't list type object")
         assert len(query_res) == 0, "Query result is not empty"
 
+    @staticmethod
+    def check_query_not_empty(query_res, func_name):
+        """
+        Verify that the query result is not empty
+
+        :param query_res: A list that contains all results
+        :type query_res: list
+
+        :param func_name: Query API name
+        :type func_name: str
+        """
+        if func_name != 'query':
+            log.warning("The function name is {} rather than {}".format(func_name, "query"))
+        if not isinstance(query_res, list):
+            raise Exception("The query result to check isn't list type object")
+        assert len(query_res) > 0, "Query result is empty"
+
     @staticmethod
     def check_distance(distance_res, func_name, check_items):
         if func_name != 'calc_distance':
diff --git a/tests/python_client/common/common_func.py b/tests/python_client/common/common_func.py
index 12cc832f7e..907595b640 100644
--- a/tests/python_client/common/common_func.py
+++ b/tests/python_client/common/common_func.py
@@ -112,7 +112,15 @@ def gen_default_collection_schema(description=ct.default_desc, primary_field=ct.
                                                                  primary_field=primary_field, auto_id=auto_id)
     return schema
 
-
+def gen_general_collection_schema(description=ct.default_desc, primary_field=ct.default_int64_field_name,
+                                  auto_id=False, is_binary=False, dim=ct.default_dim):
+    if is_binary:
+        fields = [gen_int64_field(), gen_float_field(), gen_string_field(), gen_binary_vec_field(dim=dim)]
+    else:
+        fields = [gen_int64_field(), gen_float_field(), gen_string_field(), gen_float_vec_field(dim=dim)]
+    schema, _ = ApiCollectionSchemaWrapper().init_collection_schema(fields=fields, description=description,
+                                                                    primary_field=primary_field, auto_id=auto_id)
+    return schema
 
 def gen_string_pk_default_collection_schema(description=ct.default_desc, primary_field=ct.default_string_field_name,
                                             auto_id=False, dim=ct.default_dim):
@@ -637,8 +645,7 @@ def insert_data(collection_w, nb=3000, is_binary=False, is_all_data_type=False,
     binary_raw_vectors = []
     insert_ids = []
     start = insert_offset
-    log.info("insert_data: inserting data into collection %s (num_entities: %s)"
-             % (collection_w.name, nb))
+    log.info(f"inserting {nb} entities into collection {collection_w.name}")
     for i in range(num):
         default_data = gen_default_dataframe_data(nb // num, dim=dim, start=start)
         if is_binary:
diff --git a/tests/python_client/common/common_type.py b/tests/python_client/common/common_type.py
index de74c22651..6b18c59a6f 100644
--- a/tests/python_client/common/common_type.py
+++ b/tests/python_client/common/common_type.py
@@ -186,6 +186,7 @@ class CheckTasks:
     check_search_results = "check_search_results"
     check_query_results = "check_query_results"
     check_query_empty = "check_query_empty"  # verify that query result is empty
+    check_query_not_empty = "check_query_not_empty"  # verify that query result is not empty
     check_distance = "check_distance"
     check_delete_compact = "check_delete_compact"
     check_merge_compact = "check_merge_compact"
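For context, this is how the new check task is consumed, mirroring the query calls added by the deploy testcases later in this patch:

# collection_w is an ApiCollectionWrapper; the checker only asserts that the
# query returned a non-empty list, not any particular ids.
from common.common_type import CheckTasks

collection_w.query("int64 in [0, 1]",
                   output_fields=["int64"],
                   check_task=CheckTasks.check_query_not_empty)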
diff --git a/tests/python_client/deploy/__init__.py b/tests/python_client/deploy/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/python_client/deploy/base.py b/tests/python_client/deploy/base.py
new file mode 100644
index 0000000000..f2de4c4948
--- /dev/null
+++ b/tests/python_client/deploy/base.py
@@ -0,0 +1,10 @@
+from base.client_base import TestcaseBase
+from utils.util_log import test_log as log
+
+
+class TestDeployBase(TestcaseBase):
+
+    def teardown_method(self, method):
+        log.info(("*" * 35) + " teardown " + ("*" * 35))
+        log.info("[teardown_method] Start teardown test case %s..." % method.__name__)
+        log.info("skip drop collection")
diff --git a/tests/python_client/deploy/common.py b/tests/python_client/deploy/common.py
new file mode 100644
index 0000000000..a2d7eb7683
--- /dev/null
+++ b/tests/python_client/deploy/common.py
@@ -0,0 +1,63 @@
+import json
+from utils.util_log import test_log as log
+
+all_index_types = ["FLAT", "IVF_FLAT", "IVF_SQ8", "IVF_PQ", "HNSW", "ANNOY", "RHNSW_FLAT", "RHNSW_PQ", "RHNSW_SQ",
+                   "BIN_FLAT", "BIN_IVF_FLAT"]
+
+default_index_params = [{"nlist": 128}, {"nlist": 128}, {"nlist": 128}, {"nlist": 128, "m": 16, "nbits": 8},
+                        {"M": 48, "efConstruction": 500}, {"n_trees": 50}, {"M": 48, "efConstruction": 500},
+                        {"M": 48, "efConstruction": 500, "PQM": 8}, {"M": 48, "efConstruction": 500}, {"nlist": 128},
+                        {"nlist": 128}]
+
+index_params_map = dict(zip(all_index_types, default_index_params))
+
+
+def gen_index_param(index_type):
+    metric_type = "L2"
+    if "BIN" in index_type:
+        metric_type = "HAMMING"
+    index_param = {
+        "index_type": index_type,
+        "params": index_params_map[index_type],
+        "metric_type": metric_type
+    }
+    return index_param
+
+
+def gen_search_param(index_type, metric_type="L2"):
+    search_params = []
+    if index_type in ["FLAT", "IVF_FLAT", "IVF_SQ8", "IVF_SQ8H", "IVF_PQ"]:
+        for nprobe in [10]:
+            ivf_search_params = {"metric_type": metric_type, "params": {"nprobe": nprobe}}
+            search_params.append(ivf_search_params)
+    elif index_type in ["BIN_FLAT", "BIN_IVF_FLAT"]:
+        for nprobe in [10]:
+            bin_search_params = {"metric_type": "HAMMING", "params": {"nprobe": nprobe}}
+            search_params.append(bin_search_params)
+    elif index_type in ["HNSW", "RHNSW_FLAT", "RHNSW_PQ", "RHNSW_SQ"]:
+        for ef in [64]:
+            hnsw_search_param = {"metric_type": metric_type, "params": {"ef": ef}}
+            search_params.append(hnsw_search_param)
+    elif index_type in ["NSG", "RNSG"]:
+        for search_length in [100]:
+            nsg_search_param = {"metric_type": metric_type, "params": {"search_length": search_length}}
+            search_params.append(nsg_search_param)
+    elif index_type == "ANNOY":
+        for search_k in [1000]:
+            annoy_search_param = {"metric_type": metric_type, "params": {"search_k": search_k}}
+            search_params.append(annoy_search_param)
+    else:
+        log.error("Invalid index_type.")
+        raise Exception("Invalid index_type.")
+    return search_params
+
+
+def get_all_collections():
+    try:
+        with open("/tmp/ci_logs/all_collections.json", "r") as f:
+            data = json.load(f)
+            all_collections = data["all"]
+    except Exception as e:
+        log.error(f"get_all_collections error: {e}")
+        return []
+    return all_collections
\ No newline at end of file
diff --git a/tests/python_client/deploy/conftest.py b/tests/python_client/deploy/conftest.py
new file mode 100644
index 0000000000..f21dfc8504
--- /dev/null
+++ b/tests/python_client/deploy/conftest.py
@@ -0,0 +1,31 @@
+import logging
+
+import pytest
+import functools
+import socket
+
+import common.common_type as ct
+import common.common_func as cf
+from utils.util_log import test_log as log
+from common.common_func import param_info
+from check.param_check import ip_check, number_check
+from config.log_config import log_config
+from utils.util_pymilvus import get_milvus, gen_unique_str, gen_default_fields, gen_binary_default_fields
+from pymilvus.orm.types import CONSISTENCY_STRONG
+
+timeout = 60
+dimension = 128
+delete_timeout = 60
+
+
+def pytest_addoption(parser):
+    parser.addoption('--data_size', type=int, action='store', default=3000, help="data size for deploy test")
+
+
+@pytest.fixture
+def data_size(request):
+    return request.config.getoption("--data_size")
+
+
+# TODO: add a fixture that parametrizes over all index types
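A short sketch of how the helpers above and the new --data_size option fit together; the dict values shown are what the tables above produce for IVF_FLAT:

# Run the suite with a custom volume:  pytest deploy/testcases -v --data_size 3000
from deploy.common import gen_index_param, gen_search_param

index_param = gen_index_param("IVF_FLAT")
# {"index_type": "IVF_FLAT", "params": {"nlist": 128}, "metric_type": "L2"}
search_param = gen_search_param("IVF_FLAT")[0]
# {"metric_type": "L2", "params": {"nprobe": 10}}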
diff --git a/tests/python_client/deploy/testcases/test_action_after_reinstall.py b/tests/python_client/deploy/testcases/test_action_after_reinstall.py
new file mode 100644
index 0000000000..747a5fb492
--- /dev/null
+++ b/tests/python_client/deploy/testcases/test_action_after_reinstall.py
@@ -0,0 +1,202 @@
+import pytest
+import random
+from common import common_func as cf
+from common import common_type as ct
+from common.common_type import CaseLabel, CheckTasks
+from common.milvus_sys import MilvusSys
+from utils.util_pymilvus import *
+from deploy.base import TestDeployBase
+from deploy import common as dc
+from deploy.common import gen_index_param, gen_search_param
+
+default_nb = ct.default_nb
+default_nq = ct.default_nq
+default_dim = ct.default_dim
+default_limit = ct.default_limit
+default_search_field = ct.default_float_vec_field_name
+default_search_params = ct.default_search_params
+default_int64_field_name = ct.default_int64_field_name
+default_float_field_name = ct.default_float_field_name
+default_bool_field_name = ct.default_bool_field_name
+default_string_field_name = ct.default_string_field_name
+binary_field_name = default_binary_vec_field_name
+default_search_exp = "int64 >= 0"
+default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
+
+
+class TestActionAfterReinstall(TestDeployBase):
+    """ Test case of action after reinstall """
+
+    def teardown_method(self, method):
+        log.info(("*" * 35) + " teardown " + ("*" * 35))
+        log.info("[teardown_method] Start teardown test case %s..." %
+                 method.__name__)
+        log.info("skip drop collection")
+
+    @pytest.mark.skip()
+    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.parametrize("index_type", dc.all_index_types)
+    def test_task_1(self, index_type, data_size):
+        """
+        before reinstall: create collection and insert data, load and search
+        after reinstall: get collection, search, create index, load, and search
+        """
+        name = "task_1_" + index_type
+        insert_data = False
+        is_binary = "BIN" in index_type
+        is_flush = False
+        # init collection
+        collection_w = self.init_collection_general(insert_data=insert_data, is_binary=is_binary, nb=data_size,
+                                                    is_flush=is_flush, name=name)[0]
+        if is_binary:
+            _, vectors_to_search = cf.gen_binary_vectors(
+                default_nb, default_dim)
+            default_search_field = ct.default_binary_vec_field_name
+        else:
+            vectors_to_search = cf.gen_vectors(default_nb, default_dim)
+            default_search_field = ct.default_float_vec_field_name
+        search_params = gen_search_param(index_type)[0]
+
+        # search
+        collection_w.search(vectors_to_search[:default_nq], default_search_field,
+                            search_params, default_limit,
+                            default_search_exp,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": default_nq,
+                                         "limit": default_limit})
+        # query
+        output_fields = [ct.default_int64_field_name]
+        collection_w.query(default_term_expr, output_fields=output_fields,
+                           check_task=CheckTasks.check_query_not_empty)
+        # create index
+        default_index = gen_index_param(index_type)
+        collection_w.create_index(default_search_field, default_index)
+        # release and load after creating index
+        collection_w.release()
+        collection_w.load()
+        # search
+        collection_w.search(vectors_to_search[:default_nq], default_search_field,
+                            search_params, default_limit,
+                            default_search_exp,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": default_nq,
+                                         "limit": default_limit})
+        # query
+        output_fields = [ct.default_int64_field_name]
+        collection_w.query(default_term_expr, output_fields=output_fields,
+                           check_task=CheckTasks.check_query_not_empty)
+    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.parametrize("index_type", dc.all_index_types)
+    def test_task_2(self, index_type, data_size):
+        """
+        before reinstall: create collection, insert data and create index, load and search
+        after reinstall: get collection, search, insert data, create index, load, and search
+        """
+        name = "task_2_" + index_type
+        is_binary = "BIN" in index_type
+        # init collection
+        collection_w = self.init_collection_general(insert_data=False, is_binary=is_binary, nb=data_size,
+                                                    is_flush=False, name=name, active_trace=True)[0]
+        vectors_to_search = cf.gen_vectors(default_nb, default_dim)
+        default_search_field = ct.default_float_vec_field_name
+        if is_binary:
+            _, vectors_to_search = cf.gen_binary_vectors(
+                default_nb, default_dim)
+            default_search_field = ct.default_binary_vec_field_name
+
+        search_params = gen_search_param(index_type)[0]
+        output_fields = [ct.default_int64_field_name]
+        # search
+        collection_w.search(vectors_to_search[:default_nq], default_search_field,
+                            search_params, default_limit,
+                            default_search_exp,
+                            output_fields=output_fields,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": default_nq,
+                                         "limit": default_limit})
+        # query
+        collection_w.query(default_term_expr, output_fields=output_fields,
+                           check_task=CheckTasks.check_query_not_empty)
+        # insert data
+        self.init_collection_general(insert_data=True, is_binary=is_binary, nb=data_size,
+                                     is_flush=False, name=name, active_trace=True)
+        # create index
+        default_index = gen_index_param(index_type)
+        collection_w.create_index(default_search_field, default_index)
+        # release and load after creating index
+        collection_w.release()
+        collection_w.load()
+        # search
+        collection_w.search(vectors_to_search[:default_nq], default_search_field,
+                            search_params, default_limit,
+                            default_search_exp,
+                            output_fields=output_fields,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": default_nq,
+                                         "limit": default_limit})
+        # query
+        collection_w.query(default_term_expr, output_fields=output_fields,
+                           check_task=CheckTasks.check_query_not_empty)
+
+    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.parametrize("replica_number", [0, 1, 2])
+    @pytest.mark.parametrize("is_compacted", [True, False])
+    @pytest.mark.parametrize("is_deleted", [True, False])
+    @pytest.mark.parametrize("is_string_indexed", [True, False])
+    @pytest.mark.parametrize("is_vector_indexed", [True, False])
+    @pytest.mark.parametrize("segment_status", ["only_growing", "sealed", "all"])
+    # @pytest.mark.parametrize("is_empty", [True, False])  # keeping one empty case is enough
+    @pytest.mark.parametrize("index_type", random.sample(dc.all_index_types, 3))
+    def test_task_all(self, index_type, is_compacted,
+                      segment_status, is_vector_indexed, is_string_indexed, replica_number, is_deleted, data_size):
+        """
+        before reinstall: create collection and insert data, load and search
+        after reinstall: get collection, search, create index, load, and search
+        """
+        name = f"index_type_{index_type}_segment_status_{segment_status}_is_vector_indexed_{is_vector_indexed}_is_string_indexed_{is_string_indexed}_is_compacted_{is_compacted}_is_deleted_{is_deleted}_replica_number_{replica_number}_data_size_{data_size}"
+        ms = MilvusSys()
+        is_binary = "BIN" in index_type
+        # insert a small amount of data without flush to get a growing segment
+        collection_w = self.init_collection_general(insert_data=True, is_binary=is_binary, nb=3000,
+                                                    is_flush=False, name=name)[0]
+
+        # load for growing segment
+        if replica_number > 0:
+            collection_w.load(replica_number=replica_number)
+
+        # delete data for growing segment
+        delete_expr = f"{ct.default_int64_field_name} in [0,1,2,3,4,5,6,7,8,9]"
+        if is_deleted:
+            collection_w.delete(expr=delete_expr)
+        if segment_status == "only_growing":
+            pytest.skip("already got a growing segment, skip the rest of the testcase")
+        # insert with flush multiple times to generate multiple sealed segments
+        for i in range(5):
+            self.init_collection_general(insert_data=True, is_binary=is_binary, nb=data_size,
+                                         is_flush=False, name=name)
+        if is_binary:
+            default_index_field = ct.default_binary_vec_field_name
+        else:
+            default_index_field = ct.default_float_vec_field_name
+        if is_vector_indexed:
+            # create vector index
+            default_index_param = gen_index_param(index_type)
+            collection_w.create_index(default_index_field, default_index_param)
+        if is_string_indexed:
+            # create string index
+            default_string_index_params = {}
+            collection_w.create_index(default_string_field_name, default_string_index_params)
+        # delete data for sealed segment
+        delete_expr = f"{ct.default_int64_field_name} in [10,11,12,13,14,15,16,17,18,19]"
+        if is_deleted:
+            collection_w.delete(expr=delete_expr)
+        if is_compacted:
+            collection_w.compact()
+        # reload after flush and creating index
+        if replica_number > 0:
+            collection_w.release()
+            collection_w.load(replica_number=replica_number)
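The growing/sealed handling above is easier to follow with the underlying rule spelled out (a sketch using this patch's own helpers; nb is illustrative): data inserted without a flush stays in a growing segment, while a flush, triggered in these tests by reading num_entities, seals it.

collection_w = self.init_collection_general(insert_data=True, nb=3000,
                                            is_flush=False, name=name)[0]  # growing segment
collection_w.num_entities                   # reading num_entities flushes -> sealed segment
self.init_collection_general(insert_data=True, nb=3000,
                             is_flush=False, name=name)                    # growing again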
diff --git a/tests/python_client/deploy/testcases/test_action_before_reinstall.py b/tests/python_client/deploy/testcases/test_action_before_reinstall.py
new file mode 100644
index 0000000000..4472abc109
--- /dev/null
+++ b/tests/python_client/deploy/testcases/test_action_before_reinstall.py
@@ -0,0 +1,125 @@
+import pytest
+from common import common_func as cf
+from common import common_type as ct
+from common.common_type import CaseLabel, CheckTasks
+from utils.util_pymilvus import *
+from deploy.base import TestDeployBase
+from deploy import common as dc
+from deploy.common import gen_index_param, gen_search_param
+
+
+default_nb = ct.default_nb
+default_nq = ct.default_nq
+default_dim = ct.default_dim
+default_limit = ct.default_limit
+default_search_field = ct.default_float_vec_field_name
+default_search_params = ct.default_search_params
+default_int64_field_name = ct.default_int64_field_name
+default_float_field_name = ct.default_float_field_name
+default_bool_field_name = ct.default_bool_field_name
+default_string_field_name = ct.default_string_field_name
+binary_field_name = default_binary_vec_field_name
+default_search_exp = "int64 >= 0"
+default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
+
+
+class TestActionBeforeReinstall(TestDeployBase):
+    """ Test case of action before reinstall """
+
+    def teardown_method(self, method):
+        log.info(("*" * 35) + " teardown " + ("*" * 35))
+        log.info("[teardown_method] Start teardown test case %s..." %
+                 method.__name__)
+        log.info("skip drop collection")
+
+    @pytest.mark.skip()
+    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.parametrize("index_type", dc.all_index_types)
+    def test_task_1(self, index_type, data_size):
+        """
+        before reinstall: create collection and insert data, load and search
+        after reinstall: get collection, load, search, create index, load, and search
+        """
+        name = "task_1_" + index_type
+        insert_data = True
+        is_binary = "BIN" in index_type
+        is_flush = False
+        collection_w = self.init_collection_general(insert_data=insert_data, is_binary=is_binary, nb=data_size,
+                                                    is_flush=is_flush, name=name)[0]
+        collection_w.load()
+
+        if is_binary:
+            _, vectors_to_search = cf.gen_binary_vectors(default_nb, default_dim)
+            default_search_field = ct.default_binary_vec_field_name
+        else:
+            vectors_to_search = cf.gen_vectors(default_nb, default_dim)
+            default_search_field = ct.default_float_vec_field_name
+        search_params = gen_search_param(index_type)[0]
+        collection_w.search(vectors_to_search[:default_nq], default_search_field,
+                            search_params, default_limit,
+                            default_search_exp,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": default_nq,
+                                         "limit": default_limit})
+        output_fields = [ct.default_int64_field_name]
+        collection_w.query(default_term_expr, output_fields=output_fields,
+                           check_task=CheckTasks.check_query_not_empty)
+
+    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.parametrize("index_type", dc.all_index_types)
+    def test_task_2(self, index_type, data_size):
+        """
+        before reinstall: create collection, insert data and create index, load and search
+        after reinstall: get collection, load, search, insert data, create index, load, and search
+        """
+        name = "task_2_" + index_type
+        insert_data = True
+        is_binary = "BIN" in index_type
+        is_flush = False
+        # create collection and insert data
+        collection_w = self.init_collection_general(insert_data=insert_data, is_binary=is_binary, nb=data_size,
+                                                    is_flush=is_flush, name=name, active_trace=True)[0]
+        vectors_to_search = cf.gen_vectors(default_nb, default_dim)
+        default_search_field = ct.default_float_vec_field_name
+        if is_binary:
+            _, vectors_to_search = cf.gen_binary_vectors(default_nb, default_dim)
+            default_search_field = ct.default_binary_vec_field_name
+        # create index
+        default_index = gen_index_param(index_type)
+        collection_w.create_index(default_search_field, default_index)
+        # load
+        collection_w.load()
+        # search
+        search_params = gen_search_param(index_type)[0]
+        collection_w.search(vectors_to_search[:default_nq], default_search_field,
+                            search_params, default_limit,
+                            default_search_exp,
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": default_nq,
+                                         "limit": default_limit})
+        # query
+        output_fields = [ct.default_int64_field_name]
+        collection_w.query(default_term_expr, output_fields=output_fields,
+                           check_task=CheckTasks.check_query_not_empty)
diff --git a/tests/python_client/deploy/testcases/test_action_reinstall.py b/tests/python_client/deploy/testcases/test_action_reinstall.py
new file mode 100644
index 0000000000..543fb0906a
--- /dev/null
+++ b/tests/python_client/deploy/testcases/test_action_reinstall.py
@@ -0,0 +1,176 @@
+import pytest
+from common import common_func as cf
+from common import common_type as ct
+from common.common_type import CaseLabel, CheckTasks
+from common.milvus_sys import MilvusSys
+from utils.util_pymilvus import *
+from
deploy.base import TestDeployBase +from deploy.common import gen_index_param, gen_search_param +from utils.util_log import test_log as log + + +default_nb = ct.default_nb +default_nq = ct.default_nq +default_dim = ct.default_dim +default_limit = ct.default_limit +default_search_field = ct.default_float_vec_field_name +default_search_params = ct.default_search_params +default_int64_field_name = ct.default_int64_field_name +default_float_field_name = ct.default_float_field_name +default_bool_field_name = ct.default_bool_field_name +default_string_field_name = ct.default_string_field_name +binary_field_name = default_binary_vec_field_name +default_search_exp = "int64 >= 0" +default_term_expr = f'{ct.default_int64_field_name} in [0, 1]' + +prefix = "test_reinstall" + + +class TestActionBeforeReinstall(TestDeployBase): + """ Test case of action before reinstall """ + + def teardown_method(self, method): + log.info(("*" * 35) + " teardown " + ("*" * 35)) + log.info("[teardown_method] Start teardown test case %s..." % + method.__name__) + log.info("skip drop collection") + + @pytest.mark.tags(CaseLabel.L3) + def test_task_all_empty(self): + """ + before reinstall: create collection + """ + name = cf.gen_unique_str(prefix) + self.init_collection_general(insert_data=False, name=name)[0] + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("replica_number", [1]) + @pytest.mark.parametrize("is_compacted", ["is_compacted"]) + @pytest.mark.parametrize("is_deleted", ["is_deleted"]) + @pytest.mark.parametrize("is_string_indexed", ["is_string_indexed", "not_string_indexed"]) + @pytest.mark.parametrize("is_vector_indexed", ["is_vector_indexed", "not_vector_indexed"]) + @pytest.mark.parametrize("segment_status", ["only_growing", "only_sealed", "all"]) + @pytest.mark.parametrize("index_type", ["IVF_FLAT"]) #"IVF_FLAT", "HNSW", "BIN_IVF_FLAT" + def test_task_all(self, index_type, is_compacted, + segment_status, is_vector_indexed, is_string_indexed, replica_number, is_deleted, data_size): + """ + before reinstall: create collection and insert data, load and search + """ + name = "" + for k,v in locals().items(): + if k in ["self", "name"]: + continue + name += f"_{k}_{v}" + name = prefix + name + self._connect() + ms = MilvusSys() + if len(ms.query_nodes) < replica_number: + # this step is to make sure this testcase can run on standalone mode + # or cluster mode which has only one querynode + pytest.skip("skip test, not enough nodes") + + log.info(f"collection name: {name}, replica_number: {replica_number}, is_compacted: {is_compacted}," + f"is_deleted: {is_deleted}, is_vector_indexed: {is_vector_indexed}, is_string_indexed: {is_string_indexed}," + f"segment_status: {segment_status}, index_type: {index_type}") + + is_binary = True if "BIN" in index_type else False + + # params for search and query + if is_binary: + _, vectors_to_search = cf.gen_binary_vectors( + default_nb, default_dim) + default_search_field = ct.default_binary_vec_field_name + else: + vectors_to_search = cf.gen_vectors(default_nb, default_dim) + default_search_field = ct.default_float_vec_field_name + search_params = gen_search_param(index_type)[0] + + # init collection and insert with small size data without flush to get growing segment + collection_w = self.init_collection_general(insert_data=True, is_binary=is_binary, nb=3000, + is_flush=False, is_index=True, name=name)[0] + # load for growing segment + if replica_number >= 1: + try: + collection_w.release() + except Exception as e: + log.error( + f"release collection failed: 
{e} maybe the collection is not loaded") + collection_w.load(replica_number=replica_number) + + # delete data for growing segment + delete_expr = f"{ct.default_int64_field_name} in [0,1,2,3,4,5,6,7,8,9]" + if is_deleted == "is_deleted": + collection_w.delete(expr=delete_expr) + + # search and query for growing segment + if replica_number >= 1: + collection_w.search(vectors_to_search[:default_nq], default_search_field, + search_params, default_limit, + default_search_exp, + check_task=CheckTasks.check_search_results, + check_items={"nq": default_nq, + "limit": default_limit}) + output_fields = [ct.default_int64_field_name] + collection_w.query(default_term_expr, output_fields=output_fields, + check_task=CheckTasks.check_query_not_empty) + + # skip subsequent operations when segment_status is set to only_growing + if segment_status == "only_growing": + pytest.skip( + "already get growing segment, skip subsequent operations") + + # insert with flush multiple times to generate multiple sealed segment + for i in range(2): + self.init_collection_general(insert_data=True, is_binary=is_binary, nb=data_size, + is_flush=False, is_index=True, name=name) + collection_w.flush() + + + # params for creating index + if is_binary: + default_index_field = ct.default_binary_vec_field_name + else: + default_index_field = ct.default_float_vec_field_name + + # create index for vector + if is_vector_indexed == "is_vector_indexed": + default_index_param = gen_index_param(index_type) + collection_w.create_index(default_index_field, default_index_param) + + # create index for string + if is_string_indexed == "is_string_indexed": + default_string_index_params = {} + default_string_index_name = "_default_string_idx" + collection_w.create_index( + default_string_field_name, default_string_index_params, index_name=default_string_index_name) + + # delete data for sealed segment + delete_expr = f"{ct.default_int64_field_name} in [10,11,12,13,14,15,16,17,18,19]" + if is_deleted == "is_deleted": + collection_w.delete(expr=delete_expr) + if is_compacted == "is_compacted": + collection_w.compact() + if segment_status == "all": + self.init_collection_general(insert_data=True, is_binary=is_binary, nb=3000, + is_flush=False, is_index=True, name=name) + # reload after flush and creating index + if replica_number > 0: + collection_w.release() + collection_w.load(replica_number=replica_number) + + # insert data to get growing segment + if segment_status == "all": + self.init_collection_general(insert_data=True, is_binary=is_binary, nb=3000, + is_flush=False, is_index=True, name=name) + + # search and query for sealed and growing segment + if replica_number > 0: + collection_w.search(vectors_to_search[:default_nq], default_search_field, + search_params, default_limit, + default_search_exp, + check_task=CheckTasks.check_search_results, + check_items={"nq": default_nq, + "limit": default_limit}) + output_fields = [ct.default_int64_field_name] + collection_w.query(default_term_expr, output_fields=output_fields, + check_task=CheckTasks.check_query_not_empty) diff --git a/tests/python_client/deploy/testcases/test_all_collections_verification.py b/tests/python_client/deploy/testcases/test_all_collections_verification.py new file mode 100644 index 0000000000..96b96ebe31 --- /dev/null +++ b/tests/python_client/deploy/testcases/test_all_collections_verification.py @@ -0,0 +1,198 @@ +import pytest +from common import common_func as cf +from common import common_type as ct +from common.common_type import CaseLabel, CheckTasks +from 
common.milvus_sys import MilvusSys +from utils.util_pymilvus import * +from deploy.base import TestDeployBase +from deploy.common import gen_index_param, gen_search_param, get_all_collections +from utils.util_log import test_log as log + + +default_nb = ct.default_nb +default_nq = ct.default_nq +default_dim = ct.default_dim +default_limit = ct.default_limit +default_search_field = ct.default_float_vec_field_name +default_search_params = ct.default_search_params +default_int64_field_name = ct.default_int64_field_name +default_float_field_name = ct.default_float_field_name +default_bool_field_name = ct.default_bool_field_name +default_string_field_name = ct.default_string_field_name +binary_field_name = default_binary_vec_field_name +default_search_exp = "int64 >= 0" +default_term_expr = f'{ct.default_int64_field_name} in [0, 1]' + + + + +class TestActionBeforeReinstall(TestDeployBase): + """ Test case of action before reinstall """ + + @pytest.fixture(scope="function", params=get_all_collections()) + def collection_name(self, request): + if request.param == [] or request.param == "": + pytest.skip("The collection name is invalid") + yield request.param + + def teardown_method(self, method): + log.info(("*" * 35) + " teardown " + ("*" * 35)) + log.info("[teardown_method] Start teardown test case %s..." % + method.__name__) + log.info("skip drop collection") + + @pytest.mark.tags(CaseLabel.L3) + def test_check(self, collection_name, data_size): + """ + before reinstall: create collection + """ + self._connect() + ms = MilvusSys() + name = collection_name + collection_w = self.init_collection_general( + insert_data=False, name=name, active_trace=True)[0] + schema = collection_w.schema + data_type = [field.dtype.name for field in schema.fields] + field_name = [field.name for field in schema.fields] + type_field_map = dict(zip(data_type,field_name)) + is_binary = False + + if "BINARY_VECTOR" in data_type: + is_binary = True + + if is_binary: + default_index_field = ct.default_binary_vec_field_name + vector_index_type = "BIN_FLAT" + else: + default_index_field = ct.default_float_vec_field_name + vector_index_type = "IVF_FLAT" + + is_vector_indexed = False + is_string_indexed = False + indexed_fields = [index.field_name for index in collection_w.indexes] + binary_vector_index_types = [index.params["index_type"] for index in collection_w.indexes if index.field_name == type_field_map.get("BINARY_VECTOR", "")] + float_vector_index_types = [index.params["index_type"] for index in collection_w.indexes if index.field_name == type_field_map.get("FLOAT_VECTOR", "")] + string_index_types = [index.params["index_type"] for index in collection_w.indexes if index.field_name == type_field_map.get("VARCHAR", "")] + index_names = [index.index_name for index in collection_w.indexes] # used to drop index + vector_index_types = binary_vector_index_types + float_vector_index_types + if len(vector_index_types) > 0: + is_vector_indexed = True + vector_index_type = vector_index_types[0] + + if len(string_index_types) > 0: + is_string_indexed = True + + try: + replicas, _ = collection_w.get_replicas(enable_traceback=False) + replicas_loaded = len(replicas.groups) + except Exception as e: + log.info("get replicas failed") + replicas_loaded = 0 + # params for search and query + if is_binary: + _, vectors_to_search = cf.gen_binary_vectors( + default_nb, default_dim) + default_search_field = ct.default_binary_vec_field_name + else: + vectors_to_search = cf.gen_vectors(default_nb, default_dim) + default_search_field = 
ct.default_float_vec_field_name + search_params = gen_search_param(vector_index_type)[0] + + # load if not loaded + if replicas_loaded == 0: + collection_w.load() + + # search and query + collection_w.search(vectors_to_search[:default_nq], default_search_field, + search_params, default_limit, + default_search_exp, + output_fields=[ct.default_int64_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": default_nq, + "limit": default_limit}) + collection_w.query(default_term_expr, output_fields=[ct.default_int64_field_name], + check_task=CheckTasks.check_query_not_empty) + + # flush + collection_w.num_entities + + # search and query + collection_w.search(vectors_to_search[:default_nq], default_search_field, + search_params, default_limit, + default_search_exp, + output_fields=[ct.default_int64_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": default_nq, + "limit": default_limit}) + collection_w.query(default_term_expr, output_fields=[ct.default_int64_field_name], + check_task=CheckTasks.check_query_not_empty) + + # insert data and flush + for i in range(2): + self.init_collection_general(insert_data=True, is_binary=is_binary, nb=data_size, + is_flush=False, is_index=True, name=name) + collection_w.num_entities + + # delete data + delete_expr = f"{ct.default_int64_field_name} in [0,1,2,3,4,5,6,7,8,9]" + collection_w.delete(expr=delete_expr) + + # search and query + collection_w.search(vectors_to_search[:default_nq], default_search_field, + search_params, default_limit, + default_search_exp, + output_fields=[ct.default_int64_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": default_nq, + "limit": default_limit}) + collection_w.query(default_term_expr, output_fields=[ct.default_int64_field_name], + check_task=CheckTasks.check_query_not_empty) + + # drop index if exist + if len(index_names) > 0: + for index_name in index_names: + collection_w.drop_index(index_name=index_name) + # search and query after dropping index + collection_w.search(vectors_to_search[:default_nq], default_search_field, + search_params, default_limit, + default_search_exp, + output_fields=[ct.default_int64_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": default_nq, + "limit": default_limit}) + collection_w.query(default_term_expr, output_fields=[ct.default_int64_field_name], + check_task=CheckTasks.check_query_not_empty) + + # create index + default_index_param = gen_index_param(vector_index_type) + collection_w.create_index(default_index_field, default_index_param, index_name=cf.gen_unique_str()) + collection_w.create_index(default_string_field_name, {}, index_name=cf.gen_unique_str()) + + # search and query + collection_w.search(vectors_to_search[:default_nq], default_search_field, + search_params, default_limit, + default_search_exp, + output_fields=[ct.default_int64_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": default_nq, + "limit": default_limit}) + collection_w.query(default_term_expr, output_fields=[ct.default_int64_field_name], + check_task=CheckTasks.check_query_not_empty) + + # release and reload with changed replicas + collection_w.release() + replica_number = 1 + if replicas_loaded in [0,1] and len(ms.query_nodes)>=2 : + replica_number = 2 + collection_w.load(replica_number=replica_number) + + # search and query + collection_w.search(vectors_to_search[:default_nq], default_search_field, + search_params, default_limit, + default_search_exp, + 
                            output_fields=[ct.default_int64_field_name],
+                            check_task=CheckTasks.check_search_results,
+                            check_items={"nq": default_nq,
+                                         "limit": default_limit})
+        collection_w.query(default_term_expr, output_fields=[ct.default_int64_field_name],
+                           check_task=CheckTasks.check_query_not_empty)
diff --git a/tests/python_client/deploy/testcases/test_get_all_collections.py b/tests/python_client/deploy/testcases/test_get_all_collections.py
new file mode 100644
index 0000000000..3fac68ceef
--- /dev/null
+++ b/tests/python_client/deploy/testcases/test_get_all_collections.py
@@ -0,0 +1,37 @@
+import json
+import pytest
+
+from base.client_base import TestcaseBase
+from common import common_func as cf
+from common import common_type as ct
+from deploy.common import get_all_collections
+from common.common_type import CaseLabel
+from utils.util_log import test_log as log
+
+
+class TestGetCollections(TestcaseBase):
+    """ Test case of getting all deploy-test collections """
+
+    def teardown_method(self, method):
+        log.info(("*" * 35) + " teardown " + ("*" * 35))
+        log.info("[teardown_method] Start teardown test case %s..." %
+                 method.__name__)
+
+    @pytest.mark.tags(CaseLabel.L1)
+    def test_get_collections_by_prefix(self):
+        self._connect()
+        all_collections = self.utility_wrap.list_collections()[0]
+        all_collections = [c_name for c_name in all_collections if "test_reinstall" in c_name or "test_upgrade" in c_name]
+        log.info(f"found {len(all_collections)} collections:")
+        log.info(all_collections)
+        data = {
+            "all": all_collections
+        }
+        with open("/tmp/ci_logs/all_collections.json", "w") as f:
+            f.write(json.dumps(data))
+        log.info(f"wrote {len(all_collections)} collections to /tmp/ci_logs/all_collections.json")
+        collections_in_json = get_all_collections()
+        assert len(all_collections) == len(collections_in_json)
diff --git a/tests/python_client/testcases/test_search.py b/tests/python_client/testcases/test_search.py
index 1ec5853547..fefce9114c 100644
--- a/tests/python_client/testcases/test_search.py
+++ b/tests/python_client/testcases/test_search.py
@@ -46,8 +46,8 @@ entity = gen_entities(1, is_normal=True)
 entities = gen_entities(default_nb, is_normal=True)
 raw_vectors, binary_entities = gen_binary_entities(default_nb)
 default_query, _ = gen_search_vectors_params(field_name, entities, default_top_k, nq)
-index_name1=cf.gen_unique_str("float")
-index_name2=cf.gen_unique_str("varhar")
+index_name1 = cf.gen_unique_str("float")
+index_name2 = cf.gen_unique_str("varchar")
 
 
 class TestCollectionSearchInvalid(TestcaseBase):
diff --git a/tests/python_client/utils/api_request.py b/tests/python_client/utils/api_request.py
index 15f8e81b68..53da206ae6 100644
--- a/tests/python_client/utils/api_request.py
+++ b/tests/python_client/utils/api_request.py
@@ -1,4 +1,5 @@
 import traceback
+import copy
 import os
 
 from utils.util_log import test_log as log
@@ -19,7 +20,9 @@ def api_request_catch():
     def wrapper(func):
         def inner_wrapper(*args, **kwargs):
             try:
-                res = func(*args, **kwargs)
+                _kwargs = copy.deepcopy(kwargs)
+                if "enable_traceback" in _kwargs:
+                    del _kwargs["enable_traceback"]
+                res = func(*args, **_kwargs)
-                # if enable_traceback == "True":
                 if kwargs.get("enable_traceback", True):
                     res_str = str(res)
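The api_request.py change above strips enable_traceback from the kwargs before calling into pymilvus, so a test can silence result logging without leaking a wrapper-only keyword into the SDK, e.g. the get_replicas call used by the verification testcase:

# enable_traceback is consumed by inner_wrapper (deleted from _kwargs above)
# and never reaches pymilvus.
replicas, _ = collection_w.get_replicas(enable_traceback=False)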
diff --git a/tests/python_client/utils/wrapper.py b/tests/python_client/utils/wrapper.py
index aa60ba1898..1d4ef97ac3 100644
--- a/tests/python_client/utils/wrapper.py
+++ b/tests/python_client/utils/wrapper.py
@@ -3,7 +3,7 @@ from datetime import datetime
 import functools
 from utils.util_log import test_log as log
 
-DEFAULT_FMT = '[{start_time}][{end_time}][{elapsed:0.8f}s] {collection_name} {func_name} ({arg_str}) -> {result!r}'
+DEFAULT_FMT = '[{start_time}] [{elapsed:0.8f}s] {collection_name} {func_name} -> {res!r}'
 
 
 def trace(fmt=DEFAULT_FMT, prefix='test', flag=True):
@@ -15,23 +15,20 @@ def trace(fmt=DEFAULT_FMT, prefix='test', flag=True):
             if flag:
                 start_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
                 t0 = time.perf_counter()
-                result = func(*args, **kwargs)
+                res, result = func(*args, **kwargs)
                 elapsed = time.perf_counter() - t0
                 end_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
                 func_name = func.__name__
                 collection_name = args[0].collection.name
-                arg_lst = [repr(arg) for arg in args[1:]][:100]
-                arg_lst.extend(f'{k}={v!r}' for k, v in kwargs.items())
-                arg_str = ', '.join(arg_lst)[:200]
                 log_str = f"[{prefix}]" + fmt.format(**locals())
                 # TODO: add report function in this place, like uploading to influxdb
                 # it is better an async way to do this, in case of blocking the request processing
                 log.info(log_str)
-                return result
+                return res, result
             else:
-                result = func(*args, **kwargs)
-                return result
+                res, result = func(*args, **kwargs)
+                return res, result
 
         return inner_wrapper
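Finally, a minimal sketch of the contract the reworked trace() now assumes: every wrapped method returns a (res, check_result) tuple, which is why the bare `result = func(...)` unpacking changed. The class below is hypothetical and only illustrates the contract; the real wrapped methods live in ApiCollectionWrapper (see collection_wrapper.py above).

class CollectionWrapperSketch:
    """Hypothetical wrapper used only to illustrate the trace() contract."""

    def __init__(self, collection):
        self.collection = collection  # trace() reads args[0].collection.name

    @trace()
    def load(self, **kwargs):
        res = self.collection.load(**kwargs)
        check_result = True  # stand-in for ResponseChecker(...).run()
        return res, check_result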