diff --git a/tests/python_client/base/client_base.py b/tests/python_client/base/client_base.py index 68d4612b29..0ae50e9aa2 100644 --- a/tests/python_client/base/client_base.py +++ b/tests/python_client/base/client_base.py @@ -173,7 +173,6 @@ class TestcaseBase(Base): log.info(f"server version: {server_version}") return res - def get_tokens_by_analyzer(self, text, analyzer_params): if cf.param_info.param_uri: uri = cf.param_info.param_uri diff --git a/tests/python_client/base/client_v2_base.py b/tests/python_client/base/client_v2_base.py index d1fddaa6af..f53e9332b7 100644 --- a/tests/python_client/base/client_v2_base.py +++ b/tests/python_client/base/client_v2_base.py @@ -175,19 +175,6 @@ class TestMilvusClientV2Base(Base): **kwargs).run() return res, check_result - @trace() - def hybrid_search(self, client, collection_name, reqs, ranker, limit=10, output_fields=None, timeout=None, - check_task=None, check_items=None, **kwargs): - timeout = TIMEOUT if timeout is None else timeout - kwargs.update({"timeout": timeout}) - - func_name = sys._getframe().f_code.co_name - res, check = api_request([client.hybrid_search, collection_name, reqs, ranker, limit, - output_fields], **kwargs) - check_result = ResponseChecker(res, func_name, check_task, check_items, check, - collection_name=collection_name, reqs=reqs, ranker=ranker, limit=limit, - output_fields=output_fields, **kwargs).run() - return res, check_result @trace() def search_iterator(self, client, collection_name, data, batch_size, limit=-1, filter=None, output_fields=None, @@ -210,16 +197,16 @@ class TestMilvusClientV2Base(Base): return res, check_result @trace() - def hybrid_search(self, client, collection_name, reqs, rerank, limit=10, + def hybrid_search(self, client, collection_name, reqs, ranker, limit=10, output_fields=None, timeout=None, partition_names=None, check_task=None, check_items=None, **kwargs): timeout = TIMEOUT if timeout is None else timeout # kwargs.update({"timeout": timeout}) func_name = 
sys._getframe().f_code.co_name - res, check = api_request([client.hybrid_search, collection_name, reqs, rerank, limit, + res, check = api_request([client.hybrid_search, collection_name, reqs, ranker, limit, output_fields, timeout, partition_names], **kwargs) check_result = ResponseChecker(res, func_name, check_task, check_items, check, - collection_name=collection_name, reqs=reqs, rerank=rerank, limit=limit, + collection_name=collection_name, reqs=reqs, ranker=ranker, limit=limit, output_fields=output_fields, timeout=timeout, partition_names=partition_names, **kwargs).run() return res, check_result @@ -332,15 +319,6 @@ class TestMilvusClientV2Base(Base): self.tear_down_collection_names.remove(collection_name) return res, check_result - @trace() - def list_partitions(self, client, collection_name, check_task=None, check_items=None, **kwargs): - func_name = sys._getframe().f_code.co_name - res, check = api_request([client.list_partitions, collection_name], **kwargs) - check_result = ResponseChecker(res, func_name, check_task, check_items, check, - collection_name=collection_name, - **kwargs).run() - return res, check_result - @trace() def list_indexes(self, client, collection_name, field_name=None, check_task=None, check_items=None, **kwargs): func_name = sys._getframe().f_code.co_name @@ -359,16 +337,6 @@ class TestMilvusClientV2Base(Base): **kwargs).run() return res, check_result - @trace() - def prepare_index_params(self, client, timeout=None, check_task=None, check_items=None, **kwargs): - timeout = TIMEOUT if timeout is None else timeout - kwargs.update({"timeout": timeout}) - - func_name = sys._getframe().f_code.co_name - res, check = api_request([client.prepare_index_params], **kwargs) - check_result = ResponseChecker(res, func_name, check_task, check_items, check, - **kwargs).run() - return res, check_result @trace() def load_collection(self, client, collection_name, timeout=None, check_task=None, check_items=None, **kwargs): @@ -803,19 +771,6 @@ class 
TestMilvusClientV2Base(Base): object_name=object_name, db_name=db_name, **kwargs).run() return res, check_result - @trace() - def grant_privilege_v2(self, client, role_name, privilege, collection_name, db_name=None, - timeout=None, check_task=None, check_items=None, **kwargs): - timeout = TIMEOUT if timeout is None else timeout - kwargs.update({"timeout": timeout}) - func_name = sys._getframe().f_code.co_name - res, check = api_request([client.grant_privilege_v2, role_name, privilege, collection_name, - db_name], **kwargs) - check_result = ResponseChecker(res, func_name, check_task, check_items, check, - role_name=role_name, privilege=privilege, - collection_name=collection_name, db_name=db_name, **kwargs).run() - return res, check_result - @trace() def revoke_privilege(self, client, role_name, object_type, privilege, object_name, db_name="", timeout=None, check_task=None, check_items=None, **kwargs): @@ -1082,7 +1037,7 @@ class TestMilvusClientV2Base(Base): kwargs.update({"timeout": timeout}) func_name = sys._getframe().f_code.co_name - res, check = api_request([client.update_resource_groups, name], **kwargs) + res, check = api_request([client.drop_resource_group, name], **kwargs) check_result = ResponseChecker(res, func_name, check_task, check_items, check, name=name, **kwargs).run() return res, check_result diff --git a/tests/python_client/check/func_check.py b/tests/python_client/check/func_check.py index 66d3bf3660..4bf3b40ad1 100644 --- a/tests/python_client/check/func_check.py +++ b/tests/python_client/check/func_check.py @@ -454,8 +454,8 @@ class ResponseChecker: else: ids = list(hits.ids) distances = list(hits.distances) - if (len(hits) != check_items["limit"]) \ - or (len(ids) != check_items["limit"]): + if check_items.get("limit", None) is not None \ + and ((len(hits) != check_items["limit"]) or (len(ids) != check_items["limit"])): log.error("search_results_check: limit(topK) searched (%d) " "is not equal with expected (%d)" % (len(hits), 
def output_field_value_check(search_res, original, pk_name):
    """
    Verify that the output-field values returned by a search match the
    originally inserted data.

    :param search_res: search results indexed as ``search_res[query_idx][hit_idx]``;
                       each hit exposes ``.id`` and ``.fields`` (dict of output fields)
    :param original: the inserted data as a pandas DataFrame; list-typed (vector)
                     fields are assumed indexable directly by the hit id — TODO confirm
    :param pk_name: primary-key field name; falls back to
                    ``ct.default_primary_field_name`` when None
    :return: True when every checked field value matches (raises AssertionError otherwise)
    """
    pk_name = ct.default_primary_field_name if pk_name is None else pk_name
    nq = len(search_res)
    # the output field values are wrong only at nq>=2 #45338, so checking the
    # first two queries is enough to catch that regression without slowing
    # down large-nq cases
    check_nqs = min(2, nq)
    for n in range(check_nqs):
        hits = search_res[n]
        # use this query's own hit count: different queries may legitimately
        # return different numbers of hits (filters, recall), so the count of
        # the first query must not be assumed for the others
        for i in range(len(hits)):
            entity = hits[i].fields
            _id = hits[i].id
            for field in entity.keys():
                if isinstance(entity[field], list):
                    # float-vector field: spot-check every 4th element for speed
                    for order in range(0, len(entity[field]), 4):
                        assert abs(original[field][_id][order] - entity[field][order]) < ct.epsilon
                elif isinstance(entity[field], dict) and field != ct.default_json_field_name:
                    # sparse checking, sparse vector must be the last, this is a bit hacky,
                    # but sparse only supports list data type insertion for now
                    assert entity[field].keys() == original[-1][_id].keys()
                else:
                    # scalar field: locate the inserted row by primary key and
                    # compare the stored value against the returned one
                    num = original[original[pk_name] == _id].index.to_list()[0]
                    assert original[field][num] == entity[field], f"the output field values are wrong at nq={n}"
    return True
diff --git a/tests/python_client/milvus_client_v2/test_milvus_client_hybrid_search_v2.py b/tests/python_client/milvus_client_v2/test_milvus_client_hybrid_search_v2.py index 073ea65b7f..949a8b500c 100644 --- a/tests/python_client/milvus_client_v2/test_milvus_client_hybrid_search_v2.py +++ b/tests/python_client/milvus_client_v2/test_milvus_client_hybrid_search_v2.py @@ -3,25 +3,19 @@ from pymilvus.orm.types import CONSISTENCY_STRONG, CONSISTENCY_BOUNDED, CONSISTE from pymilvus import AnnSearchRequest, RRFRanker, WeightedRanker from pymilvus import ( FieldSchema, CollectionSchema, DataType, - Collection + Collection, Function, FunctionType, FunctionScore ) -from common.constants import * + from utils.util_pymilvus import * from common.common_type import CaseLabel, CheckTasks from common import common_type as ct from common import common_func as cf from utils.util_log import test_log as log from base.client_base import TestcaseBase -import heapq -from time import sleep -from decimal import Decimal, getcontext -import decimal -import multiprocessing -import numbers +from base.client_v2_base import TestMilvusClientV2Base + import random import math -import numpy -import threading import pytest import pandas as pd from faker import Faker @@ -67,18 +61,894 @@ vectors = [[random.random() for _ in range(default_dim)] for _ in range(default_ uid = "test_search" nq = 1 epsilon = 0.001 -field_name = default_float_vec_field_name binary_field_name = default_binary_vec_field_name search_param = {"nprobe": 1} entity = gen_entities(1, is_normal=True) entities = gen_entities(default_nb, is_normal=True) raw_vectors, binary_entities = gen_binary_entities(default_nb) -default_query, _ = gen_search_vectors_params(field_name, entities, default_top_k, nq) index_name1 = cf.gen_unique_str("float") index_name2 = cf.gen_unique_str("varhar") half_nb = ct.default_nb // 2 max_hybrid_search_req_num = ct.max_hybrid_search_req_num +# test parameters for test client v2 base class 
+default_primary_key_field_name = "id" +default_vector_field_name = "vector" +partition_names = ["partition_1", "partition_2"] +float_vector_field_name1 = "float_vector1" +float_vector_field_name2 = "float_vector2" +sparse_vector_field_name1 = "text_sparse_emb1" +sparse_vector_field_name2 = "text_sparse_emb2" +max_nq = 16384 + + +@pytest.mark.xdist_group("TestMilvusClientHybridSearch") +class TestMilvusClientHybridSearch(TestMilvusClientV2Base): + """Test search with hybrid search functionality""" + + def setup_class(self): + super().setup_class(self) + self.collection_name = "TestMilvusClientHybridSearch" + cf.gen_unique_str("_") + self.partition_names = partition_names + self.float_vector_field_name1 = float_vector_field_name1 + self.float_vector_field_name2 = float_vector_field_name2 + self.sparse_vector_field_name1 = sparse_vector_field_name1 + self.sparse_vector_field_name2 = sparse_vector_field_name2 + self.float_vector_dim = 128 + self.primary_keys = [] + self.enable_dynamic_field = False + self.datas = [] + + @pytest.fixture(scope="class", autouse=True) + def prepare_collection(self, request): + """ + Initialize collection before test class runs + """ + # Get client connection + client = self._client() + analyzer_params = { + "tokenizer": "standard", + } + + # Create collection + collection_schema = self.create_schema(client)[0] + collection_schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False) + collection_schema.add_field(self.float_vector_field_name1, DataType.FLOAT_VECTOR, dim=self.float_vector_dim) + collection_schema.add_field(self.float_vector_field_name2, DataType.FLOAT_VECTOR, dim=self.float_vector_dim) + collection_schema.add_field(self.sparse_vector_field_name1, DataType.SPARSE_FLOAT_VECTOR) + collection_schema.add_field(self.sparse_vector_field_name2, DataType.SPARSE_FLOAT_VECTOR) + collection_schema.add_field('text1', DataType.VARCHAR, max_length=6553, + enable_analyzer=True, 
analyzer_params=analyzer_params) + collection_schema.add_field('text2', DataType.VARCHAR, max_length=6553, + enable_analyzer=True, analyzer_params=analyzer_params) + collection_schema.add_field(default_int64_field_name, DataType.INT64) + collection_schema.add_field('json', DataType.JSON) + collection_schema.add_field(default_string_field_name, DataType.VARCHAR, max_length=256) + bm25_function1 = Function( + name=self.sparse_vector_field_name1, + function_type=FunctionType.BM25, + input_field_names=["text1"], + output_field_names=self.sparse_vector_field_name1, + params={}, + ) + bm25_function2 = Function( + name=self.sparse_vector_field_name2, + function_type=FunctionType.BM25, + input_field_names=["text2"], + output_field_names=self.sparse_vector_field_name2, + params={}, + ) + collection_schema.add_function(bm25_function1) + collection_schema.add_function(bm25_function2) + self.create_collection(client, self.collection_name, schema=collection_schema, + enable_dynamic_field=self.enable_dynamic_field, force_teardown=False) + for partition_name in self.partition_names: + self.create_partition(client, self.collection_name, partition_name=partition_name) + + # Define number of insert iterations + insert_times = 2 + + # Generate vectors for each type and store in self + float_vectors = cf.gen_vectors(default_nb * insert_times, dim=self.float_vector_dim, + vector_data_type=DataType.FLOAT_VECTOR) + float_vectors2 = cf.gen_vectors(default_nb * insert_times, dim=self.float_vector_dim, + vector_data_type=DataType.FLOAT_VECTOR) + texts1 = cf.gen_varchar_data(length=10, nb=default_nb * insert_times, text_mode=True) + texts2 = cf.gen_varchar_data(length=10, nb=default_nb * insert_times, text_mode=True) + + # Insert data multiple times with non-duplicated primary keys + for j in range(insert_times): + # Group rows by partition based on primary key mod 3 + default_rows = [] + partition1_rows = [] + partition2_rows = [] + + for i in range(default_nb): + pk = i + j * default_nb + 
row = { + default_primary_key_field_name: pk, + self.float_vector_field_name1: list(float_vectors[pk]), + self.float_vector_field_name2: list(float_vectors2[pk]), + "text1": texts1[pk], + "text2": texts2[pk], + "json": {"float": pk * 1.0, "str": str(pk)}, + default_string_field_name: str(pk), + default_int64_field_name: pk + } + self.datas.append(row) + + # Distribute to partitions based on pk mod 3 + if pk % 3 == 0: + default_rows.append(row) + elif pk % 3 == 1: + partition1_rows.append(row) + else: + partition2_rows.append(row) + + # Insert into respective partitions + if default_rows: + self.insert(client, self.collection_name, data=default_rows) + if partition1_rows: + self.insert(client, self.collection_name, data=partition1_rows, partition_name=self.partition_names[0]) + if partition2_rows: + self.insert(client, self.collection_name, data=partition2_rows, partition_name=self.partition_names[1]) + + # Track all inserted data and primary keys + self.primary_keys.extend([i + j * default_nb for i in range(default_nb)]) + + self.flush(client, self.collection_name) + + # Create index + index_params = self.prepare_index_params(client)[0] + index_params.add_index(field_name=self.float_vector_field_name1, + metric_type="COSINE", + index_type="IVF_FLAT", + params={"nlist": 128}) + index_params.add_index(field_name=self.float_vector_field_name2, + metric_type="L2", + index_type="HNSW", + params={}) + index_params.add_index(field_name=self.sparse_vector_field_name1, + metric_type="BM25", + index_type="SPARSE_INVERTED_INDEX", + params={}) + index_params.add_index(field_name=self.sparse_vector_field_name2, + metric_type="BM25", + index_type="SPARSE_INVERTED_INDEX", + params={}) + self.create_index(client, self.collection_name, index_params=index_params, timeout=300) + + # Load collection + self.load_collection(client, self.collection_name) + + def teardown(): + self.drop_collection(self._client(), self.collection_name) + + request.addfinalizer(teardown) + + 
@pytest.mark.tags(CaseLabel.L0) + @pytest.mark.parametrize("nq", [1, 5]) + @pytest.mark.parametrize("vector_data_type", [DataType.FLOAT_VECTOR, DataType.SPARSE_FLOAT_VECTOR]) + def test_hybrid_search_default_with_nqs_and_offset(self, nq, vector_data_type): + """ + Test hybrid search functionality with multiple search requests and offset parameter. + Steps: + - Create connection, set up collection with multiple vector fields. + - Insert records. + - Perform hybrid search with varying nq values and offset settings. + Expected: + - Hybrid search returns results correctly that match the limit and offset parameters. + """ + client = self._client() + + if vector_data_type == DataType.FLOAT_VECTOR: + search_data = cf.gen_vectors(nq, self.float_vector_dim, vector_data_type=vector_data_type) + field_names = [self.float_vector_field_name1, self.float_vector_field_name2] + else: + field_names = [self.sparse_vector_field_name1, self.sparse_vector_field_name2] + search_data = cf.gen_varchar_data(length=10, nb=nq, text_mode=True) + + # generate hybrid search request list + req_list = [] + for field_name in field_names: + req = AnnSearchRequest(**{ + "data": search_data, + "anns_field": field_name, + "param": {}, + "limit": default_limit, + }) + req_list.append(req) + + # perform hybrid search + self.hybrid_search(client, self.collection_name, reqs=req_list, + ranker=WeightedRanker(*[0.6, 0.4]), + limit=default_limit, + output_fields=[default_primary_key_field_name, default_string_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, + "ids": self.primary_keys, + "limit": default_limit, + "pk_name": default_primary_key_field_name, + "original_entities": self.datas, + "output_fields": [default_primary_key_field_name, + default_string_field_name]}) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("req_limit_ratio", [1, 2]) + def test_hybrid_search_different_dim_and_with_full_text_search(self, req_limit_ratio): + """ + Test hybrid search 
functionality combining different dimension vector fields and full text search. + Steps: + - Create connection, set up collection with multiple vector fields of different dims. + - Insert records. + - Perform hybrid search using topK limit along with a full text filter. + Expected: + - Hybrid search returns results correctly that match the filter and honor the topK limit. + """ + client = self._client() + + nq = 3 + req_list = [] + for field_name in [self.float_vector_field_name1, self.sparse_vector_field_name2]: + if field_name == self.float_vector_field_name1: + search_data = cf.gen_vectors(nq, self.float_vector_dim, vector_data_type=DataType.FLOAT_VECTOR) + else: + search_data = cf.gen_varchar_data(length=10, nb=nq, text_mode=True) + req = AnnSearchRequest(**{ + "data": search_data, + "anns_field": field_name, + "param": {}, + "limit": default_limit * req_limit_ratio, + }) + req_list.append(req) + hybrid_search_0 = self.hybrid_search(client, self.collection_name, reqs=req_list, + ranker=WeightedRanker(*[0.6, 0.4]), + limit=default_limit, + filter=f"{default_int64_field_name} > 1000", + output_fields=[default_primary_key_field_name, default_string_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, + "ids": self.primary_keys, + "limit": default_limit, + "enable_milvus_client_api": True, + "pk_name": default_primary_key_field_name, + "original_entities": self.datas, + "output_fields": [default_primary_key_field_name, + default_string_field_name]})[0] + + hybrid_search_1 = self.hybrid_search(client, self.collection_name, reqs=req_list, + ranker=WeightedRanker(*[0.6, 0.4]), + limit=default_limit, + filter=f"{default_int64_field_name} > 1000", + output_fields=[default_primary_key_field_name, default_string_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, + "ids": self.primary_keys, + "limit": default_limit, + "enable_milvus_client_api": True, + "pk_name": default_primary_key_field_name, + 
"original_entities": self.datas, + "output_fields": [default_primary_key_field_name, + default_string_field_name]})[0] + # verify the hybrid search results are consistent + for i in range(nq): + assert hybrid_search_0[i].ids == hybrid_search_1[i].ids + + @pytest.mark.tags(CaseLabel.L2) + def test_hybrid_search_with_expr(self): + """ + Test hybrid search functionality with expression filter. + Steps: + - Create connection, set up collection with multiple vector fields. + - Insert records. + - Perform hybrid search using expression filter along with a topK limit. + Expected: + - Hybrid search returns results correctly that match the filter and honor the topK limit. + """ + client = self._client() + + nq = 2 + search_data = cf.gen_vectors(nq, self.float_vector_dim, vector_data_type=DataType.FLOAT_VECTOR) + # generate hybrid search request list + req_list = [] + filter_min_value = 800 + filter_max_value = 1800 + for field_name in [self.float_vector_field_name1, self.float_vector_field_name2]: + req = AnnSearchRequest(**{ + "data": search_data, + "anns_field": field_name, + "param": {}, + "limit": default_limit, + "expr": f"{filter_min_value} < {default_primary_key_field_name} <= {filter_max_value}" + }) + req_list.append(req) + + ranker = WeightedRanker(*[0.5, 0.5]) + res = self.hybrid_search(client, self.collection_name, reqs=req_list, + ranker=ranker, + limit=default_limit, + # fitler=f"{default_primary_key_field_name} <= {filter_max_value}", + output_fields=[default_primary_key_field_name, default_string_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, + "ids": self.primary_keys, + "limit": default_limit, + "pk_name": default_primary_key_field_name, + "original_entities": self.datas, + "output_fields": [default_primary_key_field_name, + default_string_field_name]})[0] + + # verify the hybrid search results meet the filter + for i in range(nq): + assert max(res[i].ids) <= filter_max_value + assert min(res[i].ids) > filter_min_value 
+ + # hybrid search again with filter + filter_max_value2 = np.mean(res[0].ids) + filter = f"{default_primary_key_field_name} <= {filter_max_value2}" + res2 = self.hybrid_search(client, self.collection_name, reqs=req_list, + ranker=ranker, + limit=default_limit, + fitler=filter, + output_fields=[default_primary_key_field_name, default_string_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, + "ids": self.primary_keys, + "limit": default_limit, + "pk_name": default_primary_key_field_name, + "original_entities": self.datas, + "output_fields": [default_primary_key_field_name, + default_string_field_name]})[0] + # verify filter in hybrid search is not effective + assert max(res2[i].ids) > filter_max_value2 + + @pytest.mark.tags(CaseLabel.L2) + @pytest.mark.parametrize("req_num", [1, 5, ct.max_hybrid_search_req_num, ct.max_hybrid_search_req_num + 1]) + def test_hybrid_search_on_same_anns_field(self, req_num): + """ + Test hybrid search functionality on the same anns field. + Steps: + - Create connection, set up collection with multiple vector fields. + - Insert records with different anns fields. + - Perform hybrid search on the same anns field with different expressions. + Expected: + - Hybrid search returns results correctly that match the expressions and honor the topK limit. 
+ """ + nq = 3 + client = self._client() + + # generate hybrid search request list + req_list = [] + for _ in range(req_num): + search_data = cf.gen_vectors(nq, self.float_vector_dim, vector_data_type=DataType.FLOAT_VECTOR) + req = AnnSearchRequest(**{ + "data": search_data, + "anns_field": self.float_vector_field_name1, # on the same anns field + "param": {}, + "limit": default_limit, + "expr": f"{default_int64_field_name} > 100" + }) + req_list.append(req) + + ranker = RRFRanker() + + if req_num > ct.max_hybrid_search_req_num: + check_task = CheckTasks.err_res + check_items = {"err_code": 65535, + "err_msg": "maximum of ann search requests is 1024"} + else: + check_task = CheckTasks.check_search_results + check_items = {"nq": nq, + "ids": self.primary_keys, + "limit": default_limit, + "pk_name": default_primary_key_field_name, + "original_entities": self.datas, + "output_fields": [default_primary_key_field_name, default_string_field_name]} + self.hybrid_search(client, self.collection_name, reqs=req_list, + ranker=ranker, + limit=default_limit, + output_fields=[default_primary_key_field_name, default_string_field_name], + check_task=check_task, + check_items=check_items) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("req_limit", [None, 1, ct.default_limit * 2, max_limit]) + def test_hybrid_search_diff_limits_in_search_req(self, req_limit): + """ + Test case: Hybrid search where individual search requests omit 'limit' + Scenario: + - Create connection, collection, and insert data. + - Perform hybrid search with search requests with different 'limit' parameters. + Expected: + - Hybrid search completes successfully and returns results up to the specified topK limit. 
+ """ + client = self._client() + + search_data = cf.gen_vectors(nq, self.float_vector_dim, vector_data_type=DataType.FLOAT_VECTOR) + # generate hybrid search request list + req_list = [] + for field_name in [self.float_vector_field_name1, self.float_vector_field_name2]: + req = AnnSearchRequest(**{ + "data": search_data, + "anns_field": field_name, + "param": {}, + "limit": req_limit + }) + req_list.append(req) + + ranker = WeightedRanker(*[0.5, 0.5]) + expected_limit = ct.default_limit if req_limit is None else min(req_limit * len(req_list), ct.default_limit) + self.hybrid_search(client, self.collection_name, reqs=req_list, + ranker=ranker, + limit=ct.default_limit, + fitler=f"{default_int64_field_name} <= 18000", + output_fields=[default_primary_key_field_name, default_string_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, + "ids": self.primary_keys, + "limit": expected_limit, + "pk_name": default_primary_key_field_name, + "original_entities": self.datas, + "output_fields": [default_primary_key_field_name, + default_string_field_name]}) + + @pytest.mark.tags(CaseLabel.L1) + def test_hybrid_search_as_search(self): + """ + target: test hybrid search to search as the original search interface + method: create connection, collection, insert and search + expected: hybrid search on one vector field with limit(topK), and the result should be equal to search + on the same vector field with the same params. + """ + # 1. 
initialize collection with data + client = self._client() + + nq = 3 + for field_name in [self.float_vector_field_name1, self.sparse_vector_field_name1]: + if field_name == self.float_vector_field_name1: + search_data = cf.gen_vectors(nq, self.float_vector_dim, vector_data_type=DataType.FLOAT_VECTOR) + else: + search_data = cf.gen_varchar_data(length=10, nb=nq, text_mode=True) + req_list = [] + req = AnnSearchRequest(**{ + "data": search_data, + "anns_field": field_name, + "param": {}, + "limit": default_limit, + }) + req_list.append(req) + # hybrid search + hybrid_res = self.hybrid_search(client, self.collection_name, reqs=req_list, + ranker=WeightedRanker(1), + limit=default_limit, + output_fields=[default_primary_key_field_name, default_string_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, + "ids": self.primary_keys, + "limit": default_limit, + "pk_name": default_primary_key_field_name, + "enable_milvus_client_api": True, + "original_entities": self.datas, + "output_fields": [default_primary_key_field_name, + default_string_field_name]})[0] + search_res = self.search(client, self.collection_name, data=search_data, + anns_field=field_name, + search_params={}, + limit=default_limit, + output_fields=[default_primary_key_field_name, default_string_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, + "ids": self.primary_keys, + "limit": default_limit, + "enable_milvus_client_api": True, + "pk_name": default_primary_key_field_name, + "original_entities": self.datas, + "output_fields": [default_primary_key_field_name, + default_string_field_name]})[0] + for i in range(nq): + assert hybrid_res[i].ids == search_res[i].ids + + @pytest.mark.tags(CaseLabel.L1) + def test_hybrid_search_RRFRanker_default_parameter(self): + """ + Test hybrid_search with the default RRFRanker configuration. + + This test: + - Connects to the collection and prepares two different vector search requests. 
+ - Performs a standard search on each vector field to build reference scoring. + - Merges the results using the RRFRanker (default parameters) for hybrid search. + - Compares the hybrid search scores with the expected RRFRanker baseline (ignoring IDs, since matches can have equal scores and non-deterministic IDs). + - Verifies repeated hybrid searches with the same parameters produce consistent results. + + The test passes if hybrid search completes successfully and the scores match the manually computed baseline. + """ + client = self._client() + vector_name_list = [self.float_vector_field_name1, self.float_vector_field_name2] + # 3. prepare search params for each vector field + req_list = [] + search_res_dict_array = [] + for field_name in vector_name_list: + search_data = cf.gen_vectors(nq, self.float_vector_dim, vector_data_type=DataType.FLOAT_VECTOR) + search_res_dict = {} + search_param = { + "data": search_data, + "anns_field": field_name, + "param": {}, + "limit": default_limit, + "expr": f"{default_int64_field_name} > 0"} + req = AnnSearchRequest(**search_param) + req_list.append(req) + # search for get the baseline of hybrid_search + search_res = self.search(client, self.collection_name, data=search_data, + anns_field=field_name, + search_params={}, + limit=default_limit, + filter=f"{default_int64_field_name} > 0", + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, + "ids": self.primary_keys, + "limit": default_limit, + "pk_name": default_primary_key_field_name})[0] + ids = search_res[0].ids + for j in range(len(ids)): + search_res_dict[ids[j]] = 1 / (j + 60 + 1) + search_res_dict_array.append(search_res_dict) + # 4. calculate hybrid search baseline for RRFRanker + ids_answer, score_answer = cf.get_hybrid_search_base_results_rrf(search_res_dict_array) + # 5. 
hybrid search + hybrid_search_0 = self.hybrid_search(client, self.collection_name, reqs=req_list, + ranker=RRFRanker(), + limit=default_limit, + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, + "ids": self.primary_keys, + "limit": default_limit, + "pk_name": default_primary_key_field_name})[0] + # 6. compare results through the re-calculated distances + for i in range(len(score_answer[:default_limit])): + assert score_answer[i] - hybrid_search_0[0].distances[i] < hybrid_search_epsilon + # 7. run hybrid search with the same parameters twice, and compare the results + hybrid_search_1 = self.hybrid_search(client, self.collection_name, reqs=req_list, + ranker=RRFRanker(), + limit=default_limit, + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, + "ids": self.primary_keys, + "limit": default_limit, + "pk_name": default_primary_key_field_name})[0] + + assert hybrid_search_0[0].ids == hybrid_search_1[0].ids + assert hybrid_search_0[0].distances == hybrid_search_1[0].distances + + @pytest.mark.tags(CaseLabel.L2) + def test_hybrid_search_overall_limit_larger_sum_each_limit(self): + """ + Test hybrid search functionality with overall limit larger than sum of each limit. + Steps: + - Create connection, set up collection with multiple vector fields. + - Insert records. + - Perform hybrid search using overall limit larger than sum of each limit. + Expected: + - Hybrid search returns results correctly that match the overall limit and honor the topK limit. 
+ + """ + client = self._client() + nq = 3 + limit = 10 + search_data1 = cf.gen_vectors(nq, self.float_vector_dim, vector_data_type=DataType.FLOAT_VECTOR) + search_data2 = cf.gen_varchar_data(length=10, nb=nq, text_mode=True) + vector_filed_names = [self.float_vector_field_name1, self.sparse_vector_field_name2] + search_data_list = [search_data1, search_data2] + id_list_nq = [] + for i in range(nq): + id_list_nq.append([]) + # search the data1 and data2 separately + for i in range(len(vector_filed_names)): + search_res = self.search(client, self.collection_name, data=search_data_list[i], + anns_field=vector_filed_names[i], + search_params={}, + limit=limit, + output_fields=[default_primary_key_field_name, default_string_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, + "ids": self.primary_keys, + "limit": limit, + "pk_name": default_primary_key_field_name, + "output_fields": [default_primary_key_field_name, + default_string_field_name]})[0] + for j in range(nq): + id_list_nq[j].extend(search_res[j].ids) + + # generate hybrid search request list + req_list = [] + for i in range(len(vector_filed_names)): + req = AnnSearchRequest(**{ + "data": search_data_list[i], + "anns_field": vector_filed_names[i], + "param": {}, + "limit": limit, + }) + req_list.append(req) + ranker = WeightedRanker(*[0.6, 0.4]) + # hybrid search + larger_limit = limit * len(req_list) + 1 + hybrid_search_res = self.hybrid_search(client, self.collection_name, reqs=req_list, + ranker=ranker, + limit=larger_limit, + output_fields=[default_primary_key_field_name, + default_string_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq})[0] + # verify the hybrid search results are consistent + for i in range(nq): + assert len(hybrid_search_res[i].ids) == len(list(set(id_list_nq[i]))) + + # @pytest.mark.tags(CaseLabel.L2) + # def test_hybrid_search_with_range_search(self): + # """ + # target: test hybrid search with range search + # method: 
+ # expected: raise exception (not support yet) + # """ + # client = self._client() + # nq = 2 + # limit = 200 + # + # field_names = [self.sparse_vector_field_name1, self.sparse_vector_field_name2] + # search_data = cf.gen_varchar_data(length=10, nb=nq, text_mode=True) + # + # # 0. search + # mid_distances = [] + # for i in range(len(field_names)): + # field_name = field_names[i] + # res_search = self.search(client, self.collection_name, data=search_data, + # anns_field=field_name, + # limit=limit)[0] + # field_mid_distances = [] + # for j in range(nq): + # field_mid_distances.append(res_search[j].distances[limit // 2 - 1]) + # mid_distances.append(np.mean(field_mid_distances)) + # + # # 1. hybrid search without range search + # req_list = [] + # for field_name in field_names: + # req = AnnSearchRequest(**{ + # "data": search_data, + # "anns_field": field_name, + # "param": {}, + # "limit": limit, + # }) + # req_list.append(req) + # res1 = self.hybrid_search(client, self.collection_name, reqs=req_list, + # ranker=WeightedRanker(0.5, 0.5), + # limit=limit, + # output_fields=[default_primary_key_field_name, default_string_field_name], + # check_task=CheckTasks.check_search_results, + # check_items={"nq": nq, "ids": self.primary_keys, "limit": limit, + # "pk_name": default_primary_key_field_name, + # "output_fields": [default_primary_key_field_name, + # default_string_field_name]})[0] + # + # # 2. 
hybrid search with range search one nq by one nq + # for i in range(nq): + # req_list2 = [] + # for j in range(len(field_names)): + # field_name = field_names[j] + # req = AnnSearchRequest(**{ + # "data": [search_data[i]], + # "anns_field": field_name, + # "param": {"params": {"radius": float(mid_distances[i]), "range_filter": 9999}}, + # "limit": limit//2, + # }) + # req_list2.append(req) + # res2 = self.hybrid_search(client, self.collection_name, reqs=req_list2, + # ranker=WeightedRanker(0.5, 0.5), + # limit=limit//2, + # output_fields=[default_primary_key_field_name, default_string_field_name], + # check_task=CheckTasks.check_search_results, + # check_items={"nq": 1, "ids": self.primary_keys, "limit": limit//2, + # "pk_name": default_primary_key_field_name, + # "output_fields": [default_primary_key_field_name, + # default_string_field_name]})[0] + # assert len(set(res2[0].ids).intersection(set(res1[0].ids[:limit//2]))) / len(res2[0].ids) >= 0.7, f"failed in nq={i}" + + @pytest.mark.tags(CaseLabel.L2) + @pytest.mark.parametrize("offset", [1, 5]) + @pytest.mark.parametrize("rerank", [RRFRanker(), WeightedRanker(0.1, 0.9)]) + def test_hybrid_search_offset_inside_outside_params(self, offset, rerank): + """ + target: test hybrid search with offset inside and outside params + method: create connection, collection, insert and search. 
+ Note: here the result check is through comparing the score, the ids could not be compared + because the high probability of the same score, then the id is not fixed in the range of + the same score + expected: hybrid search successfully with limit(topK), and the result should be the same + """ + client = self._client() + nq = 1 + req_list = [] + search_data = cf.gen_vectors(nq, self.float_vector_dim, vector_data_type=DataType.FLOAT_VECTOR) + for field_name in [self.float_vector_field_name1, self.float_vector_field_name2]: + req = AnnSearchRequest(**{ + "data": search_data, + "anns_field": field_name, + "param": {"offset": offset}, + "limit": ct.default_limit, + }) + req_list.append(req) + hybrid_res_inside = self.hybrid_search(client, self.collection_name, reqs=req_list, + ranker=rerank, + limit=ct.default_limit, + output_fields=[default_primary_key_field_name, + default_string_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq})[0] + req_list = [] + for field_name in [self.float_vector_field_name1, self.float_vector_field_name2]: + req = AnnSearchRequest(**{ + "data": search_data, + "anns_field": field_name, + "param": {}, + "limit": ct.default_limit + }) + req_list.append(req) + hybrid_res_outside = self.hybrid_search(client, self.collection_name, reqs=req_list, + ranker=rerank, + limit=ct.default_limit, + offset=offset, + output_fields=[default_primary_key_field_name, + default_string_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq})[0] + hybrid_res_no_offset = self.hybrid_search(client, self.collection_name, reqs=req_list, + ranker=rerank, + limit=ct.default_limit, + output_fields=[default_primary_key_field_name, + default_string_field_name], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq})[0] + for i in range(nq): + assert hybrid_res_inside[i].ids[offset:] == \ + hybrid_res_outside[i].ids[:-offset] == \ + hybrid_res_no_offset[i].ids[offset:] + + 
@pytest.mark.tags(CaseLabel.L2) + @pytest.mark.parametrize("ranker", [WeightedRanker(*[0.5, 0.5]), RRFRanker()]) + def test_hybrid_search_empty_reqs(self, ranker): + """ + Test case: Hybrid search with empty reqs + Scenario: + - Create a collection, insert data, and perform hybrid search with an empty request list. + Expected: + - Hybrid search failed with error message + """ + client = self._client() + err_msg = {"err_code": 65535, + "err_msg": "nq [0] is invalid, nq (number of search vector per search request) " + "should be in range [1, 16384], but got 0"} + self.hybrid_search(client, self.collection_name, + reqs=[], + ranker=ranker, + limit=default_limit, + output_fields=[default_primary_key_field_name, default_string_field_name], + check_task=CheckTasks.err_res, + check_items=err_msg) + + @pytest.mark.tags(CaseLabel.L2) + @pytest.mark.parametrize("ranker_param", [[0.1, 2], [0.2, 0.4, 0.8]]) + def test_hybrid_search_invalid_WeightedRanker_params(self, ranker_param): + """ + Test case: Hybrid search with invalid WeightedRanker parameters + Scenario: + - Create a collection, insert data, and perform hybrid search with invalid WeightedRanker parameters. 
+ Expected: + - Hybrid search failed with error message + """ + client = self._client() + req_list = [] + search_data = cf.gen_vectors(nq, self.float_vector_dim, vector_data_type=DataType.FLOAT_VECTOR) + for field_name in [self.float_vector_field_name1, self.float_vector_field_name2]: + req = AnnSearchRequest(**{ + "data": search_data, + "anns_field": field_name, + "param": {}, + "limit": ct.default_limit, + }) + req_list.append(req) + + err_msg = {"err_code": 999, "err_msg": "rank param weight should be in range [0, 1]"} + if ranker_param == [0.2, 0.4, 0.8]: + err_msg = {"err_code": 999, + "err_msg": "the length of weights param mismatch with ann search requests: " + "invalid parameter[expected=2][actual=3]"} + + ranker = WeightedRanker(*ranker_param) + self.hybrid_search(client, self.collection_name, + reqs=req_list, + ranker=ranker, + limit=default_limit, + output_fields=[default_primary_key_field_name, default_string_field_name], + check_task=CheckTasks.err_res, + check_items=err_msg) + + @pytest.mark.tags(CaseLabel.L2) + @pytest.mark.parametrize("k", [0, 16385]) + def test_hybrid_search_RRFRanker_k_out_of_range(self, k): + """ + Test case: Hybrid search with RRFRanker and k out of range + Scenario: + - Create a collection, insert data, and perform hybrid search with RRFRanker and k out of range. + Expected: + - Hybrid search failed with error message + """ + client = self._client() + + req_list = [] + search_data = cf.gen_vectors(nq, self.float_vector_dim, vector_data_type=DataType.FLOAT_VECTOR) + for field_name in [self.float_vector_field_name1, self.float_vector_field_name2]: + req = AnnSearchRequest(**{ + "data": search_data, + "anns_field": field_name, + "param": {}, + "limit": default_limit, + }) + req_list.append(req) + + ranker = RRFRanker(k) + # TODO: #29867, the error msg is not good enough, but as it is for now. 
+ err_msg = {"err_code": 65535, + "err_msg": "The rank params k should be in range (0, 16384)"} + self.hybrid_search(client, self.collection_name, + reqs=req_list, + ranker=ranker, + limit=default_limit, + output_fields=[default_primary_key_field_name, default_string_field_name], + check_task=CheckTasks.err_res, + check_items=err_msg) + + @pytest.mark.tags(CaseLabel.L2) + @pytest.mark.parametrize("nq", [max_nq, max_nq + 1]) + def test_hybrid_search_max_nq(self, nq): + """ + Test case: Hybrid search with valid and boundary nq values + Scenario: + - Create a collection, insert data, and perform hybrid search using different nq values (max allowed and just above max). + Expected: + - For valid nq, hybrid search should return the correct results limited by topK. + - For nq above the supported limit, the appropriate error is returned. + """ + client = self._client() + + search_data = cf.gen_vectors(nq, self.float_vector_dim, vector_data_type=DataType.FLOAT_VECTOR) + # generate hybrid search request list + req_list = [] + for field_name in [self.float_vector_field_name1, self.float_vector_field_name2]: + req = AnnSearchRequest(**{ + "data": search_data, + "anns_field": field_name, + "param": {}, + "limit": default_limit, + }) + req_list.append(req) + + if nq == max_nq + 1: + check_task = CheckTasks.err_res + check_items = {"err_code": 65535, + "err_msg": "nq (number of search vector per search request) should be in range [1, 16384]"} + else: + check_task = CheckTasks.check_search_results + check_items = {"nq": nq, + "ids": self.primary_keys, + "limit": default_limit, + "pk_name": default_primary_key_field_name, + "output_fields": [default_primary_key_field_name, default_string_field_name]} + self.hybrid_search(client, self.collection_name, reqs=req_list, + ranker=WeightedRanker(*[0.6, 0.4]), + limit=default_limit, + output_fields=[default_primary_key_field_name, default_string_field_name], + check_task=check_task, + check_items=check_items) + class 
TestCollectionHybridSearchValid(TestcaseBase): """ Test case of search interface """ @@ -134,15 +1004,16 @@ class TestCollectionHybridSearchValid(TestcaseBase): """ @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.parametrize("offset", [0, 5]) @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - def test_hybrid_search_normal(self, nq, is_flush, offset, primary_field, vector_data_type): + def test_hybrid_search_normal(self, is_flush, primary_field, vector_data_type): """ target: test hybrid search normal case method: create connection, collection, insert and search expected: hybrid search successfully with limit(topK) """ self._connect() + nq = 2 + offset = 5 # create db db_name = cf.gen_unique_str(prefix) self.database_wrap.create_database(db_name) @@ -227,212 +1098,9 @@ class TestCollectionHybridSearchValid(TestcaseBase): collection_w.drop() self.database_wrap.drop_database(db_name) - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.parametrize("nq", [16384]) - def test_hybrid_search_normal_max_nq(self, nq): - """ - target: test hybrid search normal case - method: create connection, collection, insert and search - expected: hybrid search successfully with limit(topK) - """ - # 1. initialize collection with data - collection_w, _, _, insert_ids, time_stamp = self.init_collection_general(prefix, True)[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - vector_name_list.append(ct.default_float_vec_field_name) - # 3. prepare search params - req_list = [] - weights = [1] - vectors = cf.gen_vectors(nq, default_dim, vector_data_type=DataType.FLOAT_VECTOR) - # 4. get hybrid search req list - for i in range(len(vector_name_list)): - search_param = { - "data": vectors, - "anns_field": vector_name_list[i], - "param": {"metric_type": "COSINE"}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 5. 
hybrid search - hybrid_res = collection_w.hybrid_search(req_list, WeightedRanker(*weights), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name})[0] - - @pytest.mark.tags(CaseLabel.L1) - def test_hybrid_search_normal_expr(self): - """ - target: test hybrid search normal case - method: create connection, collection, insert and search - expected: hybrid search successfully with search param templates - """ - # 1. initialize collection with data - nq = 10 - collection_w, _, _, insert_ids, time_stamp = self.init_collection_general(prefix, True)[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - vector_name_list.append(ct.default_float_vec_field_name) - # 3. prepare search params - req_list = [] - weights = [1] - vectors = cf.gen_vectors(nq, default_dim, vector_data_type=DataType.FLOAT_VECTOR) - # 4. get hybrid search req list - for i in range(len(vector_name_list)): - search_param = { - "data": vectors, - "anns_field": vector_name_list[i], - "param": {"metric_type": "COSINE"}, - "limit": default_limit, - "expr": "int64 > {value_0}", - "expr_params": {"value_0": 0} - } - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 5. hybrid search - collection_w.hybrid_search(req_list, WeightedRanker(*weights), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, "ids": insert_ids, "limit": default_limit, - "pk_name": ct.default_int64_field_name}) - - @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.skip(reason="issue 32288") - @pytest.mark.parametrize("nq", [0, 16385]) - def test_hybrid_search_normal_over_max_nq(self, nq): - """ - target: test hybrid search normal case - method: create connection, collection, insert and search - expected: hybrid search successfully with limit(topK) - """ - # 1. 
initialize collection with data - collection_w = self.init_collection_general(prefix, True)[0] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - vector_name_list.append(ct.default_float_vec_field_name) - # 3. prepare search params - req_list = [] - weights = [1] - vectors = cf.gen_vectors(nq, default_dim, vector_data_type=DataType.FLOAT_VECTOR) - # 4. get hybrid search req list - for i in range(len(vector_name_list)): - search_param = { - "data": vectors, - "anns_field": vector_name_list[i], - "param": {"metric_type": "COSINE"}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 5. hybrid search - err_msg = "nq (number of search vector per search request) should be in range [1, 16384]" - collection_w.hybrid_search(req_list, WeightedRanker(*weights), default_limit, - check_task=CheckTasks.err_res, - check_items={"err_code": 65535, - "err_msg": err_msg}) - - @pytest.mark.tags(CaseLabel.L1) - def test_hybrid_search_no_limit(self): - """ - target: test hybrid search with no limit - method: create connection, collection, insert and search - expected: hybrid search successfully with limit(topK) - """ - # 1. initialize collection with data - multiple_dim_array = [default_dim, default_dim] - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, multiple_dim_array=multiple_dim_array)[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - vector_name_list.append(ct.default_float_vec_field_name) - # 3. 
prepare search params - vectors = cf.gen_vectors(nq, default_dim, vector_data_type=DataType.FLOAT_VECTOR) - - # get hybrid search req list - search_param = { - "data": vectors, - "anns_field": vector_name_list[0], - "param": {"metric_type": "COSINE"}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - def test_hybrid_search_WeightedRanker_empty_reqs(self, primary_field): - """ - target: test hybrid search normal case - method: create connection, collection, insert and search - expected: hybrid search successfully with limit(topK) - """ - # 1. initialize collection with data - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, primary_field=primary_field, - multiple_dim_array=[default_dim, default_dim])[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - vector_name_list.append(ct.default_float_vec_field_name) - # 3. hybrid search with empty reqs - collection_w.hybrid_search([], WeightedRanker(), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": 0}) - - @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.skip(reason="issue 29839") - @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - def test_hybrid_search_as_search(self, nq, primary_field, is_flush): - """ - target: test hybrid search to search as the original search interface - method: create connection, collection, insert and search - expected: hybrid search successfully with limit(topK), and the result should be equal to search - """ - # 1. 
initialize collection with data - dim = 3 - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, dim=dim, is_flush=is_flush, - primary_field=primary_field, - enable_dynamic_field=False, multiple_dim_array=[dim, dim])[0:5] - - vector_name_list = cf.extract_vector_field_name_list(collection_w) - vector_name_list.append(ct.default_float_vec_field_name) - vectors = [[random.random() for _ in range(dim)] for _ in range(nq)] - for search_field in vector_name_list: - # 2. prepare search params - req_list = [] - search_param = { - "data": vectors, - "anns_field": search_field, - "param": {"metric_type": "COSINE"}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 3. hybrid search - hybrid_res = collection_w.hybrid_search(req_list, WeightedRanker(1), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name})[0] - search_res = collection_w.search(vectors[:nq], search_field, - default_search_params, default_limit, - default_search_exp, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name})[0] - # 4. 
the effect of hybrid search to one field should equal to search - log.info("The distance list is:\n") - for i in range(nq): - log.info(hybrid_res[0].distances) - log.info(search_res[0].distances) - assert hybrid_res[i].ids == search_res[i].ids - - @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - def test_hybrid_search_different_metric_type(self, nq, primary_field, is_flush, metric_type): + def test_hybrid_search_different_metric_type(self, primary_field, is_flush, metric_type): """ target: test hybrid search for fields with different metric type method: create connection, collection, insert and search @@ -440,6 +1108,7 @@ class TestCollectionHybridSearchValid(TestcaseBase): """ # 1. initialize collection with data dim = 128 + nq = 3 collection_w, _, _, insert_ids, time_stamp = \ self.init_collection_general(prefix, True, dim=dim, is_flush=is_flush, is_index=False, primary_field=primary_field, @@ -472,7 +1141,7 @@ class TestCollectionHybridSearchValid(TestcaseBase): @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - def test_hybrid_search_different_metric_type_each_field(self, nq, primary_field, is_flush, metric_type): + def test_hybrid_search_different_metric_type_each_field(self, primary_field, is_flush, metric_type): """ target: test hybrid search for fields with different metric type method: create connection, collection, insert and search @@ -480,6 +1149,7 @@ class TestCollectionHybridSearchValid(TestcaseBase): """ # 1. 
initialize collection with data dim = 91 + nq = 4 collection_w, _, _, insert_ids, time_stamp = \ self.init_collection_general(prefix, True, dim=dim, is_flush=is_flush, is_index=False, primary_field=primary_field, @@ -528,419 +1198,6 @@ class TestCollectionHybridSearchValid(TestcaseBase): "limit": default_limit, "pk_name": ct.default_int64_field_name})[0] - @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - @pytest.mark.skip(reason="issue 29923") - def test_hybrid_search_different_dim(self, nq, primary_field, metric_type): - """ - target: test hybrid search for fields with different dim - method: create connection, collection, insert and search - expected: hybrid search successfully with limit(topK) - """ - default_limit = 100 - # 1. initialize collection with data - dim = 121 - multiple_dim_array = [dim + dim, dim - 10] - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, dim=dim, is_index=False, primary_field=primary_field, - enable_dynamic_field=False, multiple_dim_array=multiple_dim_array)[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - flat_index = {"index_type": "FLAT", "params": {}, "metric_type": metric_type} - for vector_name in vector_name_list: - collection_w.create_index(vector_name, flat_index) - collection_w.create_index(ct.default_float_vec_field_name, flat_index) - collection_w.load() - # 3. prepare search params - req_list = [] - for i in range(len(vector_name_list)): - search_param = { - "data": [[random.random() for _ in range(multiple_dim_array[i])] for _ in range(nq)], - "anns_field": vector_name_list[i], - "param": {"metric_type": metric_type, "offset": 0}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 4. 
hybrid search - hybrid_search_0 = collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name})[0] - hybrid_search_1 = collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name})[0] - for i in range(nq): - assert hybrid_search_0[i].ids == hybrid_search_1[i].ids - assert hybrid_search_0[i].distances == hybrid_search_1[i].distances - - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/36273") - def test_hybrid_search_overall_limit_larger_sum_each_limit(self, nq, primary_field, metric_type): - - """ - target: test hybrid search: overall limit which is larger than sum of each limit - method: create connection, collection, insert and search - expected: hybrid search successfully with limit(topK) - """ - # 1. initialize collection with data - dim = 200 - multiple_dim_array = [dim + dim, dim - 10] - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, dim=dim, is_index=False, primary_field=primary_field, - enable_dynamic_field=False, multiple_dim_array=multiple_dim_array)[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - flat_index = {"index_type": "FLAT", "params": {}, "metric_type": metric_type} - for vector_name in vector_name_list: - collection_w.create_index(vector_name, flat_index) - collection_w.create_index(ct.default_float_vec_field_name, flat_index) - collection_w.load() - # 3. 
prepare search params - req_list = [] - id_list_nq = [] - vectors = [] - default_search_params = {"metric_type": metric_type, "offset": 0} - for i in range(len(vector_name_list)): - vectors.append([]) - for i in range(nq): - id_list_nq.append([]) - for k in range(nq): - for i in range(len(vector_name_list)): - vectors_search = [random.random() for _ in range(multiple_dim_array[i])] - vectors[i].append(vectors_search) - # 4. search for the comparision for hybrid search - for i in range(len(vector_name_list)): - search_res = collection_w.search(vectors[i], vector_name_list[i], - default_search_params, default_limit, - default_search_exp, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name})[0] - for k in range(nq): - id_list_nq[k].extend(search_res[k].ids) - # 5. prepare hybrid search params - for i in range(len(vector_name_list)): - search_param = { - "data": vectors[i], - "anns_field": vector_name_list[i], - "param": default_search_params, - "limit": default_limit, - "expr": default_search_exp} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 6. hybrid search - hybrid_search = \ - collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9), default_limit * len(req_list) + 1)[0] - assert len(hybrid_search) == nq - for i in range(nq): - assert len(hybrid_search[i].ids) == len(list(set(id_list_nq[i]))) - assert set(hybrid_search[i].ids) == set(id_list_nq[i]) - - @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - def test_hybrid_search_overall_different_limit(self, primary_field, metric_type): - """ - target: test hybrid search with different limit params - method: create connection, collection, insert and search - expected: hybrid search successfully with limit(topK) - """ - # 1. 
initialize collection with data - dim = 100 - multiple_dim_array = [dim + dim, dim - 10] - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, dim=dim, is_index=False, primary_field=primary_field, - enable_dynamic_field=False, multiple_dim_array=multiple_dim_array)[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - flat_index = {"index_type": "FLAT", "params": {}, "metric_type": metric_type} - for vector_name in vector_name_list: - collection_w.create_index(vector_name, flat_index) - collection_w.create_index(ct.default_float_vec_field_name, flat_index) - collection_w.load() - # 3. prepare search params - req_list = [] - for i in range(len(vector_name_list)): - search_param = { - "data": [[random.random() for _ in range(multiple_dim_array[i])] for _ in range(nq)], - "anns_field": vector_name_list[i], - "param": {"metric_type": metric_type, "offset": 0}, - "limit": default_limit - i, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 4. hybrid search - collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name}) - - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/36273") - def test_hybrid_search_min_limit(self, primary_field, metric_type): - """ - target: test hybrid search with minimum limit params - method: create connection, collection, insert and search - expected: hybrid search successfully with limit(topK) - """ - # 1. 
initialize collection with data - dim = 99 - multiple_dim_array = [dim + dim, dim - 10] - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, dim=dim, is_index=False, primary_field=primary_field, - enable_dynamic_field=False, multiple_dim_array=multiple_dim_array)[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - flat_index = {"index_type": "FLAT", "params": {}, "metric_type": metric_type} - for vector_name in vector_name_list: - collection_w.create_index(vector_name, flat_index) - collection_w.create_index(ct.default_float_vec_field_name, flat_index) - collection_w.load() - # 3. prepare search params - req_list = [] - id_list = [] - for i in range(len(vector_name_list)): - vectors = [[random.random() for _ in range(multiple_dim_array[i])] for _ in range(1)] - search_params = {"metric_type": metric_type, "offset": 0} - search_param = { - "data": vectors, - "anns_field": vector_name_list[i], - "param": search_params, - "limit": min_dim, - "expr": default_search_exp} - req = AnnSearchRequest(**search_param) - req_list.append(req) - search_res = collection_w.search(vectors[:1], vector_name_list[i], - search_params, min_dim, - default_search_exp, - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, - "ids": insert_ids, - "limit": min_dim, - "pk_name": ct.default_int64_field_name})[0] - id_list.extend(search_res[0].ids) - # 4. 
hybrid search - hybrid_search = collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9), default_limit)[0] - assert len(hybrid_search) == 1 - assert len(hybrid_search[0].ids) == len(list(set(id_list))) - - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - def test_hybrid_search_max_limit(self, primary_field, metric_type): - """ - target: test hybrid search with maximum limit params - method: create connection, collection, insert and search - expected: hybrid search successfully with limit(topK) - """ - # 1. initialize collection with data - dim = 66 - multiple_dim_array = [dim + dim, dim - 10] - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, dim=dim, is_index=False, primary_field=primary_field, - enable_dynamic_field=False, multiple_dim_array=multiple_dim_array)[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - flat_index = {"index_type": "FLAT", "params": {}, "metric_type": metric_type} - for vector_name in vector_name_list: - collection_w.create_index(vector_name, flat_index) - collection_w.create_index(ct.default_float_vec_field_name, flat_index) - collection_w.load() - # 3. prepare search params - req_list = [] - for i in range(len(vector_name_list)): - search_param = { - "data": [[random.random() for _ in range(multiple_dim_array[i])] for _ in range(nq)], - "anns_field": vector_name_list[i], - "param": {"metric_type": metric_type}, - "limit": max_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 4. 
hybrid search - collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name}) - - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - def test_hybrid_search_max_min_limit(self, primary_field, metric_type): - """ - target: test hybrid search with maximum and minimum limit params - method: create connection, collection, insert and search - expected: hybrid search successfully with limit(topK) - """ - # 1. initialize collection with data - dim = 66 - multiple_dim_array = [dim + dim, dim - 10] - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, dim=dim, is_index=False, primary_field=primary_field, - enable_dynamic_field=False, multiple_dim_array=multiple_dim_array)[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - flat_index = {"index_type": "FLAT", "params": {}, "metric_type": metric_type} - for vector_name in vector_name_list: - collection_w.create_index(vector_name, flat_index) - collection_w.create_index(ct.default_float_vec_field_name, flat_index) - collection_w.load() - # 3. prepare search params - req_list = [] - for i in range(len(vector_name_list)): - limit = max_limit - if i == 1: - limit = 1 - search_param = { - "data": [[random.random() for _ in range(multiple_dim_array[i])] for _ in range(nq)], - "anns_field": vector_name_list[i], - "param": {"metric_type": metric_type}, - "limit": limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 4. 
hybrid search - collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name}) - - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - def test_hybrid_search_same_anns_field(self, primary_field, metric_type): - """ - target: test hybrid search: multiple search on same anns field - method: create connection, collection, insert and search - expected: hybrid search successfully with limit(topK) - """ - # 1. initialize collection with data - dim = 55 - multiple_dim_array = [dim, dim] - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, dim=dim, is_index=False, primary_field=primary_field, - enable_dynamic_field=False, multiple_dim_array=multiple_dim_array)[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - flat_index = {"index_type": "FLAT", "params": {}, "metric_type": metric_type} - for vector_name in vector_name_list: - collection_w.create_index(vector_name, flat_index) - collection_w.create_index(ct.default_float_vec_field_name, flat_index) - collection_w.load() - # 3. prepare search params - req_list = [] - for i in range(len(vector_name_list)): - search_param = { - "data": [[random.random() for _ in range(multiple_dim_array[i])] for _ in range(nq)], - "anns_field": vector_name_list[0], - "param": {"metric_type": metric_type, "offset": 0}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 4. 
hybrid search - collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name}) - - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - def test_hybrid_search_different_offset_single_field(self, primary_field, is_flush, metric_type): - """ - target: test hybrid search for fields with different offset - method: create connection, collection, insert and search - expected: hybrid search successfully with limit(topK) - """ - # 1. initialize collection with data - dim = 100 - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, auto_id=False, dim=dim, is_flush=is_flush, is_index=False, - primary_field=primary_field, - enable_dynamic_field=False, multiple_dim_array=[dim, dim])[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - vector_name_list.append(ct.default_float_vec_field_name) - flat_index = {"index_type": "FLAT", "params": {}, "metric_type": metric_type} - for vector_name in vector_name_list: - collection_w.create_index(vector_name, flat_index) - collection_w.load() - # 3. prepare search params - req_list = [] - for i in range(len(vector_name_list)): - search_param = { - "data": [[random.random() for _ in range(dim)] for _ in range(nq)], - "anns_field": vector_name_list[i], - "param": {"metric_type": metric_type, "offset": i}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 4. 
hybrid search - collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9, 1), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name}) - - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - def test_hybrid_search_max_reqs_num(self, primary_field): - """ - target: test hybrid search with maximum reqs number - method: create connection, collection, insert and search - expected: hybrid search successfully with limit(topK) - """ - # 1. initialize collection with data - dim = 128 - multiple_dim_array = [dim, dim] - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, dim=dim, is_index=False, primary_field=primary_field, - enable_dynamic_field=False, multiple_dim_array=multiple_dim_array)[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - flat_index = {"index_type": "FLAT", "params": {}, "metric_type": "COSINE"} - for vector_name in vector_name_list: - collection_w.create_index(vector_name, flat_index) - collection_w.create_index(ct.default_float_vec_field_name, flat_index) - collection_w.load() - reqs_max_num = max_hybrid_search_req_num - # 3. prepare search params - req_list = [] - for i in range(reqs_max_num): - search_param = { - "data": [[random.random() for _ in range(dim)] for _ in range(1)], - "anns_field": default_search_field, - "param": {"metric_type": "COSINE"}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - weights = [random.random() for _ in range(len(req_list))] - log.info(weights) - # 4. 
hybrid search - collection_w.hybrid_search(req_list, WeightedRanker(*weights), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name}) - @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) def test_hybrid_search_WeightedRanker_different_parameters(self, primary_field, is_flush, metric_type): @@ -981,94 +1238,6 @@ class TestCollectionHybridSearchValid(TestcaseBase): "limit": default_limit, "pk_name": ct.default_int64_field_name}) - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.skip("issue: #29840") - def test_hybrid_search_invalid_WeightedRanker_params(self): - """ - target: test hybrid search with invalid params type to WeightedRanker - method: create connection, collection, insert and search - expected: raise exception - """ - # 1. initialize collection with data - multiple_dim_array = [default_dim, default_dim] - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, dim=default_dim, is_index=False, - multiple_dim_array=multiple_dim_array)[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - flat_index = {"index_type": "FLAT", "params": {}, "metric_type": "COSINE"} - for vector_name in vector_name_list: - collection_w.create_index(vector_name, flat_index) - collection_w.create_index(ct.default_float_vec_field_name, flat_index) - collection_w.load() - reqs_num = 2 - # 3. prepare search params - req_list = [] - for i in range(reqs_num): - search_param = { - "data": [[random.random() for _ in range(default_dim)] for _ in range(1)], - "anns_field": default_search_field, - "param": {"metric_type": "COSINE"}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 4. 
hybrid search with list in WeightedRanker - collection_w.hybrid_search(req_list, WeightedRanker([0.9, 0.1]), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name}) - # 5. hybrid search with two-dim list in WeightedRanker - weights = [[random.random() for _ in range(1)] for _ in range(len(req_list))] - # 4. hybrid search - collection_w.hybrid_search(req_list, WeightedRanker(*weights), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name}) - - @pytest.mark.tags(CaseLabel.L2) - def test_hybrid_search_over_maximum_reqs_num(self): - """ - target: test hybrid search over maximum reqs number - method: create connection, collection, insert and search - expected: raise exception - """ - # 1. initialize collection with data - multiple_dim_array = [default_dim, default_dim] - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, dim=default_dim, is_index=False, - multiple_dim_array=multiple_dim_array)[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - flat_index = {"index_type": "FLAT", "params": {}, "metric_type": "COSINE"} - for vector_name in vector_name_list: - collection_w.create_index(vector_name, flat_index) - collection_w.create_index(ct.default_float_vec_field_name, flat_index) - collection_w.load() - reqs_max_num = max_hybrid_search_req_num + 1 - # 3. 
prepare search params - req_list = [] - for i in range(reqs_max_num): - search_param = { - "data": [[random.random() for _ in range(default_dim)] for _ in range(1)], - "anns_field": default_search_field, - "param": {"metric_type": "COSINE"}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - weights = [random.random() for _ in range(len(req_list))] - log.info(weights) - # 4. hybrid search - collection_w.hybrid_search(req_list, WeightedRanker(*weights), default_limit, - check_task=CheckTasks.err_res, - check_items={"err_code": 65535, - "err_msg": 'maximum of ann search requests is 1024'}) - @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) def test_hybrid_search_with_range_search(self, primary_field): @@ -1112,74 +1281,6 @@ class TestCollectionHybridSearchValid(TestcaseBase): "limit": default_limit, "pk_name": ct.default_int64_field_name}) - @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - def test_hybrid_search_RRFRanker_default_parameter(self, primary_field): - """ - target: test hybrid search with default value to RRFRanker - method: create connection, collection, insert and search. - Note: here the result check is through comparing the score, the ids could not be compared - because the high probability of the same score, then the id is not fixed in the range of - the same score - expected: hybrid search successfully with limit(topK) - """ - # 1. initialize collection with data - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, dim=default_dim, primary_field=primary_field, - multiple_dim_array=[default_dim, default_dim])[0:5] - # 2. 
extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - vector_name_list.append(ct.default_float_vec_field_name) - # 3. prepare search params for each vector field - req_list = [] - search_res_dict_array = [] - for i in range(len(vector_name_list)): - vectors = [[random.random() for _ in range(default_dim)] for _ in range(1)] - search_res_dict = {} - search_param = { - "data": vectors, - "anns_field": vector_name_list[i], - "param": {"metric_type": "COSINE", "offset": 0}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # search for get the base line of hybrid_search - search_res = collection_w.search(vectors[:1], vector_name_list[i], - default_search_params, default_limit, - default_search_exp, - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name})[0] - ids = search_res[0].ids - for j in range(len(ids)): - search_res_dict[ids[j]] = 1 / (j + 60 + 1) - search_res_dict_array.append(search_res_dict) - # 4. calculate hybrid search base line for RRFRanker - ids_answer, score_answer = cf.get_hybrid_search_base_results_rrf(search_res_dict_array) - # 5. hybrid search - hybrid_search_0 = collection_w.hybrid_search(req_list, RRFRanker(), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name})[0] - # 6. compare results through the re-calculated distances - for i in range(len(score_answer[:default_limit])): - assert score_answer[i] - hybrid_search_0[0].distances[i] < hybrid_search_epsilon - # 7. 
run hybrid search with the same parameters twice, and compare the results - hybrid_search_1 = collection_w.hybrid_search(req_list, RRFRanker(), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name})[0] - - assert hybrid_search_0[0].ids == hybrid_search_1[0].ids - assert hybrid_search_0[0].distances == hybrid_search_1[0].distances - @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("k", [1, 60, 1000, 16383]) @pytest.mark.parametrize("offset", [0, 1, 5]) @@ -1242,148 +1343,6 @@ class TestCollectionHybridSearchValid(TestcaseBase): for i in range(len(score_answer[:default_limit])): assert score_answer[i] - hybrid_res[0].distances[i] < hybrid_search_epsilon - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.parametrize("offset", [0, 1, 5]) - @pytest.mark.parametrize("rerank", [RRFRanker(), WeightedRanker(0.1, 0.9, 1)]) - @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) - def test_hybrid_search_offset_inside_outside_params(self, primary_field, offset, rerank): - """ - target: test hybrid search with offset inside and outside params - method: create connection, collection, insert and search. - Note: here the result check is through comparing the score, the ids could not be compared - because the high probability of the same score, then the id is not fixed in the range of - the same score - expected: hybrid search successfully with limit(topK), and the result should be the same - """ - # 1. initialize collection with data - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, primary_field=primary_field, - multiple_dim_array=[default_dim, default_dim])[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - vector_name_list.append(ct.default_float_vec_field_name) - req_list = [] - vectors_list = [] - # 3. 
generate vectors - for i in range(len(vector_name_list)): - vectors = [[random.random() for _ in range(default_dim)] for _ in range(1)] - vectors_list.append(vectors) - # 4. prepare search params for each vector field - for i in range(len(vector_name_list)): - search_param = { - "data": vectors_list[i], - "anns_field": vector_name_list[i], - "param": {"metric_type": "COSINE", "offset": offset}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 4. hybrid search with offset inside the params - hybrid_res_inside = collection_w.hybrid_search(req_list, rerank, default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name})[0] - # 5. hybrid search with offset parameter - req_list = [] - for i in range(len(vector_name_list)): - search_param = { - "data": vectors_list[i], - "anns_field": vector_name_list[i], - "param": {"metric_type": "COSINE"}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - hybrid_res = collection_w.hybrid_search(req_list, rerank, default_limit - offset, - offset=offset, - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, - "ids": insert_ids, - "limit": default_limit - offset, - "pk_name": ct.default_int64_field_name})[0] - - assert hybrid_res_inside[0].distances[offset:] == hybrid_res[0].distances - - @pytest.mark.tags(CaseLabel.L2) - def test_hybrid_search_RRFRanker_empty_reqs(self): - """ - target: test hybrid search normal case - method: create connection, collection, insert and search - expected: hybrid search successfully with limit(topK) - """ - # 1. initialize collection with data - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, multiple_dim_array=[default_dim, default_dim])[0:5] - # 2. 
extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - vector_name_list.append(ct.default_float_vec_field_name) - # 3. hybrid search with empty reqs - collection_w.hybrid_search([], RRFRanker(), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": 0}) - - @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.parametrize("k", [0, 16385]) - @pytest.mark.skip(reason="issue #29867") - def test_hybrid_search_RRFRanker_k_out_of_range(self, k): - """ - target: test hybrid search with default value to RRFRanker - method: create connection, collection, insert and search. - Note: here the result check is through comparing the score, the ids could not be compared - because the high probability of the same score, then the id is not fixed in the range of - the same score - expected: hybrid search successfully with limit(topK) - """ - # 1. initialize collection with data - collection_w, _, _, insert_ids, time_stamp = \ - self.init_collection_general(prefix, True, dim=default_dim, - multiple_dim_array=[default_dim, default_dim])[0:5] - # 2. extract vector field name - vector_name_list = cf.extract_vector_field_name_list(collection_w) - vector_name_list.append(ct.default_float_vec_field_name) - # 3. 
prepare search params for each vector field - req_list = [] - search_res_dict_array = [] - for i in range(len(vector_name_list)): - vectors = [[random.random() for _ in range(default_dim)] for _ in range(1)] - search_res_dict = {} - search_param = { - "data": vectors, - "anns_field": vector_name_list[i], - "param": {"metric_type": "COSINE", "offset": 0}, - "limit": default_limit, - "expr": "int64 > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # search for get the base line of hybrid_search - search_res = collection_w.search(vectors[:1], vector_name_list[i], - default_search_params, default_limit, - default_search_exp, - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name})[0] - ids = search_res[0].ids - for j in range(len(ids)): - search_res_dict[ids[j]] = 1 / (j + k + 1) - search_res_dict_array.append(search_res_dict) - # 4. calculate hybrid search base line for RRFRanker - ids_answer, score_answer = cf.get_hybrid_search_base_results_rrf(search_res_dict_array) - # 5. hybrid search - hybrid_res = collection_w.hybrid_search(req_list, RRFRanker(k), default_limit, - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, - "ids": insert_ids, - "limit": default_limit, - "pk_name": ct.default_int64_field_name})[0] - # 6. 
compare results through the re-calculated distances - for i in range(len(score_answer[:default_limit])): - delta = math.fabs(score_answer[i] - hybrid_res[0].distances[i]) - assert delta < hybrid_search_epsilon - @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("limit", [1, 100, 16384]) @pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name]) @@ -1452,7 +1411,8 @@ class TestCollectionHybridSearchValid(TestcaseBase): for i1 in range(len(score_answer)): log.info("answer id: %d, distance: %f" % (ids_answer[i1], score_answer[i1])) for i2 in range(len(hybrid_res[0].ids)): - log.info("hybrid search res id: %d, distance: %f" % (hybrid_res[0].ids[i2], hybrid_res[0].distances[i2])) + log.info( + "hybrid search res id: %d, distance: %f" % (hybrid_res[0].ids[i2], hybrid_res[0].distances[i2])) assert delta < hybrid_search_epsilon @pytest.mark.tags(CaseLabel.L1) diff --git a/tests/python_client/milvus_client_v2/test_milvus_client_search_pagination.py b/tests/python_client/milvus_client_v2/test_milvus_client_search_pagination.py index 353bc6a2f0..5c8890071b 100644 --- a/tests/python_client/milvus_client_v2/test_milvus_client_search_pagination.py +++ b/tests/python_client/milvus_client_v2/test_milvus_client_search_pagination.py @@ -1,19 +1,16 @@ import logging -import numpy as np -from common.constants import * + from utils.util_pymilvus import * from common.common_type import CaseLabel, CheckTasks from common import common_type as ct from common import common_func as cf from utils.util_log import test_log as log from base.client_v2_base import TestMilvusClientV2Base -from base.client_base import TestcaseBase import random import pytest import pandas as pd from faker import Faker -import inspect Faker.seed(19530) fake_en = Faker("en_US") diff --git a/tests/python_client/testcases/async_milvus_client/test_e2e_async.py b/tests/python_client/testcases/async_milvus_client/test_e2e_async.py index d6d65f7be9..1ca8e2f44e 100644 --- 
a/tests/python_client/testcases/async_milvus_client/test_e2e_async.py +++ b/tests/python_client/testcases/async_milvus_client/test_e2e_async.py @@ -218,7 +218,7 @@ class TestAsyncMilvusClient(TestMilvusClientV2Base): await asyncio.gather(*tasks) @pytest.mark.tags(CaseLabel.L0) - async def test_async_client_with_schema(self, schema): + async def test_async_client_with_schema(self): # init async client pk_field_name = "id" self.init_async_milvus_client() @@ -390,7 +390,7 @@ class TestAsyncMilvusClient(TestMilvusClientV2Base): await self.async_milvus_client_wrap.create_database(db_name) await self.async_milvus_client_wrap.close() uri = cf.param_info.param_uri or f"http://{cf.param_info.param_host}:{cf.param_info.param_port}" - self.async_milvus_client_wrap.init_async_client(uri, db_name=db_name) + self.async_milvus_client_wrap.init_async_client(uri, token=cf.param_info.param_token, db_name=db_name) # create collection c_name = cf.gen_unique_str(prefix) @@ -450,7 +450,7 @@ class TestAsyncMilvusClient(TestMilvusClientV2Base): async def test_async_client_close(self): # init async client uri = cf.param_info.param_uri or f"http://{cf.param_info.param_host}:{cf.param_info.param_port}" - self.async_milvus_client_wrap.init_async_client(uri) + self.async_milvus_client_wrap.init_async_client(uri, token=cf.param_info.param_token) # create collection c_name = cf.gen_unique_str(prefix)