diff --git a/tests/python_client/common/common_func.py b/tests/python_client/common/common_func.py index 27f04bb86f..8dfa3c74b6 100644 --- a/tests/python_client/common/common_func.py +++ b/tests/python_client/common/common_func.py @@ -32,7 +32,7 @@ from datetime import timezone from dateutil import parser import pytz -from pymilvus import CollectionSchema, DataType, FunctionType, Function, MilvusException, MilvusClient +from pymilvus import CollectionSchema, FieldSchema, DataType, FunctionType, Function, MilvusException, MilvusClient from bm25s.tokenization import Tokenizer @@ -632,7 +632,8 @@ def gen_digits_by_length(length=8): return "".join(random.choice(string.digits) for _ in range(length)) -def gen_scalar_field(field_type, name=None, description=ct.default_desc, is_primary=False, **kwargs): +def gen_scalar_field(field_type, name=None, description=ct.default_desc, is_primary=False, + nullable=False, skip_wrapper=False, **kwargs): """ Generate a field schema based on the field type. @@ -641,6 +642,9 @@ def gen_scalar_field(field_type, name=None, description=ct.default_desc, is_prim name: Field name (uses default if None) description: Field description is_primary: Whether this is a primary field + nullable: Whether this field is nullable + skip_wrapper: whether to call FieldSchemaWrapper, in gen_row_data case, + it logs too much if calling the wrapper **kwargs: Additional parameters like max_length, max_capacity, etc. 
Returns: @@ -658,15 +662,29 @@ def gen_scalar_field(field_type, name=None, description=ct.default_desc, is_prim kwargs['element_type'] = DataType.INT64 if 'max_capacity' not in kwargs: kwargs['max_capacity'] = ct.default_max_capacity - - field, _ = ApiFieldSchemaWrapper().init_field_schema( - name=name, - dtype=field_type, - description=description, - is_primary=is_primary, - **kwargs - ) - return field + if is_primary is True: + nullable = False + + if skip_wrapper is True: + field = FieldSchema( + name=name, + dtype=field_type, + description=description, + is_primary=is_primary, + nullable=nullable, + **kwargs + ) + return field + else: + field, _ = ApiFieldSchemaWrapper().init_field_schema( + name=name, + dtype=field_type, + description=description, + is_primary=is_primary, + nullable=nullable, + **kwargs + ) + return field # Convenience functions for backward compatibility @@ -1825,7 +1843,8 @@ def convert_orm_schema_to_dict_schema(orm_schema): return schema_dict -def gen_row_data_by_schema(nb=ct.default_nb, schema=None, start=0, random_pk=False, skip_field_names=[], desired_field_names=[]): +def gen_row_data_by_schema(nb=ct.default_nb, schema=None, start=0, random_pk=False, + skip_field_names=[], desired_field_names=[], desired_dynamic_field_names=[]): """ Generates row data based on the given schema. 
@@ -1839,6 +1858,7 @@ def gen_row_data_by_schema(nb=ct.default_nb, schema=None, start=0, random_pk=Fal random_pk (bool, optional): Whether to generate random primary key values (default: False) skip_field_names(list, optional): whether to skip some field to gen data manually (default: []) desired_field_names(list, optional): only generate data for specified field names (default: []) + desired_dynamic_field_names(list, optional): generate additional data with random types for specified dynamic fields (default: []) Returns: list[dict]: List of dictionaries where each dictionary represents a row, @@ -1862,6 +1882,7 @@ def gen_row_data_by_schema(nb=ct.default_nb, schema=None, start=0, random_pk=Fal schema = convert_orm_schema_to_dict_schema(schema) # Now schema is always a dict after conversion, process it uniformly + enable_dynamic = schema.get('enable_dynamic_field', False) # Get all fields from schema all_fields = schema.get('fields', []) fields = [] @@ -1875,9 +1896,10 @@ def gen_row_data_by_schema(nb=ct.default_nb, schema=None, start=0, random_pk=Fal # Get struct_fields from schema struct_fields = schema.get('struct_fields', []) - log.debug(f"[gen_row_data_by_schema] struct_fields from schema: {len(struct_fields)} items") + # log.debug(f"[gen_row_data_by_schema] struct_fields from schema: {len(struct_fields)} items") if struct_fields: - log.debug(f"[gen_row_data_by_schema] First struct_field: {struct_fields[0]}") + pass + # log.debug(f"[gen_row_data_by_schema] First struct_field: {struct_fields[0]}") # If struct_fields is not present, extract struct array fields from fields list # This happens when using client.describe_collection() @@ -1943,10 +1965,18 @@ def gen_row_data_by_schema(nb=ct.default_nb, schema=None, start=0, random_pk=Fal field_name = struct_field.get('name', None) struct_data = gen_struct_array_data(struct_field, start=start, random_pk=random_pk) tmp[field_name] = struct_data + + # generate additional data for dynamic fields + if enable_dynamic: + 
for name in desired_dynamic_field_names: + data_types = [DataType.JSON, DataType.INT64, DataType.FLOAT, DataType.VARCHAR, DataType.BOOL, DataType.ARRAY] + data_type = data_types[random.randint(0, len(data_types) - 1)] + dynamic_field = gen_scalar_field(data_type, nullable=True, skip_wrapper=True) + tmp[name] = gen_data_by_collection_field(dynamic_field) data.append(tmp) - log.debug(f"[gen_row_data_by_schema] Generated {len(data)} rows, first row keys: {list(data[0].keys()) if data else []}") + # log.debug(f"[gen_row_data_by_schema] Generated {len(data)} rows, first row keys: {list(data[0].keys()) if data else []}") return data @@ -3846,14 +3876,17 @@ def extract_vector_field_name_list(collection_w): return vector_name_list -def get_field_dtype_by_field_name(collection_w, field_name): +def get_field_dtype_by_field_name(schema, field_name): """ get the vector field data type by field name collection_w : the collection object to be extracted return: the field data type of the field name """ - schema_dict = collection_w.schema.to_dict() - fields = schema_dict.get('fields') + # Convert ORM schema to dict schema for unified processing + if not isinstance(schema, dict): + schema = convert_orm_schema_to_dict_schema(schema) + + fields = schema.get('fields') for field in fields: if field['name'] == field_name: return field['type'] diff --git a/tests/python_client/common/common_type.py b/tests/python_client/common/common_type.py index 4685a99d5f..4a80aeaf36 100644 --- a/tests/python_client/common/common_type.py +++ b/tests/python_client/common/common_type.py @@ -90,7 +90,9 @@ all_scalar_data_types = [ DataType.DOUBLE, DataType.VARCHAR, DataType.ARRAY, - DataType.JSON + DataType.JSON, + DataType.GEOMETRY, + DataType.TIMESTAMPTZ ] default_field_name_map = { @@ -326,6 +328,9 @@ sparse_metrics = ["IP", "BM25"] # all_scalar_data_types = ['int8', 'int16', 'int32', 'int64', 'float', 'double', 'bool', 'varchar'] +varchar_supported_index_types = ["STL_SORT", "TRIE", "INVERTED", 
"AUTOINDEX", ""] +numeric_supported_index_types = ["STL_SORT", "INVERTED", "AUTOINDEX", ""] + default_flat_index = {"index_type": "FLAT", "params": {}, "metric_type": default_L0_metric} default_bin_flat_index = {"index_type": "BIN_FLAT", "params": {}, "metric_type": "JACCARD"} default_sparse_inverted_index = {"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "IP", @@ -372,6 +377,13 @@ all_expr_fields = [default_int8_field_name, default_int16_field_name, default_bool_array_field_name, default_float_array_field_name, default_double_array_field_name, default_string_array_field_name] +not_supported_json_cast_types = [DataType.INT8.name, DataType.INT16.name, DataType.INT32.name, + DataType.INT64.name, DataType.FLOAT.name, + DataType.ARRAY.name, DataType.FLOAT_VECTOR.name, + DataType.FLOAT16_VECTOR.name, DataType.BFLOAT16_VECTOR.name, + DataType.BINARY_VECTOR.name, + DataType.SPARSE_FLOAT_VECTOR.name, DataType.INT8_VECTOR.name] + class CheckTasks: """ The name of the method used to check the result """ check_nothing = "check_nothing" diff --git a/tests/python_client/milvus_client/test_milvus_client_index.py b/tests/python_client/milvus_client/test_milvus_client_index.py index acb0bcbe76..de1a05e6d1 100644 --- a/tests/python_client/milvus_client/test_milvus_client_index.py +++ b/tests/python_client/milvus_client/test_milvus_client_index.py @@ -234,8 +234,9 @@ class TestMilvusClientIndexInvalid(TestMilvusClientV2Base): index_params = self.prepare_index_params(client)[0] index_params.add_index(field_name="embeddings", index_type=not_supported_index, metric_type="L2") # 4. 
create another index - error = {ct.err_code: 1100, ct.err_msg: f"data type Int8Vector can't build with this index {not_supported_index}: " - f"invalid parameter[expected=valid index params][actual=invalid index params]"} + error = {ct.err_code: 1100, + ct.err_msg: f"data type Int8Vector can't build with this index {not_supported_index}: " + f"invalid parameter[expected=valid index params][actual=invalid index params]"} self.create_index(client, collection_name, index_params, check_task=CheckTasks.err_res, check_items=error) self.drop_collection(client, collection_name) @@ -277,39 +278,25 @@ class TestMilvusClientIndexInvalid(TestMilvusClientV2Base): class TestMilvusClientIndexValid(TestMilvusClientV2Base): """ Test case of index interface """ - @pytest.fixture(scope="function", params=[False, True]) - def auto_id(self, request): - yield request.param - @pytest.fixture(scope="function", params=["COSINE", "L2", "IP"]) def metric_type(self, request): yield request.param - @pytest.fixture(scope="function", params=["TRIE", "STL_SORT", "INVERTED", "AUTOINDEX"]) - def scalar_index(self, request): - yield request.param - - @pytest.fixture(scope="function", params=["TRIE", "INVERTED", "AUTOINDEX", ""]) - def varchar_index(self, request): - yield request.param - - @pytest.fixture(scope="function", params=["STL_SORT", "INVERTED", "AUTOINDEX", ""]) - def numeric_index(self, request): - yield request.param - """ ****************************************************************** # The following are valid base cases ****************************************************************** """ + @pytest.mark.tags(CaseLabel.L0) @pytest.mark.parametrize("index", ct.all_index_types[:8]) - def test_milvus_client_index_with_params(self, index, metric_type): + def test_milvus_client_index_with_params(self, index): """ target: test index with user defined params method: create connection, collection, index, insert and search expected: index/search/query successfully """ + metric_type = "L2" 
client = self._client() collection_name = cf.gen_unique_str(prefix) # 1. create collection @@ -351,12 +338,13 @@ class TestMilvusClientIndexValid(TestMilvusClientV2Base): @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("index", ct.all_index_types[:8]) - def test_milvus_client_index_after_insert(self, index, metric_type): + def test_milvus_client_index_after_insert(self, index): """ target: test index after insert method: create connection, collection, insert, index and search expected: index/search/query successfully """ + metric_type = "COSINE" client = self._client() collection_name = cf.gen_unique_str(prefix) # 1. create collection @@ -394,25 +382,27 @@ class TestMilvusClientIndexValid(TestMilvusClientV2Base): "pk_name": default_primary_key_field_name}) self.drop_collection(client, collection_name) - @pytest.mark.tags(CaseLabel.L2) + @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("add_field", [True, False]) - def test_milvus_client_index_auto_index(self, numeric_index, varchar_index, metric_type, add_field): + def test_milvus_client_scalar_auto_index(self, add_field): """ target: test index with autoindex on both scalar and vector field method: create connection, collection, insert and search expected: index/search/query successfully """ + metric_type = "COSINE" + numeric_fields = [ct.default_int32_field_name, ct.default_int16_field_name, + ct.default_int8_field_name, default_float_field_name, + ct.default_double_field_name, ct.default_int64_field_name] client = self._client() collection_name = cf.gen_unique_str(prefix) # 1. 
create collection schema = self.create_schema(client)[0] schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True) - schema.add_field(ct.default_int32_field_name, DataType.INT32) - schema.add_field(ct.default_int16_field_name, DataType.INT16) - schema.add_field(ct.default_int8_field_name, DataType.INT8) - schema.add_field(default_string_field_name, DataType.VARCHAR, max_length=64) - schema.add_field(default_float_field_name, DataType.FLOAT) - schema.add_field(ct.default_double_field_name, DataType.DOUBLE) + for field_name in numeric_fields: + schema.add_field(field_name, DataType.INT32, nullable=True) + for index in ct.varchar_supported_index_types: + schema.add_field(f"{default_string_field_name}_{index}", DataType.VARCHAR, max_length=64, nullable=True) schema.add_field(ct.default_bool_field_name, DataType.BOOL) schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim) self.create_collection(client, collection_name, schema=schema, consistency_level="Strong") @@ -428,29 +418,46 @@ class TestMilvusClientIndexValid(TestMilvusClientV2Base): # 2. 
prepare index params index = "AUTOINDEX" index_params = self.prepare_index_params(client)[0] + index_params.add_index(field_name=default_primary_key_field_name, + index_type=ct.numeric_supported_index_types[0], metric_type=metric_type) index_params.add_index(field_name=default_vector_field_name, index_type=index, metric_type=metric_type) - index_params.add_index(field_name=ct.default_int32_field_name, index_type=numeric_index, metric_type=metric_type) - index_params.add_index(field_name=ct.default_int16_field_name, index_type=numeric_index, metric_type=metric_type) - index_params.add_index(field_name=ct.default_int8_field_name, index_type=numeric_index, metric_type=metric_type) - index_params.add_index(field_name=default_float_field_name, index_type=numeric_index, metric_type=metric_type) - index_params.add_index(field_name=ct.default_double_field_name, index_type=numeric_index, metric_type=metric_type) - index_params.add_index(field_name=ct.default_bool_field_name, index_type="", metric_type=metric_type) - index_params.add_index(field_name=default_string_field_name, index_type=varchar_index, metric_type=metric_type) - index_params.add_index(field_name=default_primary_key_field_name, index_type=numeric_index, metric_type=metric_type) + index_params.add_index(field_name=ct.default_bool_field_name, + index_type="", metric_type=metric_type) + if len(numeric_fields) >= len(ct.numeric_supported_index_types): + k = 0 + for i in range(len(numeric_fields)): + if k >= len(ct.numeric_supported_index_types): + k = 0 + index_params.add_index(field_name=numeric_fields[i], + index_type=ct.numeric_supported_index_types[k], metric_type=metric_type) + k += 1 + else: + k = 0 + for i in range(len(ct.numeric_supported_index_types)): + if k >= len(numeric_fields): + k = 0 + index_params.add_index(field_name=numeric_fields[k], + index_type=ct.numeric_supported_index_types[i], metric_type=metric_type) + k += 1 + + for index in ct.varchar_supported_index_types: + 
index_params.add_index(field_name=f"{default_string_field_name}_{index}", + index_type=index, metric_type=metric_type) + if add_field: - index_params.add_index(field_name="field_int", index_type=numeric_index, metric_type=metric_type) - index_params.add_index(field_name="field_varchar", index_type=varchar_index, metric_type=metric_type) + index_params.add_index(field_name="field_int", + index_type=ct.numeric_supported_index_types[0], metric_type=metric_type) + index_params.add_index(field_name="field_varchar", + index_type=ct.varchar_supported_index_types[0], metric_type=metric_type) # 3. create index self.create_index(client, collection_name, index_params) # 4. drop index self.drop_index(client, collection_name, default_vector_field_name) - self.drop_index(client, collection_name, ct.default_int32_field_name) - self.drop_index(client, collection_name, ct.default_int16_field_name) - self.drop_index(client, collection_name, ct.default_int8_field_name) - self.drop_index(client, collection_name, default_float_field_name) - self.drop_index(client, collection_name, ct.default_double_field_name) + for field_name in numeric_fields: + self.drop_index(client, collection_name, field_name) + for index in ct.varchar_supported_index_types: + self.drop_index(client, collection_name, f"{default_string_field_name}_{index}") self.drop_index(client, collection_name, ct.default_bool_field_name) - self.drop_index(client, collection_name, default_string_field_name) self.drop_index(client, collection_name, default_primary_key_field_name) if add_field: self.drop_index(client, collection_name, "field_int") @@ -458,28 +465,38 @@ class TestMilvusClientIndexValid(TestMilvusClientV2Base): # 5. create index self.create_index(client, collection_name, index_params) # 6. 
insert - rng = np.random.default_rng(seed=19530) - rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]), - ct.default_int32_field_name: np.int32(i), ct.default_int16_field_name: np.int16(i), - ct.default_int8_field_name: np.int8(i), default_float_field_name: i * 1.0, - ct.default_double_field_name: np.double(i), ct.default_bool_field_name: np.bool_(i), - default_string_field_name: str(i), - **({"field_int": 10} if add_field else {}), - **({"field_varchar": "default"} if add_field else {}) - } for i in range(default_nb)] + collection_info = self.describe_collection(client, collection_name)[0] + rows = cf.gen_row_data_by_schema(nb=2000, schema=collection_info, start=0) self.insert(client, collection_name, rows) # 7. load collection self.load_collection(client, collection_name) # 8. search - vectors_to_search = rng.random((1, default_dim)) + vectors_to_search = cf.gen_vectors(nb=1, dim=default_dim) insert_ids = [i for i in range(default_nb)] - self.search(client, collection_name, vectors_to_search, - check_task=CheckTasks.check_search_results, - check_items={"enable_milvus_client_api": True, - "nq": len(vectors_to_search), - "ids": insert_ids, - "limit": default_limit, - "pk_name": default_primary_key_field_name}) + filter_fields = [] + filter_fields.extend(numeric_fields) + if add_field: + filter_fields.extend(["field_int", "field_varchar"]) + for field_name in filter_fields: + self.search(client, collection_name, vectors_to_search, + limit=default_limit, + filter=f"{field_name} is null", + check_task=CheckTasks.check_search_results, + check_items={"enable_milvus_client_api": True, + "nq": len(vectors_to_search), + "ids": insert_ids, + "limit": default_limit, + "pk_name": default_primary_key_field_name}) + for index in ct.varchar_supported_index_types: + self.search(client, collection_name, vectors_to_search, + limit=default_limit, + filter=f"{default_string_field_name}_{index} is not null", + 
check_task=CheckTasks.check_search_results, + check_items={"enable_milvus_client_api": True, + "nq": len(vectors_to_search), + "ids": insert_ids, + "limit": default_limit, + "pk_name": default_primary_key_field_name}) # 9. query self.query(client, collection_name, filter=default_search_exp, check_task=CheckTasks.check_query_results, @@ -489,12 +506,13 @@ class TestMilvusClientIndexValid(TestMilvusClientV2Base): self.drop_collection(client, collection_name) @pytest.mark.tags(CaseLabel.L1) - def test_milvus_client_scalar_hybrid_index_small_distinct_before_insert(self, metric_type): + def test_milvus_client_scalar_hybrid_index_small_distinct_before_insert(self): """ target: test index with autoindex on int/varchar with small distinct value (<=100) method: create connection, collection, insert and search expected: index/search/query successfully (autoindex is bitmap index indeed) """ + metric_type = "IP" client = self._client() collection_name = cf.gen_unique_str(prefix) # 1. create collection @@ -544,13 +562,14 @@ class TestMilvusClientIndexValid(TestMilvusClientV2Base): self.drop_collection(client, collection_name) @pytest.mark.tags(CaseLabel.L1) - def test_milvus_client_scalar_hybrid_index_small_to_large_distinct_after_insert(self, metric_type): + def test_milvus_client_scalar_hybrid_index_small_to_large_distinct_after_insert(self): """ target: test index with autoindex on int/varchar with small distinct value (<=100) first and insert to large distinct (2000+) later method: create connection, collection, insert and search expected: index/search/query successfully """ + metric_type = "COSINE" client = self._client() collection_name = cf.gen_unique_str(prefix) # 1. 
create collection @@ -609,12 +628,12 @@ class TestMilvusClientIndexValid(TestMilvusClientV2Base): rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]), int64_field_name: np.random.randint(0, 99), ct.default_int32_field_name: np.int32(i), ct.default_int16_field_name: np.int16(i), ct.default_int8_field_name: np.int8(i), - default_string_field_name: str(np.random.randint(0, 99))} for i in range(default_nb, 2*default_nb)] + default_string_field_name: str(np.random.randint(0, 99))} for i in range(default_nb, 2 * default_nb)] self.insert(client, collection_name, rows) self.flush(client, collection_name) # 9. search vectors_to_search = rng.random((1, default_dim)) - insert_ids = [i for i in range(2*default_nb)] + insert_ids = [i for i in range(2 * default_nb)] self.search(client, collection_name, vectors_to_search, check_task=CheckTasks.check_search_results, check_items={"enable_milvus_client_api": True, @@ -625,7 +644,7 @@ class TestMilvusClientIndexValid(TestMilvusClientV2Base): self.drop_collection(client, collection_name) @pytest.mark.tags(CaseLabel.L2) - def test_milvus_client_index_multiple_vectors(self, numeric_index, metric_type): + def test_milvus_client_vector_auto_index(self, metric_type): """ target: test index for multiple vectors method: create connection, collection, index, insert and search @@ -641,6 +660,7 @@ class TestMilvusClientIndexValid(TestMilvusClientV2Base): assert res == [] # 2. 
prepare index params index = "AUTOINDEX" + numeric_index = "STL_SORT" index_params = self.prepare_index_params(client)[0] index_params.add_index(field_name="vector", index_type=index, metric_type=metric_type) index_params.add_index(field_name="id", index_type=numeric_index, metric_type=metric_type) @@ -736,7 +756,8 @@ class TestMilvusClientJsonPathIndexInvalid(TestMilvusClientV2Base): @pytest.fixture(scope="function", params=[DataType.INT8.name, DataType.INT16.name, DataType.INT32.name, DataType.INT64.name, DataType.FLOAT.name, DataType.ARRAY.name, DataType.FLOAT_VECTOR.name, - DataType.FLOAT16_VECTOR.name, DataType.BFLOAT16_VECTOR.name, DataType.BINARY_VECTOR.name, + DataType.FLOAT16_VECTOR.name, DataType.BFLOAT16_VECTOR.name, + DataType.BINARY_VECTOR.name, DataType.SPARSE_FLOAT_VECTOR.name, DataType.INT8_VECTOR.name]) def not_supported_json_cast_type(self, request): yield request.param @@ -813,7 +834,7 @@ class TestMilvusClientJsonPathIndexInvalid(TestMilvusClientV2Base): index_params = self.prepare_index_params(client)[0] index_params.add_index(field_name=default_vector_field_name, index_type="AUTOINDEX", metric_type="COSINE") index_params.add_index(field_name="my_json", index_type=invalid_index_type, params={"json_cast_type": "double", - "json_path": "my_json['a']['b']"}) + "json_path": "my_json['a']['b']"}) # 3. 
create index error = {ct.err_code: 1100, ct.err_msg: f"invalid parameter[expected=valid index]" f"[actual=invalid index type: {invalid_index_type}]"} @@ -822,7 +843,8 @@ class TestMilvusClientJsonPathIndexInvalid(TestMilvusClientV2Base): @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("enable_dynamic_field", [True, False]) - def test_milvus_client_json_path_index_not_support_index_type(self, enable_dynamic_field, not_supported_varchar_scalar_index): + def test_milvus_client_json_path_index_not_support_index_type(self, enable_dynamic_field, + not_supported_varchar_scalar_index): """ target: test json path index with not supported index type method: create json path index with not supported index type @@ -889,16 +911,18 @@ class TestMilvusClientJsonPathIndexInvalid(TestMilvusClientV2Base): # 2. prepare index params with invalid json index type index_params = self.prepare_index_params(client)[0] index_params.add_index(field_name=default_vector_field_name, index_type="AUTOINDEX", metric_type="COSINE") - index_params.add_index(field_name=json_field_name, index_name="json_index", index_type=supported_varchar_scalar_index, - params={"json_cast_type": invalid_json_cast_type, "json_path": f"{json_field_name}['a']['b']"}) + index_params.add_index(field_name=json_field_name, index_name="json_index", + index_type=supported_varchar_scalar_index, + params={"json_cast_type": invalid_json_cast_type, + "json_path": f"{json_field_name}['a']['b']"}) # 3. 
create index error = {ct.err_code: 1100, ct.err_msg: f"index params][actual=invalid index params]"} self.create_index(client, collection_name, index_params, check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.parametrize("enable_dynamic_field", [True, False]) - def test_milvus_client_json_path_index_not_supported_json_cast_type(self, enable_dynamic_field, not_supported_json_cast_type, + @pytest.mark.parametrize("enable_dynamic_field", [True]) + def test_milvus_client_json_path_index_not_supported_json_cast_type(self, enable_dynamic_field, supported_varchar_scalar_index): """ target: test json path index with not supported json_cast_type @@ -919,17 +943,20 @@ class TestMilvusClientJsonPathIndexInvalid(TestMilvusClientV2Base): index_params.add_index(default_vector_field_name, metric_type="COSINE") self.create_collection(client, collection_name, default_dim) # 2. prepare index params with invalid json index type - index_params = self.prepare_index_params(client)[0] - index_params.add_index(field_name=default_vector_field_name, index_type="AUTOINDEX", metric_type="COSINE") - index_params.add_index(field_name=json_field_name, index_name="json_index", index_type=supported_varchar_scalar_index, - params={"json_cast_type": not_supported_json_cast_type, "json_path": f"{json_field_name}['a']['b']"}) - # 3. 
create index - error = {ct.err_code: 1100, ct.err_msg: f"index params][actual=invalid index params]"} - self.create_index(client, collection_name, index_params, - check_task=CheckTasks.err_res, check_items=error) + for cast_type in ct.not_supported_json_cast_types: + index_params = self.prepare_index_params(client)[0] + index_params.add_index(field_name=default_vector_field_name, index_type="AUTOINDEX", metric_type="COSINE") + index_params.add_index(field_name=json_field_name, index_name="json_index", + index_type=supported_varchar_scalar_index, + params={"json_cast_type": cast_type, + "json_path": f"{json_field_name}['a']['b']"}) + # 3. create index + error = {ct.err_code: 1100, ct.err_msg: f"index params][actual=invalid index params]"} + self.create_index(client, collection_name, index_params, + check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.parametrize("enable_dynamic_field", [True, False]) + @pytest.mark.parametrize("enable_dynamic_field", [False]) @pytest.mark.parametrize("invalid_json_path", [1, 1.0, '/']) def test_milvus_client_json_path_index_invalid_json_path(self, enable_dynamic_field, invalid_json_path, supported_varchar_scalar_index): @@ -983,7 +1010,7 @@ class TestMilvusClientJsonPathIndexInvalid(TestMilvusClientV2Base): params={"json_cast_type": "double", "json_path": f"{json_field_name}['a']"}) error = {ct.err_code: 65535, ct.err_msg: f"cannot create index on non-exist field: {json_field_name}"} self.create_collection(client, collection_name, schema=schema, index_params=index_params, - check_task = CheckTasks.err_res, check_items = error) + check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("enable_dynamic_field", [True, False]) @@ -1026,7 +1053,8 @@ class TestMilvusClientJsonPathIndexInvalid(TestMilvusClientV2Base): @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("enable_dynamic_field", [True, False]) - def 
test_milvus_client_different_index_name_same_json_path(self, enable_dynamic_field, supported_varchar_scalar_index): + def test_milvus_client_different_index_name_same_json_path(self, enable_dynamic_field, + supported_varchar_scalar_index): """ target: test json path index with different index name but with same json path method: create json path index with different index name but with same json path @@ -1065,7 +1093,8 @@ class TestMilvusClientJsonPathIndexInvalid(TestMilvusClientV2Base): @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("enable_dynamic_field", [True, False]) - def test_milvus_client_different_json_path_index_same_field_same_index_name(self, enable_dynamic_field, supported_json_cast_type, + def test_milvus_client_different_json_path_index_same_field_same_index_name(self, enable_dynamic_field, + supported_json_cast_type, supported_varchar_scalar_index): """ target: test different json path index with same index name at the same time @@ -1094,8 +1123,10 @@ class TestMilvusClientJsonPathIndexInvalid(TestMilvusClientV2Base): index_name = "json_index" index_params = self.prepare_index_params(client)[0] index_params.add_index(field_name=default_vector_field_name, index_type="AUTOINDEX", metric_type="COSINE") - index_params.add_index(field_name=json_field_name, index_name=index_name, index_type=supported_varchar_scalar_index, - params={"json_cast_type": supported_json_cast_type, "json_path": f"{json_field_name}['a']['b']"}) + index_params.add_index(field_name=json_field_name, index_name=index_name, + index_type=supported_varchar_scalar_index, + params={"json_cast_type": supported_json_cast_type, + "json_path": f"{json_field_name}['a']['b']"}) index_params.add_index(field_name=json_field_name, index_name=index_name, index_type=supported_varchar_scalar_index, params={"json_cast_type": supported_json_cast_type, @@ -1113,9 +1144,9 @@ class TestMilvusClientJsonPathIndexInvalid(TestMilvusClientV2Base): class 
TestMilvusClientJsonPathIndexValid(TestMilvusClientV2Base): """ Test case of search interface """ - @pytest.fixture(scope="function", params=["TRIE", "STL_SORT", "BITMAP"]) - def not_supported_varchar_scalar_index(self, request): - yield request.param + # @pytest.fixture(scope="function", params=["TRIE", "STL_SORT", "BITMAP"]) + # def not_supported_varchar_scalar_index(self, request): + # yield request.param @pytest.fixture(scope="function", params=["INVERTED"]) def supported_varchar_scalar_index(self, request): @@ -1154,18 +1185,18 @@ class TestMilvusClientJsonPathIndexValid(TestMilvusClientV2Base): index_params.add_index(default_vector_field_name, metric_type="COSINE") self.create_collection(client, collection_name, schema=schema, index_params=index_params) # 2. insert with different data distribution - vectors = cf.gen_vectors(default_nb+50, default_dim) + vectors = cf.gen_vectors(default_nb + 50, default_dim) rows = [{default_primary_key_field_name: i, default_vector_field_name: vectors[i], default_string_field_name: str(i), json_field_name: {'a': {"b": i}}} for i in range(default_nb)] self.insert(client, collection_name, rows) rows = [{default_primary_key_field_name: i, default_vector_field_name: vectors[i], default_string_field_name: str(i), json_field_name: i} for i in - range(default_nb, default_nb+10)] + range(default_nb, default_nb + 10)] self.insert(client, collection_name, rows) rows = [{default_primary_key_field_name: i, default_vector_field_name: vectors[i], default_string_field_name: str(i), json_field_name: {}} for i in - range(default_nb+10, default_nb+20)] + range(default_nb + 10, default_nb + 20)] self.insert(client, collection_name, rows) rows = [{default_primary_key_field_name: i, default_vector_field_name: vectors[i], default_string_field_name: str(i), json_field_name: {'a': [1, 2, 3]}} for i in @@ -1183,7 +1214,8 @@ class TestMilvusClientJsonPathIndexValid(TestMilvusClientV2Base): index_params = self.prepare_index_params(client)[0] 
index_params.add_index(field_name=default_vector_field_name, index_type="AUTOINDEX", metric_type="COSINE") index_params.add_index(field_name=json_field_name, index_type=supported_varchar_scalar_index, - params={"json_cast_type": supported_json_cast_type, "json_path": f"{json_field_name}['a']['b']"}) + params={"json_cast_type": supported_json_cast_type, + "json_path": f"{json_field_name}['a']['b']"}) index_params.add_index(field_name=json_field_name, index_type=supported_varchar_scalar_index, params={"json_cast_type": supported_json_cast_type, @@ -1288,7 +1320,8 @@ class TestMilvusClientJsonPathIndexValid(TestMilvusClientV2Base): index_params = self.prepare_index_params(client)[0] index_params.add_index(field_name=default_vector_field_name, index_type="AUTOINDEX", metric_type="COSINE") index_params.add_index(field_name=json_field_name, index_type=supported_varchar_scalar_index, - params={"json_cast_type": supported_json_cast_type, "json_path": f"{json_field_name}['a']['b']"}) + params={"json_cast_type": supported_json_cast_type, + "json_path": f"{json_field_name}['a']['b']"}) # 3. create index if enable_dynamic_field: index_name = "$meta/" + json_field_name + '/a/b' @@ -1339,7 +1372,8 @@ class TestMilvusClientJsonPathIndexValid(TestMilvusClientV2Base): index_params = self.prepare_index_params(client)[0] index_params.add_index(field_name=default_vector_field_name, index_type="AUTOINDEX", metric_type="COSINE") index_params.add_index(field_name=default_string_field_name, index_type=supported_varchar_scalar_index, - params={"json_cast_type": supported_json_cast_type, "json_path": f"{default_string_field_name}['a']['b']"}) + params={"json_cast_type": supported_json_cast_type, + "json_path": f"{default_string_field_name}['a']['b']"}) # 4. 
create index index_name = default_string_field_name self.create_index(client, collection_name, index_params) @@ -1354,7 +1388,8 @@ class TestMilvusClientJsonPathIndexValid(TestMilvusClientV2Base): @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("enable_dynamic_field", [True, False]) - def test_milvus_client_different_json_path_index_same_field_different_index_name(self, enable_dynamic_field, supported_json_cast_type, + def test_milvus_client_different_json_path_index_same_field_different_index_name(self, enable_dynamic_field, + supported_json_cast_type, supported_varchar_scalar_index): """ target: test different json path index with different default index name at the same time @@ -1383,8 +1418,10 @@ class TestMilvusClientJsonPathIndexValid(TestMilvusClientV2Base): index_name = "json_index" index_params = self.prepare_index_params(client)[0] index_params.add_index(field_name=default_vector_field_name, index_type="AUTOINDEX", metric_type="COSINE") - index_params.add_index(field_name=json_field_name, index_name=index_name + "1", index_type=supported_varchar_scalar_index, - params={"json_cast_type": supported_json_cast_type, "json_path": f"{json_field_name}['a']['b']"}) + index_params.add_index(field_name=json_field_name, index_name=index_name + "1", + index_type=supported_varchar_scalar_index, + params={"json_cast_type": supported_json_cast_type, + "json_path": f"{json_field_name}['a']['b']"}) index_params.add_index(field_name=json_field_name, index_name=index_name + "2", index_type=supported_varchar_scalar_index, params={"json_cast_type": supported_json_cast_type, @@ -1452,7 +1489,8 @@ class TestMilvusClientJsonPathIndexValid(TestMilvusClientV2Base): # 2. 
prepare index params index_params = self.prepare_index_params(client)[0] index_params.add_index(field_name=json_field_name, index_type=supported_varchar_scalar_index, - params={"json_cast_type": supported_json_cast_type, "json_path": f"{json_field_name}['a']['b']"}) + params={"json_cast_type": supported_json_cast_type, + "json_path": f"{json_field_name}['a']['b']"}) self.create_index(client, collection_name, index_params) index_params = self.prepare_index_params(client)[0] index_params.add_index(field_name=json_field_name, @@ -1570,7 +1608,7 @@ class TestMilvusClientJsonPathIndexValid(TestMilvusClientV2Base): @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("enable_dynamic_field", [True, False]) def test_milvus_client_json_path_index_before_load(self, enable_dynamic_field, supported_json_cast_type, - supported_varchar_scalar_index): + supported_varchar_scalar_index): """ target: test json path index with not supported json_cast_type method: create json path index with not supported json_cast_type @@ -1592,18 +1630,18 @@ class TestMilvusClientJsonPathIndexValid(TestMilvusClientV2Base): # 2. release collection self.release_collection(client, collection_name) # 3. 
insert with different data distribution - vectors = cf.gen_vectors(default_nb+50, default_dim) + vectors = cf.gen_vectors(default_nb + 50, default_dim) rows = [{default_primary_key_field_name: i, default_vector_field_name: vectors[i], default_string_field_name: str(i), json_field_name: {'a': {"b": i}}} for i in range(default_nb)] self.insert(client, collection_name, rows) rows = [{default_primary_key_field_name: i, default_vector_field_name: vectors[i], default_string_field_name: str(i), json_field_name: i} for i in - range(default_nb, default_nb+10)] + range(default_nb, default_nb + 10)] self.insert(client, collection_name, rows) rows = [{default_primary_key_field_name: i, default_vector_field_name: vectors[i], default_string_field_name: str(i), json_field_name: {}} for i in - range(default_nb+10, default_nb+20)] + range(default_nb + 10, default_nb + 20)] self.insert(client, collection_name, rows) rows = [{default_primary_key_field_name: i, default_vector_field_name: vectors[i], default_string_field_name: str(i), json_field_name: {'a': [1, 2, 3]}} for i in @@ -1621,8 +1659,10 @@ class TestMilvusClientJsonPathIndexValid(TestMilvusClientV2Base): index_name = "json_index" index_params = self.prepare_index_params(client)[0] index_params.add_index(field_name=default_vector_field_name, index_type="AUTOINDEX", metric_type="COSINE") - index_params.add_index(field_name=json_field_name, index_name=index_name, index_type=supported_varchar_scalar_index, - params={"json_cast_type": supported_json_cast_type, "json_path": f"{json_field_name}['a']['b']"}) + index_params.add_index(field_name=json_field_name, index_name=index_name, + index_type=supported_varchar_scalar_index, + params={"json_cast_type": supported_json_cast_type, + "json_path": f"{json_field_name}['a']['b']"}) index_params.add_index(field_name=json_field_name, index_name=index_name + '1', index_type=supported_varchar_scalar_index, params={"json_cast_type": supported_json_cast_type, @@ -1644,7 +1684,7 @@ class 
TestMilvusClientJsonPathIndexValid(TestMilvusClientV2Base): self.describe_index(client, collection_name, index_name, check_task=CheckTasks.check_describe_index_property, check_items={ - #"json_cast_type": supported_json_cast_type, # issue 40426 + # "json_cast_type": supported_json_cast_type, # issue 40426 "json_path": f"{json_field_name}['a']['b']", "index_type": supported_varchar_scalar_index, "field_name": json_field_name, @@ -1669,7 +1709,7 @@ class TestMilvusClientJsonPathIndexValid(TestMilvusClientV2Base): "index_type": supported_varchar_scalar_index, "field_name": json_field_name, "index_name": index_name + '1'}) - self.describe_index(client, collection_name, index_name +'2', + self.describe_index(client, collection_name, index_name + '2', check_task=CheckTasks.check_describe_index_property, check_items={ # "json_cast_type": supported_json_cast_type, # issue 40426 @@ -1718,33 +1758,35 @@ class TestMilvusClientJsonPathIndexValid(TestMilvusClientV2Base): res = self.describe_collection(client, collection_name)[0] assert res.get('enable_dynamic_field', None) is True # 3. 
insert with different data distribution - vectors = cf.gen_vectors(default_nb+50, default_dim) + vectors = cf.gen_vectors(default_nb + 50, default_dim) rows = [{default_primary_key_field_name: i, default_vector_field_name: vectors[i], default_string_field_name: str(i), json_field_name: {'a': {"b": i}}} for i in range(default_nb)] self.insert(client, collection_name, rows) rows = [{default_primary_key_field_name: i, default_vector_field_name: vectors[i], - default_string_field_name: str(i), json_field_name: i} for i in range(default_nb, default_nb+10)] + default_string_field_name: str(i), json_field_name: i} for i in range(default_nb, default_nb + 10)] self.insert(client, collection_name, rows) rows = [{default_primary_key_field_name: i, default_vector_field_name: vectors[i], - default_string_field_name: str(i), json_field_name: {}} for i in range(default_nb+10, default_nb+20)] + default_string_field_name: str(i), json_field_name: {}} for i in + range(default_nb + 10, default_nb + 20)] self.insert(client, collection_name, rows) rows = [{default_primary_key_field_name: i, default_vector_field_name: vectors[i], default_string_field_name: str(i), json_field_name: {'a': [1, 2, 3]}} - for i in range(default_nb + 20, default_nb + 30)] + for i in range(default_nb + 20, default_nb + 30)] self.insert(client, collection_name, rows) rows = [{default_primary_key_field_name: i, default_vector_field_name: vectors[i], default_string_field_name: str(i), json_field_name: {'a': [{'b': 1}, 2, 3]}} - for i in range(default_nb + 20, default_nb + 30)] + for i in range(default_nb + 20, default_nb + 30)] self.insert(client, collection_name, rows) rows = [{default_primary_key_field_name: i, default_vector_field_name: vectors[i], default_string_field_name: str(i), json_field_name: {'a': [{'b': None}, 2, 3]}} - for i in range(default_nb + 30, default_nb + 40)] + for i in range(default_nb + 30, default_nb + 40)] self.insert(client, collection_name, rows) # 4. 
prepare index params index_params = self.prepare_index_params(client)[0] index_params.add_index(field_name=default_vector_field_name, index_type="AUTOINDEX", metric_type="COSINE") index_params.add_index(field_name=json_field_name, index_type=supported_varchar_scalar_index, - params={"json_cast_type": supported_json_cast_type, "json_path": f"{json_field_name}['a']['b']"}) + params={"json_cast_type": supported_json_cast_type, + "json_path": f"{json_field_name}['a']['b']"}) index_params.add_index(field_name=json_field_name, index_type=supported_varchar_scalar_index, params={"json_cast_type": supported_json_cast_type, diff --git a/tests/python_client/milvus_client_v2/test_milvus_client_search_group_by.py b/tests/python_client/milvus_client_v2/test_milvus_client_search_group_by.py index 23e0e0db0e..ab6ce08f0f 100644 --- a/tests/python_client/milvus_client_v2/test_milvus_client_search_group_by.py +++ b/tests/python_client/milvus_client_v2/test_milvus_client_search_group_by.py @@ -1,156 +1,470 @@ +import json import numpy as np -from pymilvus.orm.types import CONSISTENCY_STRONG, CONSISTENCY_BOUNDED, CONSISTENCY_SESSION, CONSISTENCY_EVENTUALLY from pymilvus import AnnSearchRequest, RRFRanker, WeightedRanker from pymilvus import ( - FieldSchema, CollectionSchema, DataType, - Collection + FieldSchema, CollectionSchema, DataType ) -from common.constants import * from utils.util_pymilvus import * from common.common_type import CaseLabel, CheckTasks from common import common_type as ct from common import common_func as cf from utils.util_log import test_log as log -from base.client_base import TestcaseBase -import heapq -from time import sleep -from decimal import Decimal, getcontext -import decimal -import multiprocessing -import numbers +from base.client_v2_base import TestMilvusClientV2Base import random -import math -import numpy -import threading import pytest -import pandas as pd -from faker import Faker -Faker.seed(19530) -fake_en = Faker("en_US") -fake_zh = 
Faker("zh_CN") - -# patch faker to generate text with specific distribution -cf.patch_faker_text(fake_en, cf.en_vocabularies_distribution) -cf.patch_faker_text(fake_zh, cf.zh_vocabularies_distribution) - -pd.set_option("expand_frame_repr", False) - -prefix = "search_collection" -search_num = 10 -max_dim = ct.max_dim -min_dim = ct.min_dim -epsilon = ct.epsilon -hybrid_search_epsilon = 0.01 -gracefulTime = ct.gracefulTime -default_nb = ct.default_nb -default_nb_medium = ct.default_nb_medium -default_nq = ct.default_nq -default_dim = ct.default_dim -default_limit = ct.default_limit -max_limit = ct.max_limit -default_search_exp = "int64 >= 0" -default_search_string_exp = "varchar >= \"0\"" -default_search_mix_exp = "int64 >= 0 && varchar >= \"0\"" -default_invaild_string_exp = "varchar >= 0" -default_json_search_exp = "json_field[\"number\"] >= 0" -perfix_expr = 'varchar like "0%"' -default_search_field = ct.default_float_vec_field_name -default_search_params = ct.default_search_params -default_int64_field_name = ct.default_int64_field_name -default_float_field_name = ct.default_float_field_name -default_bool_field_name = ct.default_bool_field_name -default_string_field_name = ct.default_string_field_name -default_json_field_name = ct.default_json_field_name -default_index_params = ct.default_index -vectors = [[random.random() for _ in range(default_dim)] for _ in range(default_nq)] -uid = "test_search" -nq = 1 epsilon = 0.001 -field_name = default_float_vec_field_name -binary_field_name = default_binary_vec_field_name -search_param = {"nprobe": 1} -entity = gen_entities(1, is_normal=True) -entities = gen_entities(default_nb, is_normal=True) -raw_vectors, binary_entities = gen_binary_entities(default_nb) -default_query, _ = gen_search_vectors_params(field_name, entities, default_top_k, nq) -index_name1 = cf.gen_unique_str("float") -index_name2 = cf.gen_unique_str("varhar") -half_nb = ct.default_nb // 2 -max_hybrid_search_req_num = ct.max_hybrid_search_req_num + 
+dyna_filed_name1 = "dyna_filed_name1" +dyna_filed_name2 = "dyna_filed_name2" +inverted_string_field_name = "varchar_inverted" +indexed_json_field_name = "indexed_json" -class TestSearchGroupBy(TestcaseBase): - """ Test case of search group by """ +@pytest.mark.xdist_group("TestGroupSearch") +class TestGroupSearch(TestMilvusClientV2Base): + def setup_class(self): + super().setup_class(self) + self.collection_name = "TestGroupSearch" + cf.gen_unique_str("_") + self.partition_names = ["partition_1", "partition_2"] + self.primary_field = "int64_pk" + self.float_vector_field_name = ct.default_float_vec_field_name + self.bfloat16_vector_field_name = "bfloat16_vector" + self.sparse_vector_field_name = "sparse_vector" + self.binary_vector_field_name = "binary_vector" + self.vector_fields = [self.float_vector_field_name, self.bfloat16_vector_field_name, + self.sparse_vector_field_name, self.binary_vector_field_name] + self.float_vector_dim = 36 + self.bf16_vector_dim = 35 + self.binary_vector_dim = 32 + self.dims = [self.float_vector_dim, self.bf16_vector_dim, + 128, self.binary_vector_dim] + self.float_vector_metric = "COSINE" + self.bf16_vector_metric = "L2" + self.sparse_vector_metric = "IP" + self.binary_vector_metric = "JACCARD" + self.float_vector_index = "IVF_FLAT" + self.bf16_vector_index = "DISKANN" + self.sparse_vector_index = "SPARSE_INVERTED_INDEX" + self.binary_vector_index = "BIN_IVF_FLAT" + self.index_types = [self.float_vector_index, self.bf16_vector_index, + self.sparse_vector_index, self.binary_vector_index] + self.inverted_string_field = inverted_string_field_name + self.indexed_json_field = indexed_json_field_name + self.enable_dynamic_field = True + self.dyna_filed_name1 = dyna_filed_name1 + self.dyna_filed_name2 = dyna_filed_name2 - @pytest.mark.tags(CaseLabel.L2) - def test_search_max_group_size_and_max_limit(self): + @pytest.fixture(scope="class", autouse=True) + def prepare_collection(self, request): """ - target: test search group by with max 
group size and max limit - method: 1. create a collection with data - 2. search with group by int32 with max group size and max limit - + Initialize collection before test class runs """ - pass + # Get client connection + client = self._client() - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.parametrize("group_size", [0, -1]) - @pytest.mark.skip(reason="issue #36146") - def test_search_negative_group_size(self, group_size): - """ - target: test search group by with negative group size - """ - collection_w = self.init_collection_general(prefix, auto_id=True, insert_data=True, is_index=True)[0] - search_params = ct.default_search_params - search_vectors = cf.gen_vectors(1, dim=ct.default_dim) - # verify - error = {ct.err_code: 999, ct.err_msg: "group_size must be greater than 1"} - collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name, - param=search_params, limit=10, - group_by_field=ct.default_int64_field_name, - group_size=group_size, - check_task=CheckTasks.err_res, check_items=error) + # Create collection + fields = [] + fields.append(FieldSchema(name=self.primary_field, dtype=DataType.INT64, is_primary=True, auto_id=True)) + fields.append( + FieldSchema(name=self.float_vector_field_name, dtype=DataType.FLOAT_VECTOR, dim=self.float_vector_dim)) - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.parametrize("metric", ["JACCARD", "HAMMING"]) - def test_search_binary_vec_group_by(self, metric): - """ - target: test search on birany vector does not support group by - method: 1. create a collection with binary vectors - 2. create index with different metric types - 3. 
search with group by - verified error code and msg - """ - collection_w = self.init_collection_general(prefix, auto_id=True, insert_data=False, is_index=False, - is_binary=True)[0] - _index = {"index_type": "BIN_FLAT", "metric_type": metric, "params": {"M": 16, "efConstruction": 128}} - collection_w.create_index(ct.default_binary_vec_field_name, index_params=_index) - # insert with the same values for scalar fields - for _ in range(10): - data = cf.gen_default_binary_dataframe_data(nb=100, auto_id=True)[0] - collection_w.insert(data) + for data_type in ct.all_scalar_data_types: + fields.append(cf.gen_scalar_field(data_type, name=data_type.name, nullable=True)) - collection_w.flush() - collection_w.create_index(ct.default_binary_vec_field_name, index_params=_index) - collection_w.load() + collection_schema = CollectionSchema(fields, enable_dynamic_field=self.enable_dynamic_field) + collection_schema.add_field(self.bfloat16_vector_field_name, DataType.BFLOAT16_VECTOR, dim=self.bf16_vector_dim) + collection_schema.add_field(self.sparse_vector_field_name, DataType.SPARSE_FLOAT_VECTOR) + collection_schema.add_field(self.binary_vector_field_name, DataType.BINARY_VECTOR, dim=self.binary_vector_dim) - search_params = {"metric_type": metric, "params": {"ef": 128}} + collection_schema.add_field(self.inverted_string_field, DataType.VARCHAR, max_length=256, nullable=True) + collection_schema.add_field(self.indexed_json_field, DataType.JSON, nullable=True) + self.create_collection(client, self.collection_name, schema=collection_schema, force_teardown=False) + for partition_name in self.partition_names: + self.create_partition(client, self.collection_name, partition_name=partition_name) + + # Define number of insert iterations + insert_times = 100 + nb = 100 + # Insert data multiple times with non-duplicated primary keys + for j in range(insert_times): + # Group rows by partition based on primary key mod 3 + default_rows = [] + partition1_rows = [] + partition2_rows = [] + + 
geo_value = f"POINT({random.uniform(-180, 180)} {random.uniform(-90, 90)})" + timestamptz_value = cf.gen_timestamptz_str() + int8_value = random.randint(-128, 127) + int16_value = random.randint(-32768, 32767) + int32_value = random.randint(-2147483648, 2147483647) + for i in range(nb): + row = { + self.float_vector_field_name: cf.gen_vectors(1, dim=self.float_vector_dim, + vector_data_type=DataType.FLOAT_VECTOR)[0], + self.bfloat16_vector_field_name: cf.gen_vectors(1, dim=self.bf16_vector_dim, + vector_data_type=DataType.BFLOAT16_VECTOR)[0], + self.sparse_vector_field_name: cf.gen_sparse_vectors(1, empty_percentage=2)[0], + self.binary_vector_field_name: cf.gen_vectors(1, dim=self.binary_vector_dim, + vector_data_type=DataType.BINARY_VECTOR)[0], + DataType.BOOL.name: bool(i % 2) if random.random() < 0.8 else None, + DataType.INT8.name: int8_value if random.random() < 0.8 else None, + DataType.INT16.name: int16_value if random.random() < 0.8 else None, + DataType.INT32.name: int32_value if random.random() < 0.8 else None, + DataType.INT64.name: i if random.random() < 0.8 else None, + DataType.FLOAT.name: i * 1.0 if random.random() < 0.8 else None, + DataType.DOUBLE.name: i * 1.0 if random.random() < 0.8 else None, + DataType.TIMESTAMPTZ.name: timestamptz_value if random.random() < 0.8 else None, + DataType.VARCHAR.name: f"varchar_{i}" if random.random() < 0.8 else None, + DataType.ARRAY.name: [i, i + 1] if random.random() < 0.8 else None, + DataType.JSON.name: {"number": i, "string": f"string_{i}"} if random.random() < 0.8 else None, + DataType.GEOMETRY.name: geo_value if random.random() < 0.8 else None, + self.inverted_string_field: f"inverted_string_{i}" if random.random() < 0.8 else None, + self.indexed_json_field: {"number": i, "string": f"string_{i}"} if random.random() < 0.8 else None, + self.dyna_filed_name1: f"dyna_value_{i}" if random.random() < 0.8 else None, + self.dyna_filed_name2: {"number": i, "string": f"string_{i}"} if random.random() < 0.8 else 
None, + } + + # Distribute to partitions based on pk mod 3 + if i % 3 == 0: + default_rows.append(row) + elif i % 3 == 1: + partition1_rows.append(row) + else: + partition2_rows.append(row) + + # Insert into respective partitions + if default_rows: + self.insert(client, self.collection_name, data=default_rows) + if partition1_rows: + self.insert(client, self.collection_name, data=partition1_rows, partition_name=self.partition_names[0]) + if partition2_rows: + self.insert(client, self.collection_name, data=partition2_rows, partition_name=self.partition_names[1]) + + self.flush(client, self.collection_name) + + # Create index + index_params = self.prepare_index_params(client)[0] + index_params.add_index(field_name=self.float_vector_field_name, + metric_type=self.float_vector_metric, + index_type=self.float_vector_index, + params={"nlist": 128}) + index_params.add_index(field_name=self.bfloat16_vector_field_name, + metric_type=self.bf16_vector_metric, + index_type=self.bf16_vector_index, + params={}) + index_params.add_index(field_name=self.sparse_vector_field_name, + metric_type=self.sparse_vector_metric, + index_type=self.sparse_vector_index, + params={}) + index_params.add_index(field_name=self.binary_vector_field_name, + metric_type=self.binary_vector_metric, + index_type=self.binary_vector_index, + params={"nlist": 128}) + index_params.add_index(field_name=self.inverted_string_field, index_type="INVERTED") + index_params.add_index(field_name=self.indexed_json_field, index_type="INVERTED", json_cast_type='json') + self.create_index(client, self.collection_name, index_params=index_params) + self.wait_for_index_ready(client, self.collection_name, index_name=self.float_vector_field_name) + self.wait_for_index_ready(client, self.collection_name, index_name=self.bfloat16_vector_field_name) + + # Load collection + self.load_collection(client, self.collection_name) + + def teardown(): + self.drop_collection(self._client(), self.collection_name) + + 
request.addfinalizer(teardown) + + @pytest.mark.tags(CaseLabel.L0) + @pytest.mark.parametrize("group_by_field", [DataType.VARCHAR.name, inverted_string_field_name + # TODO: need to run after #46605 #46614 #46616 fixed + # , DataType.JSON.name, indexed_json_field_name + # , f"{DataType.JSON.name}['number']" + # , dyna_filed_name1, f"{dyna_filed_name2}['string']" + ]) + def test_search_group_size(self, group_by_field): + """ + target: + 1. search on 4 different float vector fields with group by varchar field with group size + verify results entity = limit * group_size and group size is full if strict_group_size is True + verify results group counts = limit if strict_group_size is False + """ nq = 2 - limit = 10 - search_vectors = cf.gen_binary_vectors(nq, dim=ct.default_dim)[1] + limit = 50 + group_size = 5 + # we can group by json key instead of a real field, so output a real field name + output_field = group_by_field + if "number" in group_by_field: + output_field = DataType.JSON.name + if "string" in group_by_field: + output_field = dyna_filed_name2 + client = self._client() + collection_info = self.describe_collection(client, self.collection_name)[0] + for j in range(len(self.vector_fields)): + if self.vector_fields[j] == self.binary_vector_field_name: + pass + else: + search_vectors = cf.gen_vectors(nq, dim=self.dims[j], + vector_data_type=cf.get_field_dtype_by_field_name(collection_info, + self.vector_fields[ + j])) + search_params = {"params": cf.get_search_params_params(self.index_types[j])} + # when strict_group_size=true, it shall return results with entities = limit * group_size + res1 = self.search(client, self.collection_name, data=search_vectors, anns_field=self.vector_fields[j], + search_params=search_params, limit=limit, + group_by_field=group_by_field, filter=f"{output_field} is not null", + group_size=group_size, strict_group_size=True, + output_fields=[output_field])[0] + for i in range(nq): + assert len(res1[i]) == limit * group_size + for l in 
range(limit): + group_values = [] + for k in range(group_size): + group_values.append(res1[i][l * group_size + k].fields.get(output_field)) + if group_values and isinstance(group_values[0], dict): + group_values = [json.dumps(value) for value in group_values] + assert len(set(group_values)) == 1 + elif group_values: + assert len(set(group_values)) == 1 - # verify the results are same if group by pk - err_code = 999 - err_msg = "not support search_group_by operation based on binary" - collection_w.search(data=search_vectors, anns_field=ct.default_binary_vec_field_name, - param=search_params, limit=limit, - group_by_field=ct.default_int64_field_name, - check_task=CheckTasks.err_res, - check_items={"err_code": err_code, "err_msg": err_msg}) + # when strict_group_size=false, it shall return results with group counts = limit + res1 = self.search(client, self.collection_name, data=search_vectors, anns_field=self.vector_fields[j], + search_params=search_params, limit=limit, + group_by_field=group_by_field, filter=f"{output_field} is not null", + group_size=group_size, strict_group_size=False, + output_fields=[output_field])[0] + for i in range(nq): + group_values = [] + for l in range(len(res1[i])): + group_values.append(res1[i][l].fields.get(output_field)) + if group_values and isinstance(group_values[0], dict): + group_values = [json.dumps(value) for value in group_values] + assert len(set(group_values)) == limit + elif group_values: + assert len(set(group_values)) == limit + + @pytest.mark.tags(CaseLabel.L0) + def test_hybrid_search_group_size(self): + """ + hybrid search group by on 4 different float vector fields with group by varchar field with group size + verify results returns with de-dup group values and group distances are in order as rank_group_scorer + """ + nq = 2 + limit = 50 + group_size = 5 + req_list = [] + client = self._client() + collection_info = self.describe_collection(client, self.collection_name)[0] + for j in range(len(self.vector_fields)): + if 
self.vector_fields[j] == self.binary_vector_field_name: + pass # not support group by search on binary vector + else: + search_params = { + "data": cf.gen_vectors(nq, dim=self.dims[j], + vector_data_type=cf.get_field_dtype_by_field_name(collection_info, + self.vector_fields[j])), + "anns_field": self.vector_fields[j], + "param": {"params": cf.get_search_params_params(self.index_types[j])}, + "limit": limit, + "expr": f"{self.primary_field} > 0"} + req = AnnSearchRequest(**search_params) + req_list.append(req) + # 4. hybrid search group by + rank_scorers = ["max", "avg", "sum"] + for scorer in rank_scorers: + res = self.hybrid_search(client, self.collection_name, reqs=req_list, ranker=WeightedRanker(0.1, 0.3, 0.6), + limit=limit, group_by_field=DataType.VARCHAR.name, group_size=group_size, + rank_group_scorer=scorer, output_fields=[DataType.VARCHAR.name])[0] + for i in range(nq): + group_values = [] + for l in range(len(res[i])): + group_values.append(res[i][l].get(DataType.VARCHAR.name)) + assert len(set(group_values)) == limit + + # group_distances = [] + tmp_distances = [100 for _ in range(group_size)] # init with a large value + group_distances = [res[i][0].distance] # init with the first value + for l in range(len(res[i]) - 1): + curr_group_value = res[i][l].get(DataType.VARCHAR.name) + next_group_value = res[i][l + 1].get(DataType.VARCHAR.name) + if curr_group_value == next_group_value: + group_distances.append(res[i][l + 1].distance) + else: + if scorer == 'sum': + assert np.sum(group_distances) <= np.sum(tmp_distances) + elif scorer == 'avg': + assert np.mean(group_distances) <= np.mean(tmp_distances) + else: # default max + assert np.max(group_distances) <= np.max(tmp_distances) + + tmp_distances = group_distances + group_distances = [res[i][l + 1].distance] + + @pytest.mark.tags(CaseLabel.L2) + def test_hybrid_search_group_by(self): + """ + verify hybrid search group by works with different Rankers + """ + client = self._client() + collection_info = 
self.describe_collection(client, self.collection_name)[0] + # 3. prepare search params + req_list = [] + for i in range(len(self.vector_fields)): + if self.vector_fields[i] == self.binary_vector_field_name: + pass # not support group by search on binary vector + else: + search_param = { + "data": cf.gen_vectors(ct.default_nq, dim=self.dims[i], + vector_data_type=cf.get_field_dtype_by_field_name(collection_info, + self.vector_fields[i])), + "anns_field": self.vector_fields[i], + "param": {}, + "limit": ct.default_limit, + "expr": f"{self.primary_field} > 0"} + req = AnnSearchRequest(**search_param) + req_list.append(req) + # 4. hybrid search group by + res = self.hybrid_search(client, self.collection_name, reqs=req_list, ranker=WeightedRanker(0.1, 0.9, 0.3), + limit=ct.default_limit, group_by_field=DataType.VARCHAR.name, + output_fields=[DataType.VARCHAR.name], + check_task=CheckTasks.check_search_results, + check_items={"nq": ct.default_nq, "limit": ct.default_limit})[0] + for i in range(ct.default_nq): + group_values = [] + for l in range(ct.default_limit): + group_values.append(res[i][l].get(DataType.VARCHAR.name)) + assert len(group_values) == len(set(group_values)) + + # 5. 
hybrid search with RRFRanker on one vector field with group by + req_list = [] + for i in range(1, len(self.vector_fields)): + if self.vector_fields[i] == self.binary_vector_field_name: + pass # not support group by search on binary vector + else: + search_param = { + "data": cf.gen_vectors(ct.default_nq, dim=self.dims[i], + vector_data_type=cf.get_field_dtype_by_field_name(collection_info, + self.vector_fields[i])), + "anns_field": self.vector_fields[i], + "param": {}, + "limit": ct.default_limit, + "expr": f"{self.primary_field} > 0"} + req = AnnSearchRequest(**search_param) + req_list.append(req) + self.hybrid_search(client, self.collection_name, reqs=req_list, ranker=RRFRanker(), + limit=ct.default_limit, group_by_field=self.inverted_string_field, + output_fields=[self.inverted_string_field], + check_task=CheckTasks.check_search_results, + check_items={"nq": ct.default_nq, "limit": ct.default_limit}) + + @pytest.mark.tags(CaseLabel.L2) + def test_hybrid_search_group_by_empty_results(self): + """ + verify hybrid search group by works if group by empty results + """ + client = self._client() + collection_info = self.describe_collection(client, self.collection_name)[0] + # 3. prepare search params + req_list = [] + for i in range(len(self.vector_fields)): + if self.vector_fields[i] == self.binary_vector_field_name: + pass # not support group by search on binary vector + else: + search_param = { + "data": cf.gen_vectors(ct.default_nq, dim=self.dims[i], + vector_data_type=cf.get_field_dtype_by_field_name(collection_info, + self.vector_fields[i])), + "anns_field": self.vector_fields[i], + "param": {}, + "limit": ct.default_limit, + "expr": f"{self.primary_field} < 0"} # make sure return empty results + req = AnnSearchRequest(**search_param) + req_list.append(req) + # 4. 
hybrid search group by empty results + self.hybrid_search(client, self.collection_name, reqs=req_list, ranker=WeightedRanker(0.1, 0.9, 0.3), + limit=ct.default_limit, group_by_field=DataType.VARCHAR.name, + output_fields=[DataType.VARCHAR.name], + check_task=CheckTasks.check_search_results, + check_items={"nq": ct.default_nq, "limit": 0}) + + @pytest.mark.tags(CaseLabel.L2) + def test_search_group_by_supported_binary_vector(self): + """ + verify search group by works with supported binary vector + """ + client = self._client() + search_vectors = cf.gen_vectors(1, dim=self.binary_vector_dim, + vector_data_type=DataType.BINARY_VECTOR) + search_params = {} + limit = 1 + error = {ct.err_code: 999, + ct.err_msg: "not support search_group_by operation based on binary vector"} + self.search(client, self.collection_name, data=search_vectors, anns_field=self.binary_vector_field_name, + search_params=search_params, limit=limit, group_by_field=DataType.VARCHAR.name, + output_fields=[DataType.VARCHAR.name], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L2) + @pytest.mark.parametrize("support_field", [DataType.INT8.name, DataType.INT64.name, + DataType.BOOL.name, DataType.VARCHAR.name, + DataType.TIMESTAMPTZ.name]) + def test_search_group_by_supported_scalars(self, support_field): + """ + verify search group by works with supported scalar fields + """ + client = self._client() + collection_info = self.describe_collection(client, self.collection_name)[0] + nq = 2 + limit = 15 + for j in range(len(self.vector_fields)): + if self.vector_fields[j] == self.binary_vector_field_name: + pass # not support group by search on binary vector + else: + search_vectors = cf.gen_vectors(nq, dim=self.dims[j], + vector_data_type=cf.get_field_dtype_by_field_name(collection_info, + self.vector_fields[ + j])) + search_params = {"params": cf.get_search_params_params(self.index_types[j])} + res1 = self.search(client, self.collection_name, data=search_vectors, 
anns_field=self.vector_fields[j], + search_params=search_params, limit=limit, + filter=f"{support_field} is not null", + group_by_field=support_field, + output_fields=[support_field])[0] + for i in range(nq): + grpby_values = [] + dismatch = 0 + results_num = 2 if support_field == DataType.BOOL.name else limit + for l in range(results_num): + top1 = res1[i][l] + top1_grpby_pk = top1.id + top1_grpby_value = top1.get(support_field) + filter_expr = f"{support_field}=={top1_grpby_value}" + if support_field == DataType.VARCHAR.name: + filter_expr = f"{support_field}=='{top1_grpby_value}'" + if support_field == DataType.TIMESTAMPTZ.name: + filter_expr = f"{support_field}== ISO '{top1_grpby_value}'" + grpby_values.append(top1_grpby_value) + res_tmp = self.search(client, self.collection_name, data=[search_vectors[i]], + anns_field=self.vector_fields[j], + search_params=search_params, limit=1, filter=filter_expr, + output_fields=[support_field])[0] + top1_expr_pk = res_tmp[0][0].id + if top1_grpby_pk != top1_expr_pk: + dismatch += 1 + log.info( + f"{support_field} on {self.vector_fields[j]} dismatch_item, top1_grpby_dis: {top1.distance}, top1_expr_dis: {res_tmp[0][0].distance}") + log.info( + f"{support_field} on {self.vector_fields[j]} top1_dismatch_num: {dismatch}, results_num: {results_num}, dismatch_rate: {dismatch / results_num}") + baseline = 1 if support_field == DataType.BOOL.name else 0.2 # skip baseline check for boolean + assert results_num > 0, "results_num should be greater than 0" + assert dismatch / results_num <= baseline + # verify no dup values of the group_by_field in results + assert len(grpby_values) == len(set(grpby_values)) @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.parametrize("grpby_unsupported_field", [ct.default_float_field_name, - ct.default_double_field_name, ct.default_float_vec_field_name]) + @pytest.mark.parametrize("grpby_unsupported_field", [DataType.FLOAT.name, DataType.DOUBLE.name, + DataType.GEOMETRY.name, + 
ct.default_float_vec_field_name]) def test_search_group_by_unsupported_field(self, grpby_unsupported_field): """ target: test search group by with the unsupported field @@ -159,130 +473,186 @@ class TestSearchGroupBy(TestcaseBase): 3. search with group by the unsupported fields verify: the error code and msg """ - metric = "IP" - collection_w = self.init_collection_general(prefix, insert_data=True, is_index=False, - is_all_data_type=True, with_json=True, )[0] - _index = {"index_type": "HNSW", "metric_type": metric, "params": {"M": 16, "efConstruction": 128}} - collection_w.create_index(ct.default_float_vec_field_name, index_params=_index) - collection_w.load() - - search_params = {"metric_type": metric, "params": {"ef": 64}} - nq = 1 + client = self._client() + search_vectors = cf.gen_vectors(1, dim=self.float_vector_dim, + vector_data_type=DataType.FLOAT_VECTOR) + search_params = {} limit = 1 - search_vectors = cf.gen_vectors(nq, dim=ct.default_dim) - - # search with groupby - err_code = 999 - err_msg = f"unsupported data type" - collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name, - param=search_params, limit=limit, - group_by_field=grpby_unsupported_field, - check_task=CheckTasks.err_res, - check_items={"err_code": err_code, "err_msg": err_msg}) + error = {ct.err_code: 999, + ct.err_msg: f"unsupported data type {grpby_unsupported_field} for group by operator"} + if grpby_unsupported_field == ct.default_float_vec_field_name: + error = {ct.err_code: 999, + ct.err_msg: f"unsupported data type VECTOR_FLOAT for group by operator"} + self.search(client, self.collection_name, data=search_vectors, anns_field=self.float_vector_field_name, + search_params=search_params, limit=limit, group_by_field=grpby_unsupported_field, + output_fields=[grpby_unsupported_field], + check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.parametrize("index", ct.all_index_types[:8]) - def 
test_search_group_by_unsupported_index(self, index): + def test_search_pagination_group_by(self): """ - target: test search group by with the unsupported vector index - method: 1. create a collection with data - 2. create a groupby unsupported index - 3. search with group by - verify: the error code and msg + verify search group by works with pagination """ - support_group_by_index_types = ["HNSW", "IVF_FLAT", "FLAT", "IVF_SQ8", "IVF_RABITQ", "DISKANN", "SCANN"] - metric = "L2" - collection_w = self.init_collection_general(prefix, insert_data=True, is_index=False, - is_all_data_type=True, with_json=False)[0] - params = cf.get_index_params_params(index) - index_params = {"index_type": index, "params": params, "metric_type": metric} - collection_w.create_index(ct.default_float_vec_field_name, index_params) - collection_w.load() + limit = 10 + page_rounds = 3 + client = self._client() + collection_info = self.describe_collection(client, self.collection_name)[0] + search_param = {} + default_search_exp = f"{self.primary_field} >= 0" + grpby_field = self.inverted_string_field + default_search_field = self.vector_fields[1] + search_vectors = cf.gen_vectors(1, dim=self.dims[1], + vector_data_type=cf.get_field_dtype_by_field_name(collection_info, + self.vector_fields[1])) + all_pages_ids = [] + all_pages_grpby_field_values = [] + for r in range(page_rounds): + page_res = self.search(client, self.collection_name, data=search_vectors, anns_field=default_search_field, + search_params=search_param, limit=limit, offset=limit * r, + filter=default_search_exp, group_by_field=grpby_field, + output_fields=[grpby_field], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": limit}, + )[0] + for j in range(limit): + all_pages_grpby_field_values.append(page_res[0][j].get(grpby_field)) + all_pages_ids += page_res[0].ids + hit_rate = round(len(set(all_pages_grpby_field_values)) / len(all_pages_grpby_field_values), 3) + assert hit_rate >= 0.8 - search_params = 
{"params": {}} - nq = 1 - limit = 1 - search_vectors = cf.gen_vectors(nq, dim=ct.default_dim) + total_res = self.search(client, self.collection_name, data=search_vectors, anns_field=default_search_field, + search_params=search_param, limit=limit * page_rounds, + filter=default_search_exp, group_by_field=grpby_field, + output_fields=[grpby_field], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": limit * page_rounds} + )[0] + hit_num = len(set(total_res[0].ids).intersection(set(all_pages_ids))) + hit_rate = round(hit_num / (limit * page_rounds), 3) + assert hit_rate >= 0.8 + log.info(f"search pagination with groupby hit_rate: {hit_rate}") + grpby_field_values = [] + for i in range(limit * page_rounds): + grpby_field_values.append(total_res[0][i].get(grpby_field)) + assert len(grpby_field_values) == len(set(grpby_field_values)) - # search with groupby - if index in support_group_by_index_types: - collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name, - param=search_params, limit=limit, - group_by_field=ct.default_int8_field_name) - else: - err_code = 999 - err_msg = f"current index:{index} doesn't support" - collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name, - param=search_params, limit=limit, - group_by_field=ct.default_int8_field_name, - check_task=CheckTasks.err_res, - check_items={"err_code": err_code, "err_msg": err_msg}) + @pytest.mark.tags(CaseLabel.L0) + def test_search_pagination_group_size(self): + """ + verify search group by works with pagination and group_size + """ + client = self._client() + collection_info = self.describe_collection(client, self.collection_name)[0] + limit = 10 + group_size = 5 + page_rounds = 3 + search_param = {} + default_search_exp = f"{self.primary_field} >= 0" + grpby_field = self.inverted_string_field + default_search_field = self.vector_fields[1] + search_vectors = cf.gen_vectors(1, dim=self.dims[1], + 
vector_data_type=cf.get_field_dtype_by_field_name(collection_info, + self.vector_fields[1])) + all_pages_ids = [] + all_pages_grpby_field_values = [] + res_count = limit * group_size + for r in range(page_rounds): + page_res = self.search(client, self.collection_name, data=search_vectors, anns_field=default_search_field, + search_params=search_param, limit=limit, offset=limit * r, + filter=default_search_exp, + group_by_field=grpby_field, group_size=group_size, + strict_group_size=True, + output_fields=[grpby_field], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": res_count}, + )[0] + for j in range(res_count): + all_pages_grpby_field_values.append(page_res[0][j].get(grpby_field)) + all_pages_ids += page_res[0].ids + + hit_rate = round(len(set(all_pages_grpby_field_values)) / len(all_pages_grpby_field_values), 3) + expect_hit_rate = round(1 / group_size, 3) * 0.7 + log.info(f"expect_hit_rate :{expect_hit_rate}, hit_rate:{hit_rate}, " + f"unique_group_by_value_count:{len(set(all_pages_grpby_field_values))}," + f"total_group_by_value_count:{len(all_pages_grpby_field_values)}") + assert hit_rate >= expect_hit_rate + + total_count = limit * group_size * page_rounds + total_res = self.search(client, self.collection_name, data=search_vectors, anns_field=default_search_field, + search_params=search_param, limit=limit * page_rounds, + filter=default_search_exp, + group_by_field=grpby_field, group_size=group_size, + strict_group_size=True, + output_fields=[grpby_field], + check_task=CheckTasks.check_search_results, + check_items={"nq": 1, "limit": total_count} + )[0] + hit_num = len(set(total_res[0].ids).intersection(set(all_pages_ids))) + hit_rate = round(hit_num / (limit * page_rounds), 3) + assert hit_rate >= 0.8 + log.info(f"search pagination with groupby hit_rate: {hit_rate}") + grpby_field_values = [] + for i in range(total_count): + grpby_field_values.append(total_res[0][i].get(grpby_field)) + assert len(grpby_field_values) == 
total_count + assert len(set(grpby_field_values)) == limit * page_rounds @pytest.mark.tags(CaseLabel.L2) - def test_search_group_by_multi_fields(self): + def test_search_group_size_min_max(self): """ - target: test search group by with the multi fields - method: 1. create a collection with data - 2. create index - 3. search with group by the multi fields - verify: the error code and msg + verify search group by works with min and max group size """ - metric = "IP" - collection_w = self.init_collection_general(prefix, insert_data=False, is_index=False, - is_all_data_type=True, with_json=True, )[0] - _index = {"index_type": "HNSW", "metric_type": metric, "params": {"M": 16, "efConstruction": 128}} - collection_w.create_index(ct.default_float_vec_field_name, index_params=_index) - collection_w.load() + client = self._client() + collection_info = self.describe_collection(client, self.collection_name)[0] + group_by_field = self.inverted_string_field + default_search_field = self.vector_fields[1] + search_vectors = cf.gen_vectors(1, dim=self.dims[1], + vector_data_type=cf.get_field_dtype_by_field_name(collection_info, + self.vector_fields[1])) + search_params = {} + limit = 10 + max_group_size = 10 + self.search(client, self.collection_name, data=search_vectors, anns_field=default_search_field, + search_params=search_params, limit=limit, + group_by_field=group_by_field, + group_size=max_group_size, strict_group_size=True, + output_fields=[group_by_field]) + exceed_max_group_size = max_group_size + 1 + error = {ct.err_code: 999, + ct.err_msg: f"input group size:{exceed_max_group_size} exceeds configured max " + f"group size:{max_group_size}"} + self.search(client, self.collection_name, data=search_vectors, anns_field=default_search_field, + search_params=search_params, limit=limit, + group_by_field=group_by_field, + group_size=exceed_max_group_size, strict_group_size=True, + output_fields=[group_by_field], + check_task=CheckTasks.err_res, check_items=error) - 
search_params = {"metric_type": metric, "params": {"ef": 128}} - nq = 1 - limit = 1 - search_vectors = cf.gen_vectors(nq, dim=ct.default_dim) + min_group_size = 1 + self.search(client, self.collection_name, data=search_vectors, anns_field=default_search_field, + search_params=search_params, limit=limit, + group_by_field=group_by_field, + group_size=max_group_size, strict_group_size=True, + output_fields=[group_by_field]) + below_min_group_size = min_group_size - 1 + error = {ct.err_code: 999, + ct.err_msg: f"input group size:{below_min_group_size} is negative"} + self.search(client, self.collection_name, data=search_vectors, anns_field=default_search_field, + search_params=search_params, limit=limit, + group_by_field=group_by_field, + group_size=below_min_group_size, strict_group_size=True, + output_fields=[group_by_field], + check_task=CheckTasks.err_res, check_items=error) - # search with groupby - err_code = 1700 - err_msg = f"groupBy field not found in schema" - collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name, - param=search_params, limit=limit, - group_by_field=[ct.default_string_field_name, ct.default_int32_field_name], - check_task=CheckTasks.err_res, - check_items={"err_code": err_code, "err_msg": err_msg}) - - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.parametrize("grpby_nonexist_field", ["nonexit_field", 100]) - def test_search_group_by_nonexit_fields(self, grpby_nonexist_field): - """ - target: test search group by with the nonexisting field - method: 1. create a collection with data - 2. create index - 3. 
search with group by the unsupported fields - verify: the error code and msg - """ - metric = "IP" - collection_w = self.init_collection_general(prefix, insert_data=False, is_index=False, - is_all_data_type=True, with_json=True, )[0] - _index = {"index_type": "HNSW", "metric_type": metric, "params": {"M": 16, "efConstruction": 128}} - collection_w.create_index(ct.default_float_vec_field_name, index_params=_index) - - vector_name_list = cf.extract_vector_field_name_list(collection_w) - index_param = {"index_type": "FLAT", "metric_type": "COSINE", "params": {"nlist": 100}} - for vector_name in vector_name_list: - collection_w.create_index(vector_name, index_param) - collection_w.load() - - search_params = {"metric_type": metric, "params": {"ef": 128}} - nq = 1 - limit = 1 - search_vectors = cf.gen_vectors(nq, dim=ct.default_dim) - - # search with groupby - err_code = 1700 - err_msg = f"groupBy field not found in schema: field not found[field={grpby_nonexist_field}]" - collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name, - param=search_params, limit=limit, - group_by_field=grpby_nonexist_field, - check_task=CheckTasks.err_res, - check_items={"err_code": err_code, "err_msg": err_msg}) + below_min_group_size = min_group_size - 10 + error = {ct.err_code: 999, + ct.err_msg: f"input group size:{below_min_group_size} is negative"} + self.search(client, self.collection_name, data=search_vectors, anns_field=default_search_field, + search_params=search_params, limit=limit, + group_by_field=group_by_field, + group_size=below_min_group_size, strict_group_size=True, + output_fields=[group_by_field], + check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L1) def test_search_iterator_not_support_group_by(self): @@ -294,31 +664,18 @@ class TestSearchGroupBy(TestcaseBase): 4. 
search with filtering every value of group_by_field verify: error code and msg """ - metric = "COSINE" - collection_w = self.init_collection_general(prefix, auto_id=True, insert_data=False, is_index=False, - is_all_data_type=True, with_json=False)[0] - # insert with the same values for scalar fields - for _ in range(10): - data = cf.gen_dataframe_all_data_type(nb=100, auto_id=True, with_json=False) - collection_w.insert(data) - - collection_w.flush() - _index = {"index_type": "HNSW", "metric_type": metric, "params": {"M": 16, "efConstruction": 128}} - collection_w.create_index(ct.default_float_vec_field_name, index_params=_index) - collection_w.load() - - grpby_field = ct.default_int32_field_name - search_vectors = cf.gen_vectors(1, dim=ct.default_dim) - search_params = {"metric_type": metric} - batch_size = 10 - - err_code = 1100 - err_msg = "Not allowed to do groupBy when doing iteration" - collection_w.search_iterator(search_vectors, ct.default_float_vec_field_name, - search_params, batch_size, group_by_field=grpby_field, - output_fields=[grpby_field], - check_task=CheckTasks.err_res, - check_items={"err_code": err_code, "err_msg": err_msg}) + client = self._client() + search_vectors = cf.gen_vectors(1, dim=self.float_vector_dim, + vector_data_type=DataType.FLOAT_VECTOR) + search_params = {} + grpby_field = DataType.VARCHAR.name + error = {ct.err_code: 1100, + ct.err_msg: "Not allowed to do groupBy when doing iteration"} + self.search_iterator(client, self.collection_name, data=search_vectors, + anns_field=self.float_vector_field_name, + search_params=search_params, batch_size=10, limit=100, + group_by_field=grpby_field, + check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L2) def test_range_search_not_support_group_by(self): @@ -329,35 +686,163 @@ class TestSearchGroupBy(TestcaseBase): 3. 
range search with group by verify: the error code and msg """ - metric = "COSINE" - collection_w = self.init_collection_general(prefix, auto_id=True, insert_data=False, is_index=False, - is_all_data_type=True, with_json=False)[0] - _index = {"index_type": "HNSW", "metric_type": metric, "params": {"M": 16, "efConstruction": 128}} - collection_w.create_index(ct.default_float_vec_field_name, index_params=_index) - # insert with the same values for scalar fields - for _ in range(10): - data = cf.gen_dataframe_all_data_type(nb=100, auto_id=True, with_json=False) - collection_w.insert(data) + client = self._client() + search_vectors = cf.gen_vectors(1, dim=self.float_vector_dim, + vector_data_type=DataType.FLOAT_VECTOR) + grpby_field = DataType.VARCHAR.name + error = {ct.err_code: 1100, + ct.err_msg: "Not allowed to do range-search when doing search-group-by"} + range_search_params = {"metric_type": "COSINE", "params": {"radius": 0.1, "range_filter": 0.5}} + self.search(client, self.collection_name, data=search_vectors, + anns_field=self.float_vector_field_name, + search_params=range_search_params, limit=5, + group_by_field=grpby_field, + check_task=CheckTasks.err_res, check_items=error) - collection_w.flush() - collection_w.create_index(ct.default_float_vec_field_name, index_params=_index) - collection_w.load() + @pytest.mark.tags(CaseLabel.L2) + @pytest.mark.parametrize("grpby_nonexist_field", ["nonexist_field", 21]) + def test_search_group_by_non_exit_field_on_dynamic_enabled_collection(self, grpby_nonexist_field): + """ + target: test search group by with the non existing field against dynamic field enabled collection + method: 1. create a collection with dynamic field enabled + 2. create index + 3. 
search with group by the unsupported fields + verify: success + """ + client = self._client() + nq = 2 + search_vectors = cf.gen_vectors(nq, dim=self.float_vector_dim, + vector_data_type=DataType.FLOAT_VECTOR) + search_params = {} + limit = 100 + self.search(client, self.collection_name, data=search_vectors, + anns_field=self.float_vector_field_name, + search_params=search_params, limit=limit, + group_by_field=grpby_nonexist_field, + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, "limit": limit}) - nq = 1 - limit = 5 - search_vectors = cf.gen_vectors(nq, dim=ct.default_dim) - grpby_field = ct.default_int32_field_name - range_search_params = {"metric_type": "COSINE", "params": {"radius": 0.1, - "range_filter": 0.5}} - err_code = 1100 - err_msg = f"Not allowed to do range-search" - collection_w.search(search_vectors, ct.default_float_vec_field_name, - range_search_params, limit, - default_search_exp, group_by_field=grpby_field, - output_fields=[grpby_field], - check_task=CheckTasks.err_res, - check_items={"err_code": err_code, "err_msg": err_msg}) +@pytest.mark.xdist_group("TestGroupSearchInvalid") +class TestGroupSearchInvalid(TestMilvusClientV2Base): + def setup_class(self): + super().setup_class(self) + self.collection_name = "TestGroupSearchInvalid" + cf.gen_unique_str("_") + self.primary_field = "int64_pk" + self.float_vector_field_name = ct.default_float_vec_field_name + self.int8_vector_field_name = "int8_vector" + self.vector_fields = [self.float_vector_field_name, self.int8_vector_field_name] + self.float_vector_dim = 128 + self.int8_vector_dim = 64 + self.dims = [self.float_vector_dim, self.int8_vector_dim] + self.float_vector_metric = "L2" + self.int8_vector_metric = "COSINE" + self.float_vector_index = "FLAT" + self.int8_vector_index = "HNSW" + self.index_types = [self.float_vector_index, self.int8_vector_index] + + @pytest.fixture(scope="class", autouse=True) + def prepare_collection(self, request): + """ + Initialize collection 
before test class runs + """ + # Get client connection + client = self._client() + + # Create collection + fields = [] + fields.append(FieldSchema(name=self.primary_field, dtype=DataType.INT64, is_primary=True, auto_id=True)) + fields.append( + FieldSchema(name=self.float_vector_field_name, dtype=DataType.FLOAT_VECTOR, dim=self.float_vector_dim)) + + for data_type in ct.all_scalar_data_types: + fields.append(cf.gen_scalar_field(data_type, name=data_type.name, nullable=True)) + + collection_schema = CollectionSchema(fields, enable_dynamic_field=False) + collection_schema.add_field(self.int8_vector_field_name, DataType.INT8_VECTOR, dim=self.int8_vector_dim) + self.create_collection(client, self.collection_name, schema=collection_schema, force_teardown=False) + + # Define number of insert iterations + insert_times = 2 + nb = 1000 + # Insert data multiple times with non-duplicated primary keys + for j in range(insert_times): + rows = cf.gen_row_data_by_schema(nb, schema=collection_schema) + # Insert into collection + self.insert(client, self.collection_name, data=rows) + + self.flush(client, self.collection_name) + + # Create index + index_params = self.prepare_index_params(client)[0] + index_params.add_index(field_name=self.float_vector_field_name, + metric_type=self.float_vector_metric, + index_type=self.float_vector_index, + params={"nlist": 128}) + index_params.add_index(field_name=self.int8_vector_field_name, + metric_type=self.int8_vector_metric, + index_type=self.int8_vector_index, + params={"nlist": 128}) + self.create_index(client, self.collection_name, index_params=index_params) + self.wait_for_index_ready(client, self.collection_name, index_name=self.float_vector_field_name) + self.wait_for_index_ready(client, self.collection_name, index_name=self.int8_vector_field_name) + + # Load collection + self.load_collection(client, self.collection_name) + + def teardown(): + self.drop_collection(self._client(), self.collection_name) + + request.addfinalizer(teardown) 
+ + @pytest.mark.tags(CaseLabel.L2) + def test_search_group_by_multi_fields(self): + """ + target: test search group by with the multi fields + method: 1. create a collection with data + 2. create index + 3. search with group by the multi fields + verify: the error code and msg + """ + client = self._client() + search_params = {} + search_vectors = cf.gen_vectors(1, dim=self.float_vector_dim, + vector_data_type=DataType.FLOAT_VECTOR) + # verify + error = {ct.err_code: 1700, + ct.err_msg: f"groupBy field not found in schema"} + self.search(client, self.collection_name, data=search_vectors, + anns_field=self.float_vector_field_name, + search_params=search_params, limit=10, + group_by_field=[DataType.VARCHAR.name, DataType.INT32.name], + output_fields=[DataType.VARCHAR.name, DataType.INT32.name], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L2) + @pytest.mark.parametrize("grpby_nonexist_field", ["nonexist_field", 21]) + def test_search_group_by_non_exit_field(self, grpby_nonexist_field): + """ + target: test search group by with the nonexisting field + method: 1. create a collection with data + 2. create index + 3. 
search with group by the unsupported fields + verify: the error code and msg + """ + client = self._client() + search_vectors = cf.gen_vectors(1, dim=self.float_vector_dim, + vector_data_type=DataType.FLOAT_VECTOR) + search_params = {} + limit = 1 + error = {ct.err_code: 1700, + ct.err_msg: f"groupBy field not found in schema: field not found[field={grpby_nonexist_field}]"} + self.search(client, self.collection_name, data=search_vectors, + anns_field=self.float_vector_field_name, + search_params=search_params, limit=limit, + group_by_field=grpby_nonexist_field, + check_task=CheckTasks.err_res, check_items=error) + + +class TestSearchGroupByIndependent(TestMilvusClientV2Base): @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("metric", ["L2", "IP", "COSINE"]) def test_search_group_by_flat_index_correctness(self, metric): @@ -371,37 +856,50 @@ class TestSearchGroupBy(TestcaseBase): for the same group issue: https://github.com/milvus-io/milvus/issues/46349 """ - collection_w = self.init_collection_general(prefix, auto_id=True, insert_data=False, is_index=False, - is_all_data_type=True, with_json=False)[0] - # create FLAT index - _index = {"index_type": "FLAT", "metric_type": metric, "params": {}} - collection_w.create_index(ct.default_float_vec_field_name, index_params=_index) + client = self._client() + collection_name = cf.gen_collection_name_by_testcase_name() + schema = self.create_schema(client)[0] + schema.add_field(field_name=ct.default_primary_field_name, datatype=DataType.INT64, is_primary=True) + schema.add_field(field_name=ct.default_float_vec_field_name, datatype=DataType.FLOAT_VECTOR, dim=ct.default_dim) + schema.add_field(field_name=ct.default_int32_field_name, datatype=DataType.INT32) + + index_params = self.prepare_index_params(client)[0] + index_params.add_index(field_name=ct.default_float_vec_field_name, index_type="FLAT", metric_type=metric) + self.create_collection(client, collection_name, schema=schema, index_params=index_params) + 
self.load_collection(client, collection_name) - # insert data with 10 different group values, 100 records per group for _ in range(10): - data = cf.gen_dataframe_all_data_type(nb=100, auto_id=True, with_json=False) - collection_w.insert(data) - - collection_w.flush() - collection_w.load() + rows = [] + for i in range(ct.default_nb): + row = { + ct.default_primary_field_name: i, + ct.default_float_vec_field_name: cf.gen_vectors(1, dim=ct.default_dim)[0], + ct.default_int32_field_name: i, + } + rows.append(row) + self.insert(client, collection_name, data=rows) + self.flush(client, collection_name) nq = 1 limit = 1 search_vectors = cf.gen_vectors(nq, dim=ct.default_dim) grpby_field = ct.default_int32_field_name - search_params = {"metric_type": metric, "params": {}} + + search_params = {} # normal search to get the best result - normal_res = collection_w.search(search_vectors, ct.default_float_vec_field_name, - search_params, limit, - output_fields=[grpby_field])[0] - + normal_res = self.search(client, collection_name, data=search_vectors, + search_params=search_params, limit=limit, + output_fields=[grpby_field], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, "limit": limit})[0] # group_by search - groupby_res = collection_w.search(search_vectors, ct.default_float_vec_field_name, - search_params, limit, - group_by_field=grpby_field, - output_fields=[grpby_field])[0] - + groupby_res = self.search(client, collection_name, data=search_vectors, + search_params=search_params, limit=limit, + group_by_field=grpby_field, + output_fields=[grpby_field], + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, "limit": limit})[0] # verify that the top result from group_by search matches the normal search # for the same group value, group_by should return the best (closest) result normal_top_distance = normal_res[0][0].distance diff --git a/tests/python_client/milvus_client_v2/test_milvus_client_search_v2.py 
b/tests/python_client/milvus_client_v2/test_milvus_client_search_v2.py index 0341728629..d166ccfbcc 100644 --- a/tests/python_client/milvus_client_v2/test_milvus_client_search_v2.py +++ b/tests/python_client/milvus_client_v2/test_milvus_client_search_v2.py @@ -690,7 +690,7 @@ class TestCollectionSearch(TestcaseBase): insert_ids = [] vector_name_list = cf.extract_vector_field_name_list(collection_w) for vector_field_name in vector_name_list: - vector_data_type = cf.get_field_dtype_by_field_name(collection_w, vector_field_name) + vector_data_type = cf.get_field_dtype_by_field_name(collection_w.schema, vector_field_name) vectors = cf.gen_vectors(nq, dim, vector_data_type) res = collection_w.search(vectors[:nq], vector_field_name, default_search_params, default_limit, diff --git a/tests/python_client/testcases/test_mix_scenes.py b/tests/python_client/testcases/test_mix_scenes.py index 531a06e8a8..2f9ff22eb6 100644 --- a/tests/python_client/testcases/test_mix_scenes.py +++ b/tests/python_client/testcases/test_mix_scenes.py @@ -2307,445 +2307,4 @@ class TestMixScenes(TestcaseBase): # query expr = f'{scalar_field} == {expr_data}' if scalar_field == 'INT64' else f'{scalar_field} == "{expr_data}"' res, _ = self.collection_wrap.query(expr=expr, output_fields=[scalar_field], limit=100) - assert set([r.get(scalar_field) for r in res]) == {expr_data} - - -@pytest.mark.xdist_group("TestGroupSearch") -class TestGroupSearch(TestCaseClassBase): - """ - Testing group search scenarios - 1. collection schema: - int64_pk(auto_id), varchar, - float16_vector, float_vector, bfloat16_vector, sparse_vector, - inverted_varchar - 2. varchar field is inserted with dup values for group by - 3. 
index for each vector field with different index types, dims and metric types - Author: Yanliang567 - """ - def setup_class(self): - super().setup_class(self) - - # connect to server before testing - self._connect(self) - - # init params - self.primary_field = "int64_pk" - self.inverted_string_field = "varchar_inverted" - - # create a collection with fields - self.collection_wrap.init_collection( - name=cf.gen_unique_str("TestGroupSearch"), - schema=cf.set_collection_schema( - fields=[self.primary_field, DataType.VARCHAR.name, DataType.FLOAT16_VECTOR.name, - DataType.FLOAT_VECTOR.name, DataType.BFLOAT16_VECTOR.name, DataType.SPARSE_FLOAT_VECTOR.name, - DataType.INT8.name, DataType.INT64.name, DataType.BOOL.name, - self.inverted_string_field], - field_params={ - self.primary_field: FieldParams(is_primary=True).to_dict, - DataType.FLOAT16_VECTOR.name: FieldParams(dim=31).to_dict, - DataType.FLOAT_VECTOR.name: FieldParams(dim=64).to_dict, - DataType.BFLOAT16_VECTOR.name: FieldParams(dim=24).to_dict, - DataType.VARCHAR.name: FieldParams(nullable=True).to_dict, - DataType.INT8.name: FieldParams(nullable=True).to_dict, - DataType.INT64.name: FieldParams(nullable=True).to_dict, - DataType.BOOL.name: FieldParams(nullable=True).to_dict - }, - auto_id=True - ) - ) - - self.vector_fields = [DataType.FLOAT16_VECTOR.name, DataType.FLOAT_VECTOR.name, - DataType.BFLOAT16_VECTOR.name, DataType.SPARSE_FLOAT_VECTOR.name] - self.dims = [31, 64, 24, 99] - self.index_types = [cp.IndexName.IVF_SQ8, cp.IndexName.HNSW, cp.IndexName.IVF_FLAT, cp.IndexName.SPARSE_WAND] - - @pytest.fixture(scope="class", autouse=True) - def prepare_data(self): - # prepare data (> 1024 triggering index building) - nb = 100 - for _ in range(100): - string_values = pd.Series(data=[str(i) for i in range(nb)], dtype="string") - data = [string_values] - for i in range(len(self.vector_fields)): - data.append(cf.gen_vectors(dim=self.dims[i], - nb=nb, - 
vector_data_type=cf.get_field_dtype_by_field_name(self.collection_wrap, - self.vector_fields[i]))) - if i%5 != 0: - data.append(pd.Series(data=[np.int8(i) for i in range(nb)], dtype="int8")) - data.append(pd.Series(data=[np.int64(i) for i in range(nb)], dtype="int64")) - data.append(pd.Series(data=[np.bool_(i) for i in range(nb)], dtype="bool")) - data.append(pd.Series(data=[str(i) for i in range(nb)], dtype="string")) - else: - data.append(pd.Series(data=[None for _ in range(nb)], dtype="int8")) - data.append(pd.Series(data=[None for _ in range(nb)], dtype="int64")) - data.append(pd.Series(data=[None for _ in range(nb)], dtype="bool")) - data.append(pd.Series(data=[None for _ in range(nb)], dtype="string")) - self.collection_wrap.insert(data) - - # flush collection, segment sealed - self.collection_wrap.flush() - - # build index for each vector field - index_params = { - **DefaultVectorIndexParams.IVF_SQ8(DataType.FLOAT16_VECTOR.name, metric_type=MetricType.L2), - **DefaultVectorIndexParams.HNSW(DataType.FLOAT_VECTOR.name, metric_type=MetricType.IP), - **DefaultVectorIndexParams.DISKANN(DataType.BFLOAT16_VECTOR.name, metric_type=MetricType.COSINE), - **DefaultVectorIndexParams.SPARSE_WAND(DataType.SPARSE_FLOAT_VECTOR.name, metric_type=MetricType.IP), - # index params for varchar field - **DefaultScalarIndexParams.INVERTED(self.inverted_string_field) - } - - self.build_multi_index(index_params=index_params) - assert sorted([n.field_name for n in self.collection_wrap.indexes]) == sorted(index_params.keys()) - - # load collection - self.collection_wrap.load() - - @pytest.mark.tags(CaseLabel.L0) - @pytest.mark.parametrize("group_by_field", [DataType.VARCHAR.name, "varchar_inverted"]) - def test_search_group_size(self, group_by_field): - """ - target: - 1. 
search on 4 different float vector fields with group by varchar field with group size - verify results entity = limit * group_size and group size is full if strict_group_size is True - verify results group counts = limit if strict_group_size is False - """ - nq = 2 - limit = 50 - group_size = 5 - for j in range(len(self.vector_fields)): - search_vectors = cf.gen_vectors(nq, dim=self.dims[j], vector_data_type=cf.get_field_dtype_by_field_name(self.collection_wrap, self.vector_fields[j])) - search_params = {"params": cf.get_search_params_params(self.index_types[j])} - # when strict_group_size=true, it shall return results with entities = limit * group_size - res1 = self.collection_wrap.search(data=search_vectors, anns_field=self.vector_fields[j], - param=search_params, limit=limit, - group_by_field=group_by_field, - group_size=group_size, strict_group_size=True, - output_fields=[group_by_field])[0] - for i in range(nq): - assert len(res1[i]) == limit * group_size - for l in range(limit): - group_values = [] - for k in range(group_size): - group_values.append(res1[i][l*group_size+k].fields.get(group_by_field)) - assert len(set(group_values)) == 1 - - # when strict_group_size=false, it shall return results with group counts = limit - res1 = self.collection_wrap.search(data=search_vectors, anns_field=self.vector_fields[j], - param=search_params, limit=limit, - group_by_field=group_by_field, - group_size=group_size, strict_group_size=False, - output_fields=[group_by_field])[0] - for i in range(nq): - group_values = [] - for l in range(len(res1[i])): - group_values.append(res1[i][l].fields.get(group_by_field)) - assert len(set(group_values)) == limit - - @pytest.mark.tags(CaseLabel.L0) - def test_hybrid_search_group_size(self): - """ - hybrid search group by on 4 different float vector fields with group by varchar field with group size - verify results returns with de-dup group values and group distances are in order as rank_group_scorer - """ - nq = 2 - limit = 50 - 
group_size = 5 - req_list = [] - for j in range(len(self.vector_fields)): - search_params = { - "data": cf.gen_vectors(nq, dim=self.dims[j], vector_data_type=cf.get_field_dtype_by_field_name(self.collection_wrap, self.vector_fields[j])), - "anns_field": self.vector_fields[j], - "param": {"params": cf.get_search_params_params(self.index_types[j])}, - "limit": limit, - "expr": f"{self.primary_field} > 0"} - req = AnnSearchRequest(**search_params) - req_list.append(req) - # 4. hybrid search group by - rank_scorers = ["max", "avg", "sum"] - for scorer in rank_scorers: - res = self.collection_wrap.hybrid_search(req_list, WeightedRanker(0.1, 0.3, 0.9, 0.6), - limit=limit, - group_by_field=DataType.VARCHAR.name, - group_size=group_size, rank_group_scorer=scorer, - output_fields=[DataType.VARCHAR.name])[0] - for i in range(nq): - group_values = [] - for l in range(len(res[i])): - group_values.append(res[i][l].fields.get(DataType.VARCHAR.name)) - assert len(set(group_values)) == limit - - # group_distances = [] - tmp_distances = [100 for _ in range(group_size)] # init with a large value - group_distances = [res[i][0].distance] # init with the first value - for l in range(len(res[i]) - 1): - curr_group_value = res[i][l].fields.get(DataType.VARCHAR.name) - next_group_value = res[i][l + 1].fields.get(DataType.VARCHAR.name) - if curr_group_value == next_group_value: - group_distances.append(res[i][l + 1].distance) - else: - if scorer == 'sum': - assert np.sum(group_distances) <= np.sum(tmp_distances) - elif scorer == 'avg': - assert np.mean(group_distances) <= np.mean(tmp_distances) - else: # default max - assert np.max(group_distances) <= np.max(tmp_distances) - - tmp_distances = group_distances - group_distances = [res[i][l + 1].distance] - - @pytest.mark.tags(CaseLabel.L2) - def test_hybrid_search_group_by(self): - """ - verify hybrid search group by works with different Rankers - """ - # 3. 
prepare search params - req_list = [] - for i in range(len(self.vector_fields)): - search_param = { - "data": cf.gen_vectors(ct.default_nq, dim=self.dims[i], - vector_data_type=cf.get_field_dtype_by_field_name(self.collection_wrap, - self.vector_fields[i])), - "anns_field": self.vector_fields[i], - "param": {}, - "limit": ct.default_limit, - "expr": f"{self.primary_field} > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 4. hybrid search group by - res = self.collection_wrap.hybrid_search(req_list, WeightedRanker(0.1, 0.9, 0.2, 0.3), ct.default_limit, - group_by_field=DataType.VARCHAR.name, - output_fields=[DataType.VARCHAR.name], - check_task=CheckTasks.check_search_results, - check_items={"nq": ct.default_nq, "limit": ct.default_limit})[0] - for i in range(ct.default_nq): - group_values = [] - for l in range(ct.default_limit): - group_values.append(res[i][l].fields.get(DataType.VARCHAR.name)) - assert len(group_values) == len(set(group_values)) - - # 5. hybrid search with RRFRanker on one vector field with group by - req_list = [] - for i in range(1, len(self.vector_fields)): - search_param = { - "data": cf.gen_vectors(ct.default_nq, dim=self.dims[i], vector_data_type=cf.get_field_dtype_by_field_name(self.collection_wrap, self.vector_fields[i])), - "anns_field": self.vector_fields[i], - "param": {}, - "limit": ct.default_limit, - "expr": f"{self.primary_field} > 0"} - req = AnnSearchRequest(**search_param) - req_list.append(req) - self.collection_wrap.hybrid_search(req_list, RRFRanker(), ct.default_limit, - group_by_field=self.inverted_string_field, - check_task=CheckTasks.check_search_results, - check_items={"nq": ct.default_nq, "limit": ct.default_limit}) - - @pytest.mark.tags(CaseLabel.L2) - def test_hybrid_search_group_by_empty_results(self): - """ - verify hybrid search group by works if group by empty results - """ - # 3. 
prepare search params - req_list = [] - for i in range(len(self.vector_fields)): - search_param = { - "data": cf.gen_vectors(ct.default_nq, dim=self.dims[i], - vector_data_type=cf.get_field_dtype_by_field_name(self.collection_wrap, - self.vector_fields[i])), - "anns_field": self.vector_fields[i], - "param": {}, - "limit": ct.default_limit, - "expr": f"{self.primary_field} < 0"} # make sure return empty results - req = AnnSearchRequest(**search_param) - req_list.append(req) - # 4. hybrid search group by empty resutls - self.collection_wrap.hybrid_search(req_list, WeightedRanker(0.1, 0.9, 0.2, 0.3), ct.default_limit, - group_by_field=DataType.VARCHAR.name, - output_fields=[DataType.VARCHAR.name], - check_task=CheckTasks.check_search_results, - check_items={"nq": ct.default_nq, "limit": 0}) - - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.parametrize("support_field", [DataType.INT8.name, DataType.INT64.name, - DataType.BOOL.name, DataType.VARCHAR.name]) - def test_search_group_by_supported_scalars(self, support_field): - """ - verify search group by works with supported scalar fields - """ - nq = 2 - limit = 15 - for j in range(len(self.vector_fields)): - search_vectors = cf.gen_vectors(nq, dim=self.dims[j], - vector_data_type=cf.get_field_dtype_by_field_name(self.collection_wrap, - self.vector_fields[j])) - search_params = {"params": cf.get_search_params_params(self.index_types[j])} - res1 = self.collection_wrap.search(data=search_vectors, anns_field=self.vector_fields[j], - param=search_params, limit=limit, - group_by_field=support_field, - output_fields=[support_field])[0] - for i in range(nq): - grpby_values = [] - dismatch = 0 - results_num = 2 if support_field == DataType.BOOL.name else limit - for l in range(results_num): - top1 = res1[i][l] - top1_grpby_pk = top1.id - top1_grpby_value = top1.fields.get(support_field) - expr = f"{support_field}=={top1_grpby_value}" - if support_field == DataType.VARCHAR.name: - expr = f"{support_field}=='{top1_grpby_value}'" 
- grpby_values.append(top1_grpby_value) - res_tmp = self.collection_wrap.search(data=[search_vectors[i]], anns_field=self.vector_fields[j], - param=search_params, limit=1, expr=expr, - output_fields=[support_field])[0] - top1_expr_pk = res_tmp[0][0].id - if top1_grpby_pk != top1_expr_pk: - dismatch += 1 - log.info(f"{support_field} on {self.vector_fields[j]} dismatch_item, top1_grpby_dis: {top1.distance}, top1_expr_dis: {res_tmp[0][0].distance}") - log.info(f"{support_field} on {self.vector_fields[j]} top1_dismatch_num: {dismatch}, results_num: {results_num}, dismatch_rate: {dismatch / results_num}") - baseline = 1 if support_field == DataType.BOOL.name else 0.2 # skip baseline check for boolean - assert dismatch / results_num <= baseline - # verify no dup values of the group_by_field in results - assert len(grpby_values) == len(set(grpby_values)) - - @pytest.mark.tags(CaseLabel.L2) - def test_search_pagination_group_by(self): - """ - verify search group by works with pagination - """ - limit = 10 - page_rounds = 3 - search_param = {} - default_search_exp = f"{self.primary_field} >= 0" - grpby_field = self.inverted_string_field - default_search_field = self.vector_fields[1] - search_vectors = cf.gen_vectors(1, dim=self.dims[1], vector_data_type=cf.get_field_dtype_by_field_name(self.collection_wrap, self.vector_fields[1])) - all_pages_ids = [] - all_pages_grpby_field_values = [] - for r in range(page_rounds): - page_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field, - param=search_param, limit=limit, offset=limit * r, - expr=default_search_exp, group_by_field=grpby_field, - output_fields=[grpby_field], - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, "limit": limit}, - )[0] - for j in range(limit): - all_pages_grpby_field_values.append(page_res[0][j].get(grpby_field)) - all_pages_ids += page_res[0].ids - hit_rate = round(len(set(all_pages_grpby_field_values)) / len(all_pages_grpby_field_values), 3) - assert 
hit_rate >= 0.8 - - total_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field, - param=search_param, limit=limit * page_rounds, - expr=default_search_exp, group_by_field=grpby_field, - output_fields=[grpby_field], - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, "limit": limit * page_rounds} - )[0] - hit_num = len(set(total_res[0].ids).intersection(set(all_pages_ids))) - hit_rate = round(hit_num / (limit * page_rounds), 3) - assert hit_rate >= 0.8 - log.info(f"search pagination with groupby hit_rate: {hit_rate}") - grpby_field_values = [] - for i in range(limit * page_rounds): - grpby_field_values.append(total_res[0][i].fields.get(grpby_field)) - assert len(grpby_field_values) == len(set(grpby_field_values)) - - @pytest.mark.tags(CaseLabel.L0) - def test_search_pagination_group_size(self): - limit = 10 - group_size = 5 - page_rounds = 3 - search_param = {} - default_search_exp = f"{self.primary_field} >= 0" - grpby_field = self.inverted_string_field - default_search_field = self.vector_fields[1] - search_vectors = cf.gen_vectors(1, dim=self.dims[1], vector_data_type=cf.get_field_dtype_by_field_name(self.collection_wrap, self.vector_fields[1])) - all_pages_ids = [] - all_pages_grpby_field_values = [] - res_count = limit * group_size - for r in range(page_rounds): - page_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field, - param=search_param, limit=limit, offset=limit * r, - expr=default_search_exp, - group_by_field=grpby_field, group_size=group_size, - strict_group_size=True, - output_fields=[grpby_field], - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, "limit": res_count}, - )[0] - for j in range(res_count): - all_pages_grpby_field_values.append(page_res[0][j].get(grpby_field)) - all_pages_ids += page_res[0].ids - - hit_rate = round(len(set(all_pages_grpby_field_values)) / len(all_pages_grpby_field_values), 3) - expect_hit_rate = round(1 / group_size, 3) * 0.7 - 
log.info(f"expect_hit_rate :{expect_hit_rate}, hit_rate:{hit_rate}, " - f"unique_group_by_value_count:{len(set(all_pages_grpby_field_values))}," - f"total_group_by_value_count:{len(all_pages_grpby_field_values)}") - assert hit_rate >= expect_hit_rate - - total_count = limit * group_size * page_rounds - total_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field, - param=search_param, limit=limit * page_rounds, - expr=default_search_exp, - group_by_field=grpby_field, group_size=group_size, - strict_group_size=True, - output_fields=[grpby_field], - check_task=CheckTasks.check_search_results, - check_items={"nq": 1, "limit": total_count} - )[0] - hit_num = len(set(total_res[0].ids).intersection(set(all_pages_ids))) - hit_rate = round(hit_num / (limit * page_rounds), 3) - assert hit_rate >= 0.8 - log.info(f"search pagination with groupby hit_rate: {hit_rate}") - grpby_field_values = [] - for i in range(total_count): - grpby_field_values.append(total_res[0][i].fields.get(grpby_field)) - assert len(grpby_field_values) == total_count - assert len(set(grpby_field_values)) == limit * page_rounds - - @pytest.mark.tags(CaseLabel.L2) - def test_search_group_size_min_max(self): - """ - verify search group by works with min and max group size - """ - group_by_field = self.inverted_string_field - default_search_field = self.vector_fields[1] - search_vectors = cf.gen_vectors(1, dim=self.dims[1], vector_data_type=cf.get_field_dtype_by_field_name(self.collection_wrap, self.vector_fields[1])) - search_params = {} - limit = 10 - max_group_size = 10 - self.collection_wrap.search(data=search_vectors, anns_field=default_search_field, - param=search_params, limit=limit, - group_by_field=group_by_field, - group_size=max_group_size, strict_group_size=True, - output_fields=[group_by_field]) - exceed_max_group_size = max_group_size + 1 - error = {ct.err_code: 999, - ct.err_msg: f"input group size:{exceed_max_group_size} exceeds configured max " - f"group 
size:{max_group_size}"} - self.collection_wrap.search(data=search_vectors, anns_field=default_search_field, - param=search_params, limit=limit, - group_by_field=group_by_field, - group_size=exceed_max_group_size, strict_group_size=True, - output_fields=[group_by_field], - check_task=CheckTasks.err_res, check_items=error) - - min_group_size = 1 - self.collection_wrap.search(data=search_vectors, anns_field=default_search_field, - param=search_params, limit=limit, - group_by_field=group_by_field, - group_size=max_group_size, strict_group_size=True, - output_fields=[group_by_field]) - below_min_group_size = min_group_size - 1 - error = {ct.err_code: 999, - ct.err_msg: f"input group size:{below_min_group_size} is negative"} - self.collection_wrap.search(data=search_vectors, anns_field=default_search_field, - param=search_params, limit=limit, - group_by_field=group_by_field, - group_size=below_min_group_size, strict_group_size=True, - output_fields=[group_by_field], - check_task=CheckTasks.err_res, check_items=error) + assert set([r.get(scalar_field) for r in res]) == {expr_data} \ No newline at end of file