"""Index test cases for the high level MilvusClient API (create / drop / list index)."""
# NOTE: the original import order is kept on purpose -- the two star imports
# below may rebind names, so reordering could change what is in scope.
import multiprocessing
import numbers
import random
import numpy
import numpy as np  # explicit alias: the tests use ``np`` and must not rely on a star import
import threading
import pytest
import pandas as pd
import decimal
from decimal import Decimal, getcontext
from time import sleep
import heapq
from base.client_base import TestcaseBase
from utils.util_log import test_log as log
from common import common_func as cf
from common import common_type as ct
from common.common_type import CaseLabel, CheckTasks
from utils.util_pymilvus import *
from common.constants import *
from pymilvus.orm.types import CONSISTENCY_STRONG, CONSISTENCY_BOUNDED, CONSISTENCY_SESSION, CONSISTENCY_EVENTUALLY
from base.high_level_api_wrapper import HighLevelApiWrapper

client_w = HighLevelApiWrapper()

prefix = "milvus_client_api_index"
epsilon = ct.epsilon
default_nb = ct.default_nb
default_nb_medium = ct.default_nb_medium
default_nq = ct.default_nq
default_dim = ct.default_dim
default_limit = ct.default_limit
default_search_exp = "id >= 0"
exp_res = "exp_res"
default_search_string_exp = "varchar >= \"0\""
default_search_mix_exp = "int64 >= 0 && varchar >= \"0\""
default_invaild_string_exp = "varchar >= 0"
default_json_search_exp = "json_field[\"number\"] >= 0"
perfix_expr = 'varchar like "0%"'
default_search_field = ct.default_float_vec_field_name
default_search_params = ct.default_search_params
default_primary_key_field_name = "id"
default_vector_field_name = "vector"
default_multiple_vector_field_name = "vector_new"
default_float_field_name = ct.default_float_field_name
default_bool_field_name = ct.default_bool_field_name
default_string_field_name = ct.default_string_field_name
default_int32_array_field_name = ct.default_int32_array_field_name
default_string_array_field_name = ct.default_string_array_field_name


def _create_collection_without_index(client, collection_name):
    """Create a collection with only the dimension given, release it and drop the "vector" index."""
    client_w.create_collection(client, collection_name, default_dim, consistency_level="Strong")
    # The index can only be dropped after release (see test_milvus_client_index_drop_index_before_release).
    client_w.release_collection(client, collection_name)
    client_w.drop_index(client, collection_name, "vector")


def _assert_no_index(client, collection_name):
    """Assert that list_indexes reports no index on the collection."""
    res = client_w.list_indexes(client, collection_name)[0]
    assert res == []


def _vector_index_params(client, **index_kwargs):
    """Return freshly prepared index params holding one index on the "vector" field.

    ``index_kwargs`` (e.g. index_type, metric_type, params) are forwarded to ``add_index``.
    """
    index_params = client_w.prepare_index_params(client)[0]
    index_params.add_index(field_name="vector", **index_kwargs)
    return index_params


def _insert_default_rows(client, collection_name, with_extra_vector=False):
    """Insert ``default_nb`` generated rows and return ``(rng, rows)``.

    The seeded generator is returned so the caller can keep drawing from the same
    random stream (the search vector is drawn after the rows).
    When ``with_extra_vector`` is True every row also carries a second random
    vector under ``default_multiple_vector_field_name``.
    """
    rng = np.random.default_rng(seed=19530)
    rows = []
    for i in range(default_nb):
        row = {default_primary_key_field_name: i,
               default_vector_field_name: list(rng.random((1, default_dim))[0]),
               default_float_field_name: i * 1.0,
               default_string_field_name: str(i)}
        if with_extra_vector:
            row[default_multiple_vector_field_name] = list(rng.random((1, default_dim))[0])
        rows.append(row)
    client_w.insert(client, collection_name, rows)
    return rng, rows


def _load_search_query(client, collection_name, rng, rows):
    """Load the collection, then run one search and one query with result checks."""
    client_w.load_collection(client, collection_name)
    # search
    vectors_to_search = rng.random((1, default_dim))
    insert_ids = list(range(default_nb))
    client_w.search(client, collection_name, vectors_to_search,
                    check_task=CheckTasks.check_search_results,
                    check_items={"enable_milvus_client_api": True,
                                 "nq": len(vectors_to_search),
                                 "ids": insert_ids,
                                 "limit": default_limit})
    # query
    client_w.query(client, collection_name, filter=default_search_exp,
                   check_task=CheckTasks.check_query_results,
                   check_items={exp_res: rows,
                                "with_vec": True,
                                "primary_field": default_primary_key_field_name})


class TestMilvusClientIndexInvalid(TestcaseBase):
    """ Test case of index interface with invalid input """

    @pytest.fixture(scope="function", params=[False, True])
    def auto_id(self, request):
        yield request.param

    @pytest.fixture(scope="function", params=["COSINE", "L2"])
    def metric_type(self, request):
        yield request.param

    """
    ******************************************************************
    #  The following are invalid base cases
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("name", ["12-s", "12 s", "(mn)", "中文", "%$#"])
    def test_milvus_client_index_invalid_collection_name(self, name):
        """
        target: test create index (high level api) with an invalid collection name
        method: create a collection, drop its index, then create index with an invalid collection name
        expected: raise exception with error code 1100
        """
        client = self._connect(enable_milvus_client_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection and drop its index
        _create_collection_without_index(client, collection_name)
        # 2. prepare index params
        index_params = _vector_index_params(client)
        # 3. create index with the invalid collection name
        error = {ct.err_code: 1100, ct.err_msg: f"Invalid collection name: {name}. the first character of a collection "
                                                f"name must be an underscore or letter: invalid parameter"}
        client_w.create_index(client, name, index_params,
                              check_task=CheckTasks.err_res, check_items=error)
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L1)
    # NOTE: "a".join(...) over 256 items yields 511 characters (256 items + 255 separators).
    @pytest.mark.parametrize("name", ["a".join("a" for _ in range(256))])
    def test_milvus_client_index_collection_name_over_max_length(self, name):
        """
        target: test create index (high level api) with a collection name over the max length
        method: create a collection, drop its index, then create index with a too long collection name
        expected: raise exception with error code 1100
        """
        client = self._connect(enable_milvus_client_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection and drop its index
        _create_collection_without_index(client, collection_name)
        # 2. prepare index params
        index_params = _vector_index_params(client)
        # 3. create index with the over-long collection name
        error = {ct.err_code: 1100, ct.err_msg: f"Invalid collection name: {name}. the length of a collection name "
                                                f"must be less than 255 characters: invalid parameter"}
        client_w.create_index(client, name, index_params,
                              check_task=CheckTasks.err_res, check_items=error)
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L1)
    def test_milvus_client_index_not_exist_collection_name(self):
        """
        target: test create index (high level api) on a collection that does not exist
        method: create a collection, drop its index, then create index with a never-created collection name
        expected: raise exception with error code 100
        """
        client = self._connect(enable_milvus_client_api=True)
        collection_name = cf.gen_unique_str(prefix)
        not_existed_collection_name = cf.gen_unique_str("not_existed_collection")
        # 1. create collection and drop its index
        _create_collection_without_index(client, collection_name)
        # 2. prepare index params
        index_params = _vector_index_params(client)
        # 3. create index on the not existed collection
        error = {ct.err_code: 100,
                 ct.err_msg: f"can't find collection[database=default][collection={not_existed_collection_name}]"}
        client_w.create_index(client, not_existed_collection_name, index_params,
                              check_task=CheckTasks.err_res, check_items=error)
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.skip(reason="pymilvus issue 1885")
    @pytest.mark.parametrize("index", ["12-s", "12 s", "(mn)", "中文", "%$#", "a".join("a" for _ in range(256))])
    def test_milvus_client_index_invalid_index_type(self, index):
        """
        target: test create index (high level api) with an invalid index type
        method: create a collection, drop its index, then create index with an invalid index type
        expected: raise exception
        """
        client = self._connect(enable_milvus_client_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection and drop its index
        _create_collection_without_index(client, collection_name)
        # 2. prepare index params
        index_params = _vector_index_params(client, index_type=index)
        # 3. create index
        # NOTE(review): this expected error looks copied from the collection-not-found case;
        # confirm the real message once the skip (pymilvus issue 1885) is lifted.
        error = {ct.err_code: 100, ct.err_msg: "can't find collection collection not "
                                               "found[database=default][collection=not_existed]"}
        client_w.create_index(client, collection_name, index_params,
                              check_task=CheckTasks.err_res, check_items=error)
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.skip(reason="pymilvus issue 1885")
    @pytest.mark.parametrize("metric", ["12-s", "12 s", "(mn)", "中文", "%$#", "a".join("a" for _ in range(256))])
    def test_milvus_client_index_invalid_metric_type(self, metric):
        """
        target: test create index (high level api) with an invalid metric type
        method: create a collection, drop its index, then create index with an invalid metric type
        expected: raise exception
        """
        client = self._connect(enable_milvus_client_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection and drop its index
        _create_collection_without_index(client, collection_name)
        # 2. prepare index params
        index_params = _vector_index_params(client, metric_type=metric)
        # 3. create index
        # NOTE(review): this expected error looks copied from the collection-not-found case;
        # confirm the real message once the skip (pymilvus issue 1885) is lifted.
        error = {ct.err_code: 100, ct.err_msg: "can't find collection collection not "
                                               "found[database=default][collection=not_existed]"}
        client_w.create_index(client, collection_name, index_params,
                              check_task=CheckTasks.err_res, check_items=error)
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L1)
    def test_milvus_client_index_drop_index_before_release(self):
        """
        target: test drop index (high level api) without releasing the collection first
        method: create a collection, then drop the index directly
        expected: raise exception with error code 65535
        """
        client = self._connect(enable_milvus_client_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection
        client_w.create_collection(client, collection_name, default_dim, consistency_level="Strong")
        # 2. drop index without release
        error = {ct.err_code: 65535, ct.err_msg: "index cannot be dropped, collection is loaded, "
                                                 "please release it first"}
        client_w.drop_index(client, collection_name, "vector",
                            check_task=CheckTasks.err_res, check_items=error)
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.skip(reason="pymilvus issue 1886")
    def test_milvus_client_index_multiple_indexes_one_field(self):
        """
        target: test create two different indexes (high level api) on the same field
        method: create a collection, create an HNSW/IP index, then create an IVF_FLAT/L2 index on "vector"
        expected: the second create index raises exception with error code 1100
        """
        client = self._connect(enable_milvus_client_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection
        client_w.create_collection(client, collection_name, default_dim, consistency_level="Strong")
        # 2. prepare index params
        index_params = _vector_index_params(client, index_type="HNSW", metric_type="IP")
        # 3. create index
        client_w.create_index(client, collection_name, index_params)
        # 4. prepare index params for another index on the same field
        index_params = _vector_index_params(client, index_type="IVF_FLAT", metric_type="L2")
        error = {ct.err_code: 1100, ct.err_msg: ""}
        # 5. create another index
        client_w.create_index(client, collection_name, index_params,
                              check_task=CheckTasks.err_res, check_items=error)
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.skip(reason="pymilvus issue 1886")
    def test_milvus_client_create_diff_index_without_release(self):
        """
        target: test create a different index (high level api) without releasing the collection
        method: create a collection, then create an HNSW/L2 index on "vector" without release
        expected: raise exception
        NOTE(review): no error check is attached to create_index yet, so the expected
                      exception is not asserted -- add it when the skip is lifted.
        """
        client = self._connect(enable_milvus_client_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection
        client_w.create_collection(client, collection_name, default_dim, consistency_level="Strong")
        # 2. prepare index params
        index_params = _vector_index_params(client, index_type="HNSW", metric_type="L2")
        # 3. create index
        client_w.create_index(client, collection_name, index_params)
        client_w.drop_collection(client, collection_name)


class TestMilvusClientIndexValid(TestcaseBase):
    """ Test case of index interface with valid input """

    @pytest.fixture(scope="function", params=[False, True])
    def auto_id(self, request):
        yield request.param

    @pytest.fixture(scope="function", params=["COSINE", "L2", "IP"])
    def metric_type(self, request):
        yield request.param

    @pytest.fixture(scope="function", params=["TRIE", "STL_SORT", "AUTOINDEX"])
    def scalar_index(self, request):
        yield request.param

    """
    ******************************************************************
    #  The following are valid base cases
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.skip("https://github.com/milvus-io/pymilvus/issues/1886")
    @pytest.mark.parametrize("index, params", zip(ct.all_index_types[:7], ct.default_all_indexes_params[:7]))
    def test_milvus_client_index_default(self, index, params, metric_type):
        """
        target: test create index (high level api) with index type and metric type only
        method: create a collection, drop its index, create the same index twice, insert, load, search and query
        expected: index created and search/query successfully
        """
        client = self._connect(enable_milvus_client_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection and drop its index
        _create_collection_without_index(client, collection_name)
        _assert_no_index(client, collection_name)
        # 2. prepare index params
        index_params = _vector_index_params(client, index_type=index, metric_type=metric_type)
        # 3. create index
        client_w.create_index(client, collection_name, index_params)
        # 4. create same index twice
        client_w.create_index(client, collection_name, index_params)
        # 5. insert
        rng, rows = _insert_default_rows(client, collection_name)
        # 6. load collection, search and query
        _load_search_query(client, collection_name, rng, rows)
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.skip(reason="pymilvus issue 1884")
    @pytest.mark.parametrize("index, params", zip(ct.all_index_types[:7], ct.default_all_indexes_params[:7]))
    def test_milvus_client_index_with_params(self, index, params, metric_type):
        """
        target: test create index (high level api) with explicit index build params
        method: create a collection, drop its index, create index with params, insert, load, search and query
        expected: index created and search/query successfully
        """
        client = self._connect(enable_milvus_client_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection and drop its index
        _create_collection_without_index(client, collection_name)
        _assert_no_index(client, collection_name)
        # 2. prepare index params
        index_params = _vector_index_params(client, index_type=index, params=params, metric_type=metric_type)
        # 3. create index
        client_w.create_index(client, collection_name, index_params)
        # 4. insert
        rng, rows = _insert_default_rows(client, collection_name)
        # 5. load collection, search and query
        _load_search_query(client, collection_name, rng, rows)
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.skip("wait for modification")
    @pytest.mark.parametrize("index, params", zip(ct.all_index_types[:7], ct.default_all_indexes_params[:7]))
    def test_milvus_client_index_after_insert(self, index, params, metric_type):
        """
        target: test create index (high level api) after data has been inserted
        method: create a collection, drop its index, insert, create index, load, search and query
        expected: index created and search/query successfully
        """
        client = self._connect(enable_milvus_client_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection and drop its index
        _create_collection_without_index(client, collection_name)
        # 2. insert
        rng, rows = _insert_default_rows(client, collection_name)
        # 3. prepare index params
        index_params = _vector_index_params(client, index_type=index, metric_type=metric_type)
        # 4. create index
        client_w.create_index(client, collection_name, index_params)
        # 5. load collection, search and query
        _load_search_query(client, collection_name, rng, rows)
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.skip("wait for modification")
    def test_milvus_client_index_auto_index(self, scalar_index, metric_type):
        """
        target: test create AUTOINDEX on the vector field plus a scalar index on the primary key
        method: create a collection, drop its index, create both indexes, drop both, create them again,
                insert, load, search and query
        expected: indexes created and search/query successfully
        """
        client = self._connect(enable_milvus_client_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection and drop its index
        _create_collection_without_index(client, collection_name)
        _assert_no_index(client, collection_name)
        # 2. prepare index params
        index = "AUTOINDEX"
        index_params = _vector_index_params(client, index_type=index, metric_type=metric_type)
        index_params.add_index(field_name="id", index_type=scalar_index, metric_type=metric_type)
        # 3. create index
        client_w.create_index(client, collection_name, index_params)
        # 4. drop index
        client_w.drop_index(client, collection_name, "vector")
        client_w.drop_index(client, collection_name, "id")
        # 5. create index
        client_w.create_index(client, collection_name, index_params)
        # 6. insert
        rng, rows = _insert_default_rows(client, collection_name)
        # 7. load collection, search and query
        _load_search_query(client, collection_name, rng, rows)
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.skip("wait for modification")
    def test_milvus_client_index_multiple_vectors(self, scalar_index, metric_type):
        """
        target: test create index (high level api) when rows carry a second vector value
        method: create a collection, drop its index, create vector and scalar indexes, insert rows with an
                extra "vector_new" value, load, search and query
        expected: indexes created and search/query successfully
        """
        client = self._connect(enable_milvus_client_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection and drop its index
        _create_collection_without_index(client, collection_name)
        _assert_no_index(client, collection_name)
        # 2. prepare index params
        index = "AUTOINDEX"
        index_params = _vector_index_params(client, index_type=index, metric_type=metric_type)
        index_params.add_index(field_name="id", index_type=scalar_index, metric_type=metric_type)
        # 3. create index
        client_w.create_index(client, collection_name, index_params)
        # 4. insert
        rng, rows = _insert_default_rows(client, collection_name, with_extra_vector=True)
        # 5. load collection, search and query
        _load_search_query(client, collection_name, rng, rows)
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.skip("wait for modification")
    @pytest.mark.parametrize("index, params", zip(ct.all_index_types[:7], ct.default_all_indexes_params[:7]))
    def test_milvus_client_index_drop_create_same_index(self, index, params, metric_type):
        """
        target: test re-create the same index (high level api) after dropping it
        method: create a collection, drop its index, create index, drop it, create the same index again,
                insert, load, search and query
        expected: index created and search/query successfully
        """
        client = self._connect(enable_milvus_client_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection and drop its index
        _create_collection_without_index(client, collection_name)
        _assert_no_index(client, collection_name)
        # 2. prepare index params
        index_params = _vector_index_params(client, index_type=index, metric_type=metric_type)
        # 3. create index
        client_w.create_index(client, collection_name, index_params)
        # 4. drop index
        client_w.drop_index(client, collection_name, "vector")
        # 5. create the same index again
        client_w.create_index(client, collection_name, index_params)
        # 6. insert
        rng, rows = _insert_default_rows(client, collection_name)
        # 7. load collection, search and query
        _load_search_query(client, collection_name, rng, rows)
        client_w.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.skip("wait for modification")
    @pytest.mark.parametrize("index, params", zip(ct.all_index_types[:7], ct.default_all_indexes_params[:7]))
    def test_milvus_client_index_drop_create_different_index(self, index, params, metric_type):
        """
        target: test create a different index (high level api) after dropping the previous one
        method: create a collection, drop its index, create an index without index type, drop it,
                create an index with an explicit index type, insert, load, search and query
        expected: index created and search/query successfully
        """
        client = self._connect(enable_milvus_client_api=True)
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection and drop its index
        _create_collection_without_index(client, collection_name)
        _assert_no_index(client, collection_name)
        # 2. prepare index params
        index_params = _vector_index_params(client, metric_type=metric_type)
        # 3. create index
        client_w.create_index(client, collection_name, index_params)
        # 4. drop index
        client_w.drop_index(client, collection_name, "vector")
        # 5. create different index
        # Use fresh index params so only the new index definition is submitted,
        # instead of adding a second "vector" entry to the params used in step 3.
        index_params = _vector_index_params(client, index_type=index, metric_type=metric_type)
        client_w.create_index(client, collection_name, index_params)
        # 6. insert
        rng, rows = _insert_default_rows(client, collection_name)
        # 7. load collection, search and query
        _load_search_query(client, collection_name, rng, rows)
        client_w.drop_collection(client, collection_name)