import pytest from pymilvus import connections from utils.util_log import test_log as log from base.collection_wrapper import ApiCollectionWrapper from common import common_func as cf from common import common_type as ct # from milvus_operator import * # from common.milvus_sys import MilvusSys # from common.common_type import CaseLabel # from pytest_dependency import depends supported_simd_types = ["sse4_2", "avx", "avx2", "avx512"] # TODO: implement simd config after supported # @pytest.mark.tags(CaseLabel.L3) # class TestSimdCompatibility: # """ # steps # 1. [test_milvus_install]: set up milvus with customized simd configured # 2. [test_simd_compat_e2e]: verify milvus is working well # 4. [test_milvus_cleanup]: clear the env "avx", "avx2", "avx512" # """ # # @pytest.mark.parametrize('simd', [ # pytest.param("sse4_2", marks=pytest.mark.dependency(name='ins_sse4_2')), # # pytest.param("avx", marks=pytest.mark.dependency(name='ins_avx')), # # pytest.param("avx2", marks=pytest.mark.dependency(name='ins_avx2')), # pytest.param("avx512", marks=pytest.mark.dependency(name='ins_avx512')) # ]) # def test_milvus_install(self, request, simd): # release_name = "mil-simd-" + cf.gen_digits_by_length(6) # namespace = 'chaos-testing' # cus_configs = {'spec.components.image': 'milvusdb/milvus-dev:master-latest', # 'metadata.namespace': namespace, # 'metadata.name': release_name, # 'spec.components.proxy.serviceType': 'LoadBalancer', # # TODO: use simd config instead of replicas # 'spec.components.queryNode.replicas': 2 # } # install_milvus(cus_configs, template=template_yaml) # healthy = wait_for_milvus_healthy(release_name, namespace) # log.info(f"milvus healthy: {healthy}") # assert healthy # endpoint = get_milvus_endpoint(release_name, namespace) # log.info(f"milvus endpoint: {endpoint}") # host = endpoint.split(':')[0] # port = endpoint.split(':')[1] # conn = connections.connect(simd, host=host, port=port) # assert conn is not None # mil = MilvusSys(alias=simd) # log.info(f"milvus build version: {mil.build_version}") # # TODO: Verify simd config instead of replicas # assert len(mil.query_nodes) == 2 # # # cache results for dependent tests # cache = {'release_name': release_name, # 'namespace': namespace, # 'alias': simd, # 'simd': simd # } # request.config.cache.set(simd, cache) # # @pytest.mark.parametrize('simd', [ # pytest.param("sse4_2", marks=pytest.mark.dependency(name='e2e_sse4_2', depends=["ins_sse4_2"])), # # pytest.param("avx", marks=pytest.mark.dependency(name='e2e_avx', depends=["ins_avx"])), # # pytest.param("avx2", marks=pytest.mark.dependency(name='e2e_avx2', depends=["ins_avx2"])), # pytest.param("avx512", marks=pytest.mark.dependency(name='e2e_avx512', depends=["ins_avx512"])) # ]) # def test_simd_compat_e2e(self, request, simd): # log.info(f"start to e2e verification: {simd}") # # parse results from previous results # results = request.config.cache.get(simd, None) # alias = results.get('alias', simd) # conn = connections.connect(alias=alias) # assert conn is not None # simd_cache = request.config.cache.get(simd, None) # log.info(f"simd_cache: {simd_cache}") # # create # name = cf.gen_unique_str("compat") # t0 = time.time() # collection_w = ApiCollectionWrapper() # collection_w.init_collection(name=name, # schema=cf.gen_default_collection_schema(), # using=alias, # timeout=40) # tt = time.time() - t0 # assert collection_w.name == name # entities = collection_w.num_entities # log.info(f"assert create collection: {tt}, init_entities: {entities}") # # # insert # data = cf.gen_default_list_data() # t0 = time.time() # _, res = collection_w.insert(data) # tt = time.time() - t0 # log.info(f"assert insert: {tt}") # assert res # # # flush # t0 = time.time() # assert collection_w.num_entities == len(data[0]) + entities # tt = time.time() - t0 # entities = collection_w.num_entities # log.info(f"assert flush: {tt}, entities: {entities}") # # # search # collection_w.load() # search_vectors = cf.gen_vectors(1, ct.default_dim) # search_params = {"metric_type": "L2", "params": {"nprobe": 16}} # t0 = time.time() # res_1, _ = collection_w.search(data=search_vectors, # anns_field=ct.default_float_vec_field_name, # param=search_params, limit=1) # tt = time.time() - t0 # log.info(f"assert search: {tt}") # assert len(res_1) == 1 # collection_w.release() # # # index # d = cf.gen_default_list_data() # collection_w.insert(d) # log.info(f"assert index entities: {collection_w.num_entities}") # _index_params = {"index_type": "IVF_SQ8", "params": {"nlist": 64}, "metric_type": "L2"} # t0 = time.time() # index, _ = collection_w.create_index(field_name=ct.default_float_vec_field_name, # index_params=_index_params, # name=cf.gen_unique_str()) # tt = time.time() - t0 # log.info(f"assert index: {tt}") # assert len(collection_w.indexes) == 1 # # # search # t0 = time.time() # collection_w.load() # tt = time.time() - t0 # log.info(f"assert load: {tt}") # search_vectors = cf.gen_vectors(1, ct.default_dim) # t0 = time.time() # res_1, _ = collection_w.search(data=search_vectors, # anns_field=ct.default_float_vec_field_name, # param=search_params, limit=1) # tt = time.time() - t0 # log.info(f"assert search: {tt}") # # # query # term_expr = f'{ct.default_int64_field_name} in [1001,1201,4999,2999]' # t0 = time.time() # res, _ = collection_w.query(term_expr) # tt = time.time() - t0 # log.info(f"assert query result {len(res)}: {tt}") # # @pytest.mark.parametrize('simd', [ # pytest.param("sse4_2", marks=pytest.mark.dependency(name='clear_sse4_2', depends=["ins_sse4_2", "e2e_sse4_2"])), # # pytest.param("avx", marks=pytest.mark.dependency(name='clear_avx', depends=["ins_avx", "e2e_avx"])), # # pytest.param("avx2", marks=pytest.mark.dependency(name='clear_avx2', depends=["ins_avx2", "e2e_avx2"])), # pytest.param("avx512", marks=pytest.mark.dependency(name='clear_avx512', depends=["ins_avx512", "e2e_avx512"])) # ]) # def test_milvus_cleanup(self, request, simd): # # get release name from previous results # results = request.config.cache.get(simd, None) # release_name = results.get('release_name', "name-not-found") # namespace = results.get('namespace', "namespace-not-found") # simd_cache = request.config.cache.get(simd, None) # log.info(f"stat to cleanup: {simd}") # log.info(f"simd_cache: {simd_cache}") # log.info(f"release_name: {release_name}") # log.info(f"namespace: {namespace}") # # uninstall_milvus(release_name, namespace)