test: [cherry-pick]add icu tokenizer testcases (#41630)

pr: https://github.com/milvus-io/milvus/pull/41501

/kind improvement

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
zhuwenxing 2025-04-30 14:18:54 +08:00 committed by GitHub
parent b69bf42a04
commit 3df2156ee2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 377 additions and 124 deletions

View File

@ -1,4 +1,3 @@
import pytest
import sys
from typing import Dict, List
from pymilvus import DefaultConfig
@ -18,7 +17,7 @@ from common import common_func as cf
from common import common_type as ct
from common.common_params import IndexPrams
from pymilvus import ResourceGroupInfo, DataType, utility
from pymilvus import ResourceGroupInfo, DataType, utility, MilvusClient
import pymilvus
@ -170,6 +169,22 @@ class TestcaseBase(Base):
log.info(f"server version: {server_version}")
return res
def get_tokens_by_analyzer(self, text, analyzer_params):
if cf.param_info.param_uri:
uri = cf.param_info.param_uri
else:
uri = "http://" + cf.param_info.param_host + ":" + str(cf.param_info.param_port)
client = MilvusClient(
uri = uri,
token = cf.param_info.param_token
)
res = client.run_analyzer(text, analyzer_params, with_detail=True, with_hash=True)
tokens = [r['token'] for r in res.tokens]
return tokens
# def init_async_milvus_client(self):
# uri = cf.param_info.param_uri or f"http://{cf.param_info.param_host}:{cf.param_info.param_port}"
# kwargs = {

View File

@ -26,7 +26,7 @@ import jieba
import re
import inspect
from pymilvus import CollectionSchema, DataType, FunctionType, Function, MilvusException
from pymilvus import CollectionSchema, DataType, FunctionType, Function, MilvusException, MilvusClient
from bm25s.tokenization import Tokenizer
@ -266,6 +266,24 @@ def analyze_documents(texts, language="en"):
return word_freq
def analyze_documents_with_analyzer_params(texts, analyzer_params):
if param_info.param_uri:
uri = param_info.param_uri
else:
uri = "http://" + param_info.param_host + ":" + str(param_info.param_port)
client = MilvusClient(
uri = uri,
token = param_info.param_token
)
freq = Counter()
res = client.run_analyzer(texts, analyzer_params, with_detail=True, with_hash=True)
for r in res:
freq.update(t['token'] for t in r.tokens)
log.info(f"word freq {freq.most_common(10)}")
return freq
def check_token_overlap(text_a, text_b, language="en"):
word_freq_a = analyze_documents([text_a], language)
word_freq_b = analyze_documents([text_b], language)
@ -2446,7 +2464,7 @@ def gen_json_field_expressions_all_single_operator():
"json_field is null", "json_field IS NULL", "json_field is not null", "json_field IS NOT NULL",
"json_field['a'] is null", "json_field['a'] IS NULL", "json_field['a'] is not null", "json_field['a'] IS NOT NULL"
]
return expressions

View File

@ -369,111 +369,3 @@ class PhraseMatchTestGenerator:
return matched_docs
class KoreanTextGenerator:
def __init__(self):
# Sports/Activities (Nouns)
self.activities = [
"수영", "축구", "농구", "테니스",
"배구", "야구", "골프", "럭비",
"달리기", "자전거", "스케이트", "스키",
"서핑", "다이빙", "등산", "요가",
"", "하이킹", "독서", "요리"
]
# Verbs (Base Form)
self.verbs = [
"좋아하다", "즐기다", "하다", "배우다",
"가르치다", "보다", "시작하다", "계속하다",
"연습하다", "선호하다", "마스터하다", "도전하다"
]
# Connectors
self.connectors = [
"그리고", "또는", "하지만", "그런데",
"그래서", "또한", "게다가", "그러면서",
"동시에", "함께"
]
# Modifiers (Frequency/Degree)
self.modifiers = [
"매우", "자주", "가끔", "열심히",
"전문적으로", "규칙적으로", "매일", "일주일에 한 번",
"취미로", "진지하게"
]
def conjugate_verb(self, verb):
# Simple Korean verb conjugation (using informal style "-아/어요")
if verb.endswith("하다"):
return verb.replace("하다", "해요")
elif verb.endswith(""):
return verb[:-1] + "아요"
return verb
def sentence(self):
# Build basic sentence structure
activity = random.choice(self.activities)
verb = random.choice(self.verbs)
modifier = random.choice(self.modifiers)
# Conjugate verb
conjugated_verb = self.conjugate_verb(verb)
# Build sentence (Korean word order: Subject + Object + Modifier + Verb)
sentence = f"저는 {activity}를/을 {modifier} {conjugated_verb}"
# Randomly add connector and another activity
if random.choice([True, False]):
connector = random.choice(self.connectors)
second_activity = random.choice(self.activities)
second_verb = self.conjugate_verb(random.choice(self.verbs))
sentence += f" {connector} {second_activity}{second_verb}"
return sentence + "."
def paragraph(self, num_sentences=3):
return '\n'.join([self.sentence() for _ in range(num_sentences)])
def text(self, num_sentences=5):
return '\n'.join([self.sentence() for _ in range(num_sentences)])
def generate_text_by_analyzer(analyzer_params):
"""
Generate text data based on the given analyzer parameters
Args:
analyzer_params: Dictionary containing the analyzer parameters
Returns:
str: Generated text data
"""
if analyzer_params["tokenizer"] == "standard":
fake = Faker("en_US")
elif analyzer_params["tokenizer"] == "jieba":
fake = Faker("zh_CN")
elif analyzer_params["tokenizer"]["type"] == "lindera":
# Generate random Japanese text
if analyzer_params["tokenizer"]["dict_kind"] == "ipadic":
fake = Faker("ja_JP")
elif analyzer_params["tokenizer"]["dict_kind"] == "ko-dic":
fake = KoreanTextGenerator()
elif analyzer_params["tokenizer"]["dict_kind"] == "cc-cedict":
fake = Faker("zh_CN")
else:
raise ValueError("Invalid dict_kind")
else:
raise ValueError("Invalid analyzer parameters")
text = fake.text()
stop_words = []
if "filter" in analyzer_params:
for filter in analyzer_params["filter"]:
if filter["type"] == "stop":
stop_words.extend(filter["stop_words"])
# add stop words to the text
text += " " + " ".join(stop_words)
return text

View File

@ -0,0 +1,179 @@
from faker import Faker
import random
class ICUTextGenerator:
"""
ICU(International Components for Unicode)TextGenerator:
Generate test sentences containing multiple languages (Chinese, English, Japanese, Korean), emojis, and special symbols.
"""
def __init__(self):
self.fake_en = Faker("en_US")
self.fake_zh = Faker("zh_CN")
self.fake_ja = Faker("ja_JP")
self.fake_de = Faker("de_DE")
self.korean_samples = [
"안녕하세요 세계", "파이썬 프로그래밍", "데이터 분석", "인공지능",
"밀버스 테스트", "한국어 샘플", "자연어 처리"
]
self.emojis = ["😊", "🐍", "🚀", "🌏", "💡", "🔥", "", "👍"]
self.specials = ["#", "@", "$"]
def word(self):
"""
Generate a list of words containing multiple languages, emojis, and special symbols.
"""
parts = [
self.fake_en.word(),
self.fake_zh.word(),
self.fake_ja.word(),
self.fake_de.word(),
random.choice(self.korean_samples),
random.choice(self.emojis),
random.choice(self.specials),
]
return random.choice(parts)
def sentence(self):
"""
Generate a sentence containing multiple languages, emojis, and special symbols.
"""
parts = [
self.fake_en.sentence(),
self.fake_zh.sentence(),
self.fake_ja.sentence(),
self.fake_de.sentence(),
random.choice(self.korean_samples),
" ".join(random.sample(self.emojis, 2)),
" ".join(random.sample(self.specials, 2)),
]
random.shuffle(parts)
return " ".join(parts)
def paragraph(self, num_sentences=3):
"""
Generate a paragraph containing multiple sentences, each with multiple languages, emojis, and special symbols.
"""
return ' '.join([self.sentence() for _ in range(num_sentences)])
def text(self, num_sentences=5):
"""
Generate multiple sentences containing multiple languages, emojis, and special symbols.
"""
return ' '.join([self.sentence() for _ in range(num_sentences)])
class KoreanTextGenerator:
"""
KoreanTextGenerator: Generate test sentences containing Korean activities, verbs, connectors, and modifiers.
"""
def __init__(self):
# Sports/Activities (Nouns)
self.activities = [
"수영", "축구", "농구", "테니스",
"배구", "야구", "골프", "럭비",
"달리기", "자전거", "스케이트", "스키",
"서핑", "다이빙", "등산", "요가",
"", "하이킹", "독서", "요리"
]
# Verbs (Base Form)
self.verbs = [
"좋아하다", "즐기다", "하다", "배우다",
"가르치다", "보다", "시작하다", "계속하다",
"연습하다", "선호하다", "마스터하다", "도전하다"
]
# Connectors
self.connectors = [
"그리고", "또는", "하지만", "그런데",
"그래서", "또한", "게다가", "그러면서",
"동시에", "함께"
]
# Modifiers (Frequency/Degree)
self.modifiers = [
"매우", "자주", "가끔", "열심히",
"전문적으로", "규칙적으로", "매일", "일주일에 한 번",
"취미로", "진지하게"
]
def conjugate_verb(self, verb):
# Simple Korean verb conjugation (using informal style "-아/어요")
if verb.endswith("하다"):
return verb.replace("하다", "해요")
elif verb.endswith(""):
return verb[:-1] + "아요"
return verb
def word(self):
return random.choice(self.activities + self.verbs + self.modifiers + self.connectors)
def sentence(self):
# Build basic sentence structure
activity = random.choice(self.activities)
verb = random.choice(self.verbs)
modifier = random.choice(self.modifiers)
# Conjugate verb
conjugated_verb = self.conjugate_verb(verb)
# Build sentence (Korean word order: Subject + Object + Modifier + Verb)
sentence = f"저는 {activity}를/을 {modifier} {conjugated_verb}"
# Randomly add connector and another activity
if random.choice([True, False]):
connector = random.choice(self.connectors)
second_activity = random.choice(self.activities)
second_verb = self.conjugate_verb(random.choice(self.verbs))
sentence += f" {connector} {second_activity}{second_verb}"
return sentence + "."
def paragraph(self, num_sentences=3):
return '\n'.join([self.sentence() for _ in range(num_sentences)])
def text(self, num_sentences=5):
return '\n'.join([self.sentence() for _ in range(num_sentences)])
def generate_text_by_analyzer(analyzer_params):
"""
Generate text data based on the given analyzer parameters
Args:
analyzer_params: Dictionary containing the analyzer parameters
Returns:
str: Generated text data
"""
if analyzer_params["tokenizer"] == "standard":
fake = Faker("en_US")
elif analyzer_params["tokenizer"] == "jieba":
fake = Faker("zh_CN")
elif analyzer_params["tokenizer"] == "icu":
fake = ICUTextGenerator()
elif analyzer_params["tokenizer"]["type"] == "lindera":
# Generate random Japanese text
if analyzer_params["tokenizer"]["dict_kind"] == "ipadic":
fake = Faker("ja_JP")
elif analyzer_params["tokenizer"]["dict_kind"] == "ko-dic":
fake = KoreanTextGenerator()
elif analyzer_params["tokenizer"]["dict_kind"] == "cc-cedict":
fake = Faker("zh_CN")
else:
raise ValueError("Invalid dict_kind")
else:
raise ValueError("Invalid analyzer parameters")
text = fake.text()
stop_words = []
if "filter" in analyzer_params:
for filter in analyzer_params["filter"]:
if filter["type"] == "stop":
stop_words.extend(filter["stop_words"])
# add stop words to the text
text += " " + " ".join(stop_words)
return text

View File

@ -2,7 +2,7 @@ import pytest
from base.client_v2_base import TestMilvusClientV2Base
from common.common_type import CaseLabel
from common.phrase_match_generator import generate_text_by_analyzer
from common.text_generator import generate_text_by_analyzer
class TestMilvusClientAnalyzer(TestMilvusClientV2Base):
@ -19,6 +19,9 @@ class TestMilvusClientAnalyzer(TestMilvusClientV2Base):
{
"tokenizer": "jieba",
},
{
"tokenizer": "icu"
}
# {
# "tokenizer": {"type": "lindera", "dict_kind": "ipadic"},
# "filter": [
@ -41,8 +44,8 @@ class TestMilvusClientAnalyzer(TestMilvusClientV2Base):
"""
client = self._client()
text = generate_text_by_analyzer(analyzer_params)
res, result = self.run_analyzer(client, text, analyzer_params, with_detail=True, with_hash=True)
res_2, result_2 = self.run_analyzer(client, text, analyzer_params, with_detail=True, with_hash=True)
res, _ = self.run_analyzer(client, text, analyzer_params, with_detail=True, with_hash=True)
res_2, _ = self.run_analyzer(client, text, analyzer_params, with_detail=True, with_hash=True)
# verify the result are the same when run analyzer twice
for i in range(len(res.tokens)):
assert res.tokens[i]["token"] == res_2.tokens[i]["token"]

View File

@ -1,11 +1,9 @@
import jieba
import utils.util_pymilvus as ut
from utils.util_log import test_log as log
from common.common_type import CaseLabel, CheckTasks
from common import common_type as ct
from common import common_func as cf
from common.phrase_match_generator import KoreanTextGenerator
from common.text_generator import KoreanTextGenerator, ICUTextGenerator
from common.code_mapping import ConnectionErrorMessage as cem
from base.client_base import TestcaseBase
from pymilvus.orm.types import CONSISTENCY_STRONG, CONSISTENCY_BOUNDED, CONSISTENCY_EVENTUALLY
@ -17,7 +15,6 @@ from pymilvus import (
import threading
from pymilvus import DefaultConfig
import time
import pytest
import random
import numpy as np
@ -4734,7 +4731,10 @@ class TestQueryTextMatch(TestcaseBase):
wf_map[field] = cf.analyze_documents(df[field].tolist(), language=language)
# query single field for one token
for field in text_fields:
token = wf_map[field].most_common()[0][0]
most_common_tokens = wf_map[field].most_common(10)
mid = len(most_common_tokens) // 2
idx = random.randint(0, max(0, mid - 1))
token = most_common_tokens[idx][0]
expr = f"text_match({field}, '{token}')"
log.info(f"expr: {expr}")
res, _ = collection_w.query(expr=expr, output_fields=["id", field])
@ -4879,7 +4879,10 @@ class TestQueryTextMatch(TestcaseBase):
# query single field for one token
for field in text_fields:
token = wf_map[field].most_common()[0][0]
most_common_tokens = wf_map[field].most_common(10)
mid = len(most_common_tokens) // 2
idx = random.randint(0, max(0, mid - 1))
token = most_common_tokens[idx][0]
expr = f"text_match({field}, '{token}')"
log.info(f"expr: {expr}")
res, _ = collection_w.query(expr=expr, output_fields=["id", field])
@ -4912,6 +4915,140 @@ class TestQueryTextMatch(TestcaseBase):
assert any(
[token in r[field] for token in top_10_tokens]), f"top 10 tokens {top_10_tokens} not in {r[field]}"
@pytest.mark.tags(CaseLabel.L0)
@pytest.mark.parametrize("enable_partition_key", [True, False])
@pytest.mark.parametrize("enable_inverted_index", [True, False])
@pytest.mark.parametrize("tokenizer", ["icu"])
def test_query_text_match_with_icu_tokenizer(
self, tokenizer, enable_inverted_index, enable_partition_key
):
"""
target: test text match with icu tokenizer
method: 1. enable text match and insert data with varchar
2. get the most common words and query with text match
3. verify the result
expected: text match successfully and result is correct
"""
analyzer_params = {
"tokenizer": tokenizer,
}
dim = 128
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
FieldSchema(
name="word",
dtype=DataType.VARCHAR,
max_length=65535,
enable_analyzer=True,
enable_match=True,
is_partition_key=enable_partition_key,
analyzer_params=analyzer_params,
),
FieldSchema(
name="sentence",
dtype=DataType.VARCHAR,
max_length=65535,
enable_analyzer=True,
enable_match=True,
analyzer_params=analyzer_params,
),
FieldSchema(
name="paragraph",
dtype=DataType.VARCHAR,
max_length=65535,
enable_analyzer=True,
enable_match=True,
analyzer_params=analyzer_params,
),
FieldSchema(
name="text",
dtype=DataType.VARCHAR,
max_length=65535,
enable_analyzer=True,
enable_match=True,
analyzer_params=analyzer_params,
),
FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=dim),
]
schema = CollectionSchema(fields=fields, description="test collection")
data_size = 3000
collection_w = self.init_collection_wrap(
name=cf.gen_unique_str(prefix), schema=schema
)
fake = ICUTextGenerator()
data = [
{
"id": i,
"word": fake.word().lower(),
"sentence": fake.sentence().lower(),
"paragraph": fake.paragraph().lower(),
"text": fake.text().lower(),
"emb": [random.random() for _ in range(dim)],
}
for i in range(data_size)
]
df = pd.DataFrame(data)
log.info(f"dataframe\n{df}")
batch_size = 5000
for i in range(0, len(df), batch_size):
collection_w.insert(
data[i: i + batch_size]
if i + batch_size < len(df)
else data[i: len(df)]
)
# only if the collection is flushed, the inverted index ca be applied.
# growing segment may be not applied, although in strong consistency.
collection_w.flush()
collection_w.create_index(
"emb",
{"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}},
)
if enable_inverted_index:
collection_w.create_index("word", {"index_type": "INVERTED"})
collection_w.load()
# analyze the croup
text_fields = ["word", "sentence", "paragraph", "text"]
wf_map = {}
for field in text_fields:
wf_map[field] = cf.analyze_documents_with_analyzer_params(df[field].tolist(), analyzer_params)
# query single field for one token
for field in text_fields:
most_common_tokens = wf_map[field].most_common(10)
mid = len(most_common_tokens) // 2
idx = random.randint(0, max(0, mid - 1))
token = most_common_tokens[idx][0]
expr = f"text_match({field}, '{token}')"
log.info(f"expr: {expr}")
res, _ = collection_w.query(expr=expr, output_fields=["id", field])
assert len(res) > 0
log.info(f"res len {len(res)}")
for r in res:
assert token in r[field]
# verify inverted index
if enable_inverted_index:
if field == "word":
expr = f"{field} == '{token}'"
log.info(f"expr: {expr}")
res, _ = collection_w.query(expr=expr, output_fields=["id", field])
log.info(f"res len {len(res)}")
for r in res:
assert r[field] == token
# query single field for multi-word
for field in text_fields:
# match top 10 most common words
top_10_tokens = []
for word, count in wf_map[field].most_common(10):
top_10_tokens.append(word)
string_of_top_10_words = " ".join(top_10_tokens)
expr = f"text_match({field}, '{string_of_top_10_words}')"
log.info(f"expr {expr}")
res, _ = collection_w.query(expr=expr, output_fields=["id", field])
log.info(f"res len {len(res)}")
for r in res:
assert any([token in r[field] for token in top_10_tokens])
@pytest.mark.tags(CaseLabel.L0)
@pytest.mark.parametrize("enable_partition_key", [True])
@pytest.mark.parametrize("enable_inverted_index", [True])
@ -5014,7 +5151,10 @@ class TestQueryTextMatch(TestcaseBase):
wf_map[field] = cf.analyze_documents(df[field].tolist(), language=language)
# query single field for one token
for field in text_fields:
token = wf_map[field].most_common()[0][0]
most_common_tokens = wf_map[field].most_common(10)
mid = len(most_common_tokens) // 2
idx = random.randint(0, max(0, mid - 1))
token = most_common_tokens[idx][0]
expr = f"text_match({field}, '{token}')"
log.info(f"expr: {expr}")
res, _ = collection_w.query(expr=expr, output_fields=["id", field])
@ -5153,7 +5293,10 @@ class TestQueryTextMatch(TestcaseBase):
wf_map[field] = cf.analyze_documents(df[field].tolist(), language=language)
# query single field for one token
for field in text_fields:
token = wf_map[field].most_common()[0][0]
most_common_tokens = wf_map[field].most_common(10)
mid = len(most_common_tokens) // 2
idx = random.randint(0, max(0, mid - 1))
token = most_common_tokens[idx][0]
expr = f"text_match({field}, '{token}')"
log.info(f"expr: {expr}")
res, _ = collection_w.query(expr=expr, output_fields=["id", field])
@ -6532,7 +6675,10 @@ class TestQueryTextMatch(TestcaseBase):
wf_map[field] = cf.analyze_documents(df[field].tolist(), language="en")
# query single field for one word
for field in text_fields:
token = wf_map[field].most_common()[0][0]
most_common_tokens = wf_map[field].most_common(10)
mid = len(most_common_tokens) // 2
idx = random.randint(0, max(0, mid - 1))
token = most_common_tokens[idx][0]
tm_expr = f"text_match({field}, '{token}')"
int_expr = "age > 10"
combined_expr = f"{tm_expr} {combine_op} {int_expr}"