diff --git a/tests/python_client/common/phrase_match_generator.py b/tests/python_client/common/phrase_match_generator.py
new file mode 100644
index 0000000000..e77b045434
--- /dev/null
+++ b/tests/python_client/common/phrase_match_generator.py
@@ -0,0 +1,370 @@
+import re
+import jieba
+from faker import Faker
+from tantivy import SchemaBuilder, Document, Index, Query
+from typing import List, Dict
+import numpy as np
+import random
+
+
+class PhraseMatchTestGenerator:
+    def __init__(self, language="en"):
+        """
+        Initialize the test data generator
+
+        Args:
+            language: Language for text generation ('en' for English, 'zh' for Chinese)
+        """
+        self.language = language
+        self.index = None
+        self.documents = []
+
+        # English vocabulary
+        self.en_activities = [
+            "swimming",
+            "football",
+            "basketball",
+            "tennis",
+            "volleyball",
+            "baseball",
+            "golf",
+            "rugby",
+            "cricket",
+            "boxing",
+            "running",
+            "cycling",
+            "skating",
+            "skiing",
+            "surfing",
+            "diving",
+            "climbing",
+            "yoga",
+            "dancing",
+            "hiking",
+        ]
+
+        self.en_verbs = [
+            "love",
+            "like",
+            "enjoy",
+            "play",
+            "practice",
+            "prefer",
+            "do",
+            "learn",
+            "teach",
+            "watch",
+            "start",
+            "begin",
+            "continue",
+            "finish",
+            "master",
+            "try",
+        ]
+
+        self.en_connectors = [
+            "and",
+            "or",
+            "but",
+            "while",
+            "after",
+            "before",
+            "then",
+            "also",
+            "plus",
+            "with",
+        ]
+
+        self.en_modifiers = [
+            "very much",
+            "a lot",
+            "seriously",
+            "casually",
+            "professionally",
+            "regularly",
+            "often",
+            "sometimes",
+            "daily",
+            "weekly",
+        ]
+
+        # Chinese vocabulary
+        self.zh_activities = [
+            "游泳",
+            "足球",
+            "篮球",
+            "网球",
+            "排球",
+            "棒球",
+            "高尔夫",
+            "橄榄球",
+            "板球",
+            "拳击",
+            "跑步",
+            "骑行",
+            "滑冰",
+            "滑雪",
+            "冲浪",
+            "潜水",
+            "攀岩",
+            "瑜伽",
+            "跳舞",
+            "徒步",
+        ]
+
+        self.zh_verbs = [
+            "喜欢",
+            "热爱",
+            "享受",
+            "玩",
+            "练习",
+            "偏好",
+            "做",
+            "学习",
+            "教",
+            "观看",
+            "开始",
+            "开启",
+            "继续",
+            "完成",
+            "掌握",
+            "尝试",
+        ]
+
+        self.zh_connectors = [
+            "和",
+            "或者",
+            "但是",
+            "同时",
+            "之后",
+            "之前",
+            "然后",
+            "也",
+            "加上",
+            "跟",
+        ]
+
+        self.zh_modifiers = [
+            "非常",
+            "很多",
+            "认真地",
+            "随意地",
+            "专业地",
+            "定期地",
+            "经常",
+            "有时候",
+            "每天",
+            "每周",
+        ]
+
+        # Set vocabulary based on language
+        self.activities = self.zh_activities if language == "zh" else self.en_activities
+        self.verbs = self.zh_verbs if language == "zh" else self.en_verbs
+        self.connectors = self.zh_connectors if language == "zh" else self.en_connectors
+        self.modifiers = self.zh_modifiers if language == "zh" else self.en_modifiers
+
+    def tokenize_text(self, text: str) -> List[str]:
+        """Tokenize text: jieba search-mode segmentation for Chinese, whitespace split for English"""
+        text = text.strip()
+        text = re.sub(r"[^\w\s]", " ", text)
+        text = text.replace("\n", " ")
+        if self.language == "zh":
+            text = text.replace(" ", "")
+            return list(jieba.cut_for_search(text))
+        else:
+            return text.split()
+
+    def generate_embedding(self, dim: int) -> List[float]:
+        """Generate a random embedding vector"""
+        return list(np.random.random(dim))
+
+    def generate_text_pattern(self) -> str:
+        """Generate test document text with various patterns"""
+        patterns = [
+            # Simple pattern with two activities
+            lambda: f"{random.choice(self.activities)} {random.choice(self.activities)}",
+            # Pattern with connector between activities
+            lambda: f"{random.choice(self.activities)} {random.choice(self.connectors)} {random.choice(self.activities)}",
+            # Pattern with modifier between activities
+            lambda: f"{random.choice(self.activities)} {random.choice(self.modifiers)} {random.choice(self.activities)}",
+            # Complex pattern with verb and activities
+            lambda: f"{random.choice(self.verbs)} {random.choice(self.activities)} {random.choice(self.activities)}",
+            # Pattern with multiple gaps
+            lambda: f"{random.choice(self.activities)} {random.choice(self.modifiers)} {random.choice(self.connectors)} {random.choice(self.activities)}",
+        ]
+        return random.choice(patterns)()
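+
+    # Illustrative usage of the two helpers above (outputs are random, so the
+    # values shown here are hypothetical):
+    #
+    #   gen = PhraseMatchTestGenerator(language="en")
+    #   gen.generate_text_pattern()             # e.g. "tennis and golf"
+    #   gen.tokenize_text("I love swimming!")   # -> ["I", "love", "swimming"]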
f"{random.choice(self.verbs)} {random.choice(self.activities)} {random.choice(self.activities)}", + # Pattern with multiple gaps + lambda: f"{random.choice(self.activities)} {random.choice(self.modifiers)} {random.choice(self.connectors)} {random.choice(self.activities)}", + ] + return random.choice(patterns)() + + def generate_test_data(self, num_documents: int, dim: int) -> List[Dict]: + """ + Generate test documents with text and embeddings + + Args: + num_documents: Number of documents to generate + dim: Dimension of embedding vectors + + Returns: + List of dictionaries containing document data + """ + # Generate documents + self.documents = [] + for i in range(num_documents): + self.documents.append( + { + "id": i, + "text": self.generate_text_pattern() + if self.language == "en" + else self.generate_text_pattern().replace(" ", ""), + "emb": self.generate_embedding(dim), + } + ) + + # Initialize Tantivy index + schema_builder = SchemaBuilder() + + schema_builder.add_text_field("text", stored=True) + schema_builder.add_unsigned_field("doc_id", stored=True) + schema = schema_builder.build() + + self.index = Index(schema=schema, path=None) + + writer = self.index.writer() + + # Index all documents + for doc in self.documents: + document = Document() + new_text = " ".join(self.tokenize_text(doc["text"])) + document.add_text("text", new_text) + document.add_unsigned("doc_id", doc["id"]) + writer.add_document(document) + + writer.commit() + self.index.reload() + + return self.documents + + def _generate_random_word(self, exclude_words: List[str]) -> str: + """ + Generate a random word that is not in the exclude_words list using Faker + """ + fake = Faker() + while True: + word = fake.word() + if word not in exclude_words: + return word + + def generate_pattern_documents(self, patterns: List[tuple], dim: int, num_docs_per_pattern: int = 1) -> List[Dict]: + """ + Generate documents that match specific test patterns with their corresponding slop values + + Args: + patterns: List of tuples containing (pattern, slop) pairs + dim: Dimension of embedding vectors + num_docs_per_pattern: Number of documents to generate for each pattern + + Returns: + List of dictionaries containing document data with text and embeddings + """ + pattern_documents = [] + for pattern, slop in patterns: + # Split pattern into components + pattern_words = pattern.split() + + # Generate multiple documents for each pattern + if slop == 0: # Exact phrase + text = " ".join(pattern_words) + pattern_documents.append({ + "id": random.randint(0, 1000000), "text": text, "emb": self.generate_embedding(dim)}) + + else: # Pattern with gaps + # Generate slop number of unique words + insert_words = [] + for _ in range(slop): + new_word = self._generate_random_word(pattern_words + insert_words) + insert_words.append(new_word) + + # Insert the words randomly between the pattern words + all_words = pattern_words.copy() + for word in insert_words: + # Random position between pattern words + pos = random.randint(1, len(all_words)) + all_words.insert(pos, word) + + text = " ".join(all_words) + pattern_documents.append({ + "id": random.randint(0, 1000000), + "text": text, + "emb": self.generate_embedding(dim)}) + + new_pattern_documents = [] + start = 1000000 + for i in range(num_docs_per_pattern): + for doc in pattern_documents: + new_doc = dict(doc) + new_doc["id"] = start + len(new_pattern_documents) + new_pattern_documents.append(new_doc) + + return new_pattern_documents + + def generate_test_queries(self, num_queries: int) -> 
+
+    def generate_test_queries(self, num_queries: int) -> List[Dict]:
+        """
+        Generate test queries with varying slop values
+
+        Args:
+            num_queries: Number of queries to generate
+
+        Returns:
+            List of dictionaries containing query information
+        """
+        queries = []
+        slop_values = [0, 1, 2, 3]  # Common slop values
+
+        for i in range(num_queries):
+            # Randomly select two or three words for the query
+            num_words = random.choice([2, 3])
+            words = random.sample(self.activities, num_words)
+
+            queries.append(
+                {
+                    "id": i,
+                    "query": " ".join(words)
+                    if self.language == "en"
+                    else "".join(words),
+                    "slop": random.choice(slop_values),
+                    "type": f"{num_words}_words",
+                }
+            )
+
+        return queries
+
+    def get_query_results(self, query: str, slop: int) -> List[int]:
+        """
+        Get the ids of all documents that match the phrase query
+
+        Args:
+            query: Query phrase
+            slop: Maximum allowed word gap
+
+        Returns:
+            List[int]: ids of the matching documents
+        """
+        if self.index is None:
+            raise RuntimeError("No documents indexed. Call generate_test_data first.")
+
+        # Clean and normalize query
+        query_terms = self.tokenize_text(query)
+
+        # Create phrase query
+        searcher = self.index.searcher()
+        phrase_query = Query.phrase_query(self.index.schema, "text", query_terms, slop)
+
+        # Search for matches
+        results = searcher.search(phrase_query, limit=len(self.documents))
+
+        # Extract all matching document ids
+        matched_docs = []
+        for _, doc_address in results.hits:
+            doc = searcher.doc(doc_address)
+            doc_id = doc.to_dict()["doc_id"]  # tantivy returns field values as lists
+            matched_docs.extend(doc_id)
+
+        return matched_docs
diff --git a/tests/python_client/requirements.txt b/tests/python_client/requirements.txt
index fc8cf21154..2ffbd97705 100644
--- a/tests/python_client/requirements.txt
+++ b/tests/python_client/requirements.txt
@@ -66,6 +66,7 @@ fastparquet==2023.7.0
 ml-dtypes==0.2.0
 
 # for full text search
+tantivy==0.22.0
 bm25s==0.2.0
 jieba==0.42.1
 Unidecode==1.3.8
@@ -75,4 +76,4 @@ Unidecode==1.3.8
 locust==2.25.0
 
 # for supporting higher python version
-typing_extensions==4.12.2
\ No newline at end of file
+typing_extensions==4.12.2
diff --git a/tests/python_client/testcases/test_phrase_match.py b/tests/python_client/testcases/test_phrase_match.py
new file mode 100644
index 0000000000..316e44ae00
--- /dev/null
+++ b/tests/python_client/testcases/test_phrase_match.py
@@ -0,0 +1,419 @@
+from common.common_type import CaseLabel
+from common.phrase_match_generator import PhraseMatchTestGenerator
+import pytest
+import pandas as pd
+from pymilvus import FieldSchema, CollectionSchema, DataType
+
+from common.common_type import CheckTasks
+from utils.util_log import test_log as log
+from common import common_func as cf
+from base.client_base import TestcaseBase
+
+prefix = "phrase_match"
+
+
+def init_collection_schema(
+    dim: int, tokenizer: str, enable_partition_key: bool
+) -> CollectionSchema:
+    """Initialize collection schema with specified parameters"""
+    analyzer_params = {"tokenizer": tokenizer}
+    fields = [
+        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
+        FieldSchema(
+            name="text",
+            dtype=DataType.VARCHAR,
+            max_length=65535,
+            enable_analyzer=True,
+            enable_match=True,
+            is_partition_key=enable_partition_key,
+            analyzer_params=analyzer_params,
+        ),
+        FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=dim),
+    ]
+    return CollectionSchema(fields=fields, description="phrase match test collection")
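+
+
+# For orientation: the filter expression exercised throughout these tests looks
+# like
+#   phrase_match(text, 'swimming tennis', 2)
+# and (roughly) matches rows where the terms appear in order with at most
+# `slop` token positions of movement between them; slop=0 requires the exact
+# phrase. The query string 'swimming tennis' here is illustrative.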
+
+
+@pytest.mark.tags(CaseLabel.L0)
+class TestQueryPhraseMatch(TestcaseBase):
+    """
+    Test cases for phrase match functionality in Milvus using PhraseMatchTestGenerator.
+    This class verifies phrase matching capabilities under different configurations,
+    including various tokenizers, partition keys, and index settings.
+    """
+
+    @pytest.mark.parametrize("enable_partition_key", [True])
+    @pytest.mark.parametrize("enable_inverted_index", [True])
+    @pytest.mark.parametrize("tokenizer", ["standard", "jieba"])
+    def test_query_phrase_match_with_different_tokenizer(
+        self, tokenizer, enable_inverted_index, enable_partition_key
+    ):
+        """
+        target: Verify phrase match functionality with different tokenizers (standard, jieba)
+        method: 1. Generate test data using PhraseMatchTestGenerator with language-specific content
+                2. Create collection with appropriate schema (primary key, text field with analyzer, vector field)
+                3. Build both vector (IVF_SQ8) and inverted indexes
+                4. Execute phrase match queries with various slop values
+                5. Compare results against the Tantivy reference implementation
+        expected: Milvus phrase match results should exactly match the reference
+                  implementation results for all queries and slop values
+        note: Test is marked xfail for the jieba tokenizer due to known issues
+        """
+        if tokenizer == "jieba":
+            pytest.xfail("Jieba tokenizer has known issues with phrase matching")
+
+        # Initialize parameters
+        dim = 128
+        data_size = 3000
+        num_queries = 10
+
+        # Initialize generator based on tokenizer
+        language = "zh" if tokenizer == "jieba" else "en"
+        generator = PhraseMatchTestGenerator(language=language)
+
+        # Create collection
+        collection_w = self.init_collection_wrap(
+            name=cf.gen_unique_str(prefix),
+            schema=init_collection_schema(dim, tokenizer, enable_partition_key),
+        )
+
+        # Generate test data
+        test_data = generator.generate_test_data(data_size, dim)
+        df = pd.DataFrame(test_data)
+        log.info(f"Test data: \n{df['text']}")
+        # Insert data into collection
+        insert_data = [
+            {"id": d["id"], "text": d["text"], "emb": d["emb"]} for d in test_data
+        ]
+        collection_w.insert(insert_data)
+        collection_w.flush()
+
+        # Create indexes
+        collection_w.create_index(
+            "emb",
+            {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}},
+        )
+        if enable_inverted_index:
+            collection_w.create_index(
+                "text", {"index_type": "INVERTED", "params": {"tokenizer": tokenizer}}
+            )
+
+        collection_w.load()
+
+        # Generate and execute test queries
+        test_queries = generator.generate_test_queries(num_queries)
+
+        for query in test_queries:
+            expr = f"phrase_match(text, '{query['query']}', {query['slop']})"
+            log.info(f"Testing query: {expr}")
+
+            # Execute query
+            results, _ = collection_w.query(expr=expr, output_fields=["id", "text"])
+
+            # Get expected matches using Tantivy
+            expected_matches = generator.get_query_results(
+                query["query"], query["slop"]
+            )
+            # Get actual matches from Milvus
+            actual_matches = [r["id"] for r in results]
+            if set(actual_matches) != set(expected_matches):
+                log.info(f"collection schema: {collection_w.schema}")
+                for match_id in expected_matches:
+                    # query by id to get text
+                    res, _ = collection_w.query(
+                        expr=f"id == {match_id}", output_fields=["text"]
+                    )
+                    text = res[0]["text"]
+                    log.info(f"Expected match: {match_id}, text: {text}")
+
+                for match_id in actual_matches:
+                    # query by id to get text
+                    res, _ = collection_w.query(
+                        expr=f"id == {match_id}", output_fields=["text"]
+                    )
+                    text = res[0]["text"]
+                    log.info(f"Matched document: {match_id}, text: {text}")
+            # Assert results match
+            assert (
+                set(actual_matches) == set(expected_matches)
+            ), f"Mismatch in results for query '{query['query']}' with slop {query['slop']}"
+
+    @pytest.mark.parametrize("enable_partition_key", [True])
+    @pytest.mark.parametrize("enable_inverted_index", [True])
+    @pytest.mark.parametrize("tokenizer", ["standard"])
+    def test_phrase_match_as_filter_in_vector_search(
+        self, tokenizer, enable_inverted_index, enable_partition_key
+    ):
+        """
+        target: Verify phrase match functionality when used as a filter in vector search
+        method: 1. Generate test data with both text content and vector embeddings
+                2. Create collection with vector field (128d) and text field
+                3. Build both vector index (IVF_SQ8) and text inverted index
+                4. Perform vector search with phrase match as a filter condition
+                5. Verify the combined search results maintain accuracy
+        expected: The system should correctly combine vector search with phrase match
+                  filtering while maintaining both search accuracy and performance
+        """
+        # Initialize parameters
+        dim = 128
+        data_size = 3000
+        num_queries = 10
+
+        # Initialize generator based on tokenizer
+        language = "zh" if tokenizer == "jieba" else "en"
+        generator = PhraseMatchTestGenerator(language=language)
+
+        # Create collection
+        collection_w = self.init_collection_wrap(
+            name=cf.gen_unique_str(prefix),
+            schema=init_collection_schema(dim, tokenizer, enable_partition_key),
+        )
+
+        # Generate test data
+        test_data = generator.generate_test_data(data_size, dim)
+        df = pd.DataFrame(test_data)
+        log.info(f"Test data: \n{df['text']}")
+        # Insert data into collection
+        insert_data = [
+            {"id": d["id"], "text": d["text"], "emb": d["emb"]} for d in test_data
+        ]
+        collection_w.insert(insert_data)
+        collection_w.flush()
+
+        # Create indexes
+        collection_w.create_index(
+            "emb",
+            {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}},
+        )
+        if enable_inverted_index:
+            collection_w.create_index(
+                "text", {"index_type": "INVERTED", "params": {"tokenizer": tokenizer}}
+            )
+
+        collection_w.load()
+
+        # Generate and execute test queries
+        test_queries = generator.generate_test_queries(num_queries)
+
+        for query in test_queries:
+            expr = f"phrase_match(text, '{query['query']}', {query['slop']})"
+            log.info(f"Testing query: {expr}")
+
+            # Execute filtered search
+            data = [generator.generate_embedding(dim) for _ in range(10)]
+            results, _ = collection_w.search(
+                data,
+                anns_field="emb",
+                param={},
+                limit=10,
+                expr=expr,
+                output_fields=["id", "text"],
+            )
+
+            # Get expected matches using Tantivy
+            expected_matches = generator.get_query_results(
+                query["query"], query["slop"]
+            )
+            # assert results satisfy the filter
+            for hits in results:
+                for hit in hits:
+                    assert hit.id in expected_matches
+
+    @pytest.mark.parametrize("slop_value", [0, 1, 2, 5, 10])
+    def test_slop_parameter(self, slop_value):
+        """
+        target: Verify phrase matching behavior with varying slop values
+        method: 1. Create collection with standard tokenizer
+                2. Generate and insert data with controlled word gaps between terms
+                3. Test phrase matching with specific slop values (0, 1, 2, etc.)
+                4. Verify matches at different word distances
+                5. Compare results with the Tantivy reference implementation
+        expected: Results should only match phrases where words are within the specified
+                  slop distance, validating the slop parameter's distance control
+        """
+        dim = 128
+        data_size = 3000
+        num_queries = 2
+        tokenizer = "standard"
+        enable_partition_key = True
+        # Initialize generator based on tokenizer
+        language = "zh" if tokenizer == "jieba" else "en"
+        generator = PhraseMatchTestGenerator(language=language)
+
+        # Create collection
+        collection_w = self.init_collection_wrap(
+            name=cf.gen_unique_str(prefix),
+            schema=init_collection_schema(dim, tokenizer, enable_partition_key),
+        )
+
+        # Generate test data
+        test_data = generator.generate_test_data(data_size, dim)
+        df = pd.DataFrame(test_data)
+        log.info(f"Test data: {df['text']}")
+        # Insert data into collection
+        insert_data = [
+            {"id": d["id"], "text": d["text"], "emb": d["emb"]} for d in test_data
+        ]
+        collection_w.insert(insert_data)
+        collection_w.flush()
+
+        # Create indexes
+        collection_w.create_index(
+            "emb",
+            {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}},
+        )
+
+        collection_w.create_index("text", {"index_type": "INVERTED"})
+
+        collection_w.load()
+
+        # Generate and execute test queries
+        test_queries = generator.generate_test_queries(num_queries)
+
+        for query in test_queries:
+            expr = f"phrase_match(text, '{query['query']}', {slop_value})"
+            log.info(f"Testing query: {expr}")
+
+            # Execute query
+            results, _ = collection_w.query(expr=expr, output_fields=["id", "text"])
+
+            # Get expected matches using Tantivy
+            expected_matches = generator.get_query_results(query["query"], slop_value)
+            # Get actual matches from Milvus
+            actual_matches = [r["id"] for r in results]
+            if set(actual_matches) != set(expected_matches):
+                log.info(f"collection schema: {collection_w.schema}")
+                for match_id in expected_matches:
+                    # query by id to get text
+                    res, _ = collection_w.query(
+                        expr=f"id == {match_id}", output_fields=["text"]
+                    )
+                    text = res[0]["text"]
+                    log.info(f"Expected match: {match_id}, text: {text}")
+
+                for match_id in actual_matches:
+                    # query by id to get text
+                    res, _ = collection_w.query(
+                        expr=f"id == {match_id}", output_fields=["text"]
+                    )
+                    text = res[0]["text"]
+                    log.info(f"Matched document: {match_id}, text: {text}")
+            # Assert results match
+            assert (
+                set(actual_matches) == set(expected_matches)
+            ), f"Mismatch in results for query '{query['query']}' with slop {slop_value}"
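+
+    # Slop semantics exercised above, in brief (illustrative query): for
+    # "swimming tennis",
+    #   slop 0 matches only "... swimming tennis ...",
+    #   slop 1 also matches "... swimming X tennis ...",
+    #   slop 2 also matches "... swimming X Y tennis ...", and so on.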
+
+    def test_query_phrase_match_with_different_patterns(self):
+        """
+        target: Verify phrase matching with various text patterns and complexities
+        method: 1. Create collection with standard tokenizer
+                2. Generate and insert data with diverse phrase patterns:
+                   - Exact phrases ("love swimming and running")
+                   - Phrases with gaps ("enjoy very basketball")
+                   - Complex phrases ("practice tennis seriously often")
+                   - Multiple-term phrases ("swimming running cycling")
+                3. Test each pattern with an appropriate slop value
+                4. Verify the minimum match count for each pattern
+        expected: System should correctly identify and match each pattern type
+                  with the specified number of matches per pattern
+        """
+        dim = 128
+        collection_name = f"{prefix}_patterns"
+        schema = init_collection_schema(dim, "standard", False)
+        collection = self.init_collection_wrap(name=collection_name, schema=schema)
+
+        # Generate data with various patterns
+        generator = PhraseMatchTestGenerator(language="en")
+        data = generator.generate_test_data(3000, dim)
+        collection.insert(data)
+        # Test various patterns
+        test_patterns = [
+            ("love swimming and running", 0),  # Exact phrase
+            ("enjoy very basketball", 1),  # Phrase with gap
+            ("practice tennis seriously often", 2),  # Complex phrase
+            ("swimming running cycling", 5),  # Multiple activities
+        ]
+
+        # Generate and insert documents that match the patterns
+        num_docs_per_pattern = 100
+        pattern_documents = generator.generate_pattern_documents(
+            test_patterns, dim, num_docs_per_pattern=num_docs_per_pattern
+        )
+        collection.insert(pattern_documents)
+        df = pd.DataFrame(pattern_documents)[["id", "text"]]
+        log.info(f"Test data:\n {df}")
+
+        collection.create_index(
+            field_name="text", index_params={"index_type": "INVERTED"}
+        )
+        collection.create_index(
+            field_name="emb",
+            index_params={
+                "index_type": "IVF_SQ8",
+                "metric_type": "L2",
+                "params": {"nlist": 64},
+            },
+        )
+        collection.load()
+
+        for pattern, slop in test_patterns:
+            results, _ = collection.query(
+                expr=f'phrase_match(text, "{pattern}", {slop})', output_fields=["text"]
+            )
+            log.info(
+                f"Pattern '{pattern}' with slop {slop} found {len(results)} matches"
+            )
+            assert len(results) >= num_docs_per_pattern
+
+
+@pytest.mark.tags(CaseLabel.L1)
+class TestQueryPhraseMatchNegative(TestcaseBase):
+    def test_query_phrase_match_with_invalid_slop(self):
+        """
+        target: Verify error handling for invalid slop values in phrase matching
+        method: 1. Create collection with standard test data
+                2. Test phrase matching with invalid slop values:
+                   - Negative slop values (-1)
+                   - Extremely large slop values (10^31)
+                3. Verify error handling and response
+        expected: System should:
+                  1. Reject queries with invalid slop values
+                  2. Return appropriate error responses
+                  3. Maintain system stability after invalid queries
+        """
+        dim = 128
+        collection_name = f"{prefix}_invalid_slop"
+        schema = init_collection_schema(dim, "standard", False)
+        collection = self.init_collection_wrap(name=collection_name, schema=schema)
+
+        # Insert some test data
+        generator = PhraseMatchTestGenerator(language="en")
+        data = generator.generate_test_data(100, dim)
+        collection.insert(data)
+
+        collection.create_index(
+            field_name="text", index_params={"index_type": "INVERTED"}
+        )
+        collection.create_index(
+            field_name="emb",
+            index_params={
+                "index_type": "IVF_SQ8",
+                "metric_type": "L2",
+                "params": {"nlist": 64},
+            },
+        )
+        collection.load()
+
+        # Test invalid inputs
+        invalid_cases = [
+            ("valid query", -1),  # Negative slop
+            ("valid query", 10 ** 31),  # Very large slop
+        ]
+
+        for query, slop in invalid_cases:
+            res, result = collection.query(
+                expr=f'phrase_match(text, "{query}", {slop})',
+                output_fields=["text"],
+                check_task=CheckTasks.check_nothing,
+            )
+            log.info(f"Query: '{query[:10]}' with slop {slop} returned {res}")
+            assert result is False
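+            # NOTE (assumption): with check_task=CheckTasks.check_nothing the
+            # client wrapper skips response validation and surfaces its success
+            # flag as `result`; it should be False here because the server is
+            # expected to reject negative or absurdly large slop values.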