enhance: support setting lindera dict build dir and download url in yaml (#43541)

relate: https://github.com/milvus-io/milvus/issues/43120

---------

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
Author: aoiasd <zhicheng.yue@zilliz.com>
Date: 2025-08-04 09:47:38 +08:00 (committed by GitHub)
parent e305a3fa35
commit 4f02b06abc
19 changed files with 405 additions and 136 deletions

View File

@@ -1409,3 +1409,5 @@ function:
     enable: true # Whether to enable TEI rerank service
   vllm:
     enable: true # Whether to enable vllm rerank service
+  analyzer:
+    local_resource_path: /var/lib/milvus/analyzer
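
Taken together with the paramtable additions later in this diff, the new analyzer settings in milvus.yaml would look roughly like the sketch below. Only local_resource_path appears in this hunk; the lindera.download_urls nesting is inferred from the `function.analyzer.lindera.download_urls` key prefix added below, and the URL is a placeholder, not a real mirror:

function:
  analyzer:
    # local cache dir for analyzer resources such as built lindera dictionaries
    local_resource_path: /var/lib/milvus/analyzer
    lindera:
      download_urls:
        # one entry per dict_kind; placeholder URL for illustration
        ipadic: https://example.com/mecab-ipadic-2.7.0-20070801.tar.gz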

View File

@@ -932,6 +932,16 @@ dependencies = [
  "num",
 ]
 
+[[package]]
+name = "fs2"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
+dependencies = [
+ "libc",
+ "winapi",
+]
+
 [[package]]
 name = "fs4"
 version = "0.6.6"

@@ -3853,6 +3863,7 @@ dependencies = [
  "env_logger",
  "fancy-regex",
  "flate2",
+ "fs2",
  "futures",
  "icu_segmenter",
  "jieba-rs",

View File

@@ -26,6 +26,7 @@ overflow-checks = false
 tantivy = { git = "https://github.com/zilliztech/tantivy.git" }
 tantivy-5 = { package = "tantivy", git = "https://github.com/zilliztech/tantivy", tag = "0.21.1-fix4" }
 lindera = "0.42.4"
+fs2 = "0.4"
 lindera-dictionary = "0.42.4"
 futures = "0.3.21"
 libc = "0.2"

View File

@@ -78,9 +78,9 @@ impl AnalyzerBuilder<'_> {
         for filter in filters {
             if filter.is_string() {
                 let filter_name = filter.as_str().unwrap();
-                let costum = self.filters.remove(filter_name);
-                if !costum.is_none() {
-                    builder = costum.unwrap().transform(builder);
+                let customize = self.filters.remove(filter_name);
+                if !customize.is_none() {
+                    builder = customize.unwrap().transform(builder);
                     continue;
                 }

View File

@@ -1,16 +1,13 @@
 use super::fetch;
 use crate::error::{Result, TantivyBindingError};
 use lindera_dictionary::dictionary_builder::cc_cedict::CcCedictBuilder;
-use tokio::runtime::Runtime;
 
 #[cfg(feature = "lindera-cc-cedict")]
 use lindera::dictionary::{load_dictionary_from_kind, DictionaryKind};
 
 #[cfg(not(feature = "lindera-cc-cedict"))]
-async fn download(params: &fetch::FetchParams) -> Result<()> {
-    fetch::fetch(params, CcCedictBuilder::new())
-        .await
-        .map_err(|e| {
+fn download(params: &fetch::FetchParams) -> Result<()> {
+    fetch::fetch(params, CcCedictBuilder::new()).map_err(|e| {
         TantivyBindingError::InternalError(format!(
             "fetch cc_cedict failed with error: {}",
             e.to_string()

@@ -39,8 +36,7 @@ pub fn load_cc_cedict(
         params.download_urls = download_url
     }
 
-    let rt = Runtime::new().unwrap();
-    rt.block_on(download(&params))?;
+    download(&params)?;
     fetch::load(&params)
 }

View File

@@ -1,28 +1,59 @@
+use std::env;
 use std::error::Error;
-use std::path::Path;
+use std::fs;
+use std::io::{self, Cursor, Read, Write};
+use std::path::{Path, PathBuf};
+use std::time::Instant;
+
+use flate2::read::GzDecoder;
+use fs2::FileExt;
+use tar::Archive;
 
 use lindera::dictionary::Dictionary;
 use lindera_dictionary::dictionary::character_definition::CharacterDefinition;
 use lindera_dictionary::dictionary::connection_cost_matrix::ConnectionCostMatrix;
 use lindera_dictionary::dictionary::prefix_dictionary::PrefixDictionary;
 use lindera_dictionary::dictionary::unknown_dictionary::UnknownDictionary;
-use lindera_dictionary::dictionary_builder::DictionaryBuilder;
 use log::{error, info, warn};
 use md5::Context;
 use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
 use reqwest::Client;
+use tokio::runtime::Runtime;
 use tokio::time::sleep;
 use tokio::time::Duration;
-use serde_json as json;
-use std::fs;
-use std::path::PathBuf;
 
 use super::common;
 use crate::error::TantivyBindingError;
+use lindera_dictionary::dictionary_builder::DictionaryBuilder;
 
 const MAX_ROUND: usize = 3;
 
+pub struct FileMutexGuard {
+    file: Option<fs::File>,
+    path: PathBuf,
+}
+
+impl FileMutexGuard {
+    fn build(path: PathBuf) -> io::Result<FileMutexGuard> {
+        let flock = fs::File::create(&path)?;
+        flock.lock_exclusive()?;
+        Ok(FileMutexGuard {
+            file: Some(flock),
+            path: path,
+        })
+    }
+}
+
+impl Drop for FileMutexGuard {
+    fn drop(&mut self) {
+        if let Some(file) = self.file.take() {
+            let _ = file.unlock();
+            drop(file); // drop the file handle before removing the lock file
+            let _ = std::fs::remove_file(&self.path);
+        }
+    }
+}
+
 pub struct FetchParams {
     pub lindera_dir: String,
@@ -138,35 +169,20 @@ async fn download_with_retry(
     Err("Failed to download a valid file from all sources".into())
 }
 
-/// Fetch the necessary assets and then build the dictionary using `builder`
-pub async fn fetch(
+pub fn build(
     params: &FetchParams,
     builder: impl DictionaryBuilder,
+    build_dir: &PathBuf,
+    input_dir: &PathBuf,
+    output_dir: &PathBuf,
+    tmp_dir: &PathBuf,
 ) -> Result<(), Box<dyn Error>> {
-    use std::env;
-    use std::fs::{rename, File};
-    use std::io::{self, Cursor, Read, Write};
-    use std::path::{Path, PathBuf};
-    use std::time::Instant;
-
-    use flate2::read::GzDecoder;
-    use tar::Archive;
-
     let start = Instant::now();
     info!(
-        "start fetch lindera dictionary name: {}\n",
-        params.file_name.as_str()
+        "start download and build lindera dictionary. name: {} to {:?}\n",
+        params.file_name.as_str(),
+        output_dir,
     );
 
-    let build_dir = PathBuf::from(params.lindera_dir.as_str());
-    std::fs::create_dir_all(&build_dir)?;
-
-    let input_dir = build_dir.join(params.input_dir.as_str());
-    let output_dir = build_dir.join(params.output_dir.as_str());
-
-    // Fast path where the data is already in cache
-    if output_dir.is_dir() {
-        return Ok(());
-    }
-
     // Source file path for build package
     let source_path_for_build = &build_dir.join(params.file_name.as_str());
@@ -179,33 +195,32 @@
         .user_agent(format!("Lindera/{}", env!("CARGO_PKG_VERSION")))
         .build()?;
 
-    let mut dest = File::create(tmp_path.as_path())?;
-    let content = download_with_retry(
+    let mut dest = fs::File::create(tmp_path.as_path())?;
+
+    let rt = Runtime::new().unwrap();
+    let content = rt.block_on(download_with_retry(
         &client,
         params.download_urls.iter().map(|s| s.as_str()).collect(),
         MAX_ROUND,
         params.md5_hash.as_str(),
-    )
-    .await?;
+    ))?;
 
     io::copy(&mut Cursor::new(content.as_slice()), &mut dest)?;
     dest.flush()?;
 
-    rename(tmp_path.clone(), source_path_for_build)?;
+    fs::rename(tmp_path.clone(), source_path_for_build)?;
 
-    // Decompress a tar.gz file
-    let tmp_extract_path = Path::new(&build_dir).join(format!("tmp-archive-{}", params.input_dir));
-    let tmp_extracted_path = tmp_extract_path.join(params.input_dir.as_str());
-    let _ = std::fs::remove_dir_all(&tmp_extract_path);
-    std::fs::create_dir_all(&tmp_extract_path)?;
+    let tmp_extracted_path = tmp_dir.join(params.input_dir.as_str());
+    let _ = std::fs::remove_dir_all(&tmp_dir);
+    std::fs::create_dir_all(&tmp_dir)?;
 
-    let mut tar_gz = File::open(source_path_for_build)?;
+    let mut tar_gz = fs::File::open(source_path_for_build)?;
     let mut buffer = Vec::new();
     tar_gz.read_to_end(&mut buffer)?;
     let cursor = Cursor::new(buffer);
     let decoder = GzDecoder::new(cursor);
     let mut archive = Archive::new(decoder);
-    archive.unpack(&tmp_extract_path)?;
+    archive.unpack(&tmp_dir)?;
 
 #[cfg(target_os = "windows")]
 {
@@ -229,10 +244,9 @@
 {
     // Empty the input directory first to avoid conflicts when renaming the directory later on Linux and macOS systems (which do not support overwriting directories).
     empty_directory(&input_dir)?;
-    rename(tmp_extracted_path, &input_dir)?;
+    fs::rename(tmp_extracted_path, &input_dir)?;
 }
 
-    let _ = std::fs::remove_dir_all(&tmp_extract_path);
-
     drop(dest);
     let _ = std::fs::remove_file(source_path_for_build);
@@ -265,19 +279,54 @@
         empty_directory(&output_dir)?;
 
         // Rename tmp_path to output_dir
-        rename(tmp_path, &output_dir)?;
+        fs::rename(tmp_path, &output_dir)?;
     }
 
-    let _ = std::fs::remove_dir_all(&input_dir);
-
     info!(
-        "finish fetch lindera dictionary name: {} duration: {} ms\n",
+        "finish download and build lindera dictionary. name: {} duration: {} ms\n",
         params.file_name.as_str(),
         start.elapsed().as_millis()
     );
 
     Ok(())
 }
 
+/// Fetch the necessary assets and then build the dictionary using `builder`
+pub fn fetch(params: &FetchParams, builder: impl DictionaryBuilder) -> Result<(), Box<dyn Error>> {
+    let build_dir = PathBuf::from(params.lindera_dir.as_str());
+    std::fs::create_dir_all(&build_dir)?;
+
+    let input_dir = build_dir.join(params.input_dir.as_str());
+    let output_dir = build_dir.join(params.output_dir.as_str());
+    let lock_path = build_dir.join(format!("lindera-{}.lock", params.file_name.as_str()));
+
+    // Skip creating the fs lock if the dictionary is already in cache
+    if output_dir.is_dir() {
+        return Ok(());
+    }
+
+    let _flock_guard = FileMutexGuard::build(lock_path)?;
+
+    // Fast path where the data is already in cache
+    if output_dir.is_dir() {
+        return Ok(());
+    }
+
+    // Decompress a tar.gz file
+    let tmp_dir = Path::new(&build_dir).join(format!("tmp-archive-{}", params.input_dir));
+
+    let build_result = build(
+        params,
+        builder,
+        &build_dir,
+        &input_dir,
+        &output_dir,
+        &tmp_dir,
+    );
+
+    let _ = std::fs::remove_dir_all(&tmp_dir);
+    let _ = std::fs::remove_dir_all(&input_dir);
+    build_result
+}
 
 pub fn load(params: &FetchParams) -> Result<lindera::dictionary::Dictionary, TantivyBindingError> {
     let dict_dir = PathBuf::from(params.lindera_dir.clone()).join(params.output_dir.clone());
     let da_data = fs::read(dict_dir.join(common::DA_DATA))?;
@@ -286,7 +335,7 @@ pub fn load(params: &FetchParams) -> Result<lindera::dictionary::Dictionary, Tan
     let words_data = fs::read(dict_dir.join(common::WORDS_DATA))?;
     let connection_data = fs::read(dict_dir.join(common::CONNECTION_DATA))?;
     let char_definition_data = fs::read(dict_dir.join(common::CHAR_DEFINITION_DATA))?;
-    let unkonwn_data = fs::read(dict_dir.join(common::UNKNOWN_DATA))?;
+    let unknown_data = fs::read(dict_dir.join(common::UNKNOWN_DATA))?;
 
     let dict = Dictionary {
         prefix_dictionary: PrefixDictionary::load(
@@ -305,7 +354,7 @@ pub fn load(params: &FetchParams) -> Result<lindera::dictionary::Dictionary, Tan
             ))
         },
     )?,
-        unknown_dictionary: UnknownDictionary::load(unkonwn_data.as_slice()).map_err(|e| {
+        unknown_dictionary: UnknownDictionary::load(unknown_data.as_slice()).map_err(|e| {
             TantivyBindingError::InternalError(format!(
                 "lindera load unknown dictionary failed, err:{}",
                 e
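
The FileMutexGuard above gives fetch a double-checked, cross-process build lock: check the cache, take an exclusive advisory lock on a lock file, re-check the cache, then build; the Drop impl unlocks and removes the lock file. For readers on the Go side of this PR, a minimal sketch of the same pattern (Unix-only, using syscall.Flock; every name here is illustrative and not part of this change) could look like:

package main

import (
    "os"
    "syscall"
)

// withBuildLock serializes dictionary builds across processes with an
// exclusive advisory file lock, mirroring FileMutexGuard and fetch above.
func withBuildLock(lockPath string, alreadyBuilt func() bool, build func() error) error {
    if alreadyBuilt() { // skip taking the lock if the cache already exists
        return nil
    }
    f, err := os.Create(lockPath)
    if err != nil {
        return err
    }
    defer func() {
        syscall.Flock(int(f.Fd()), syscall.LOCK_UN) // release the lock
        f.Close()                                   // close before removing, as in Drop above
        os.Remove(lockPath)                         // best-effort cleanup of the lock file
    }()
    // Blocks until this process holds the exclusive lock.
    if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != nil {
        return err
    }
    if alreadyBuilt() { // fast path: another process finished the build while we waited
        return nil
    }
    return build()
}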

View File

@@ -1,16 +1,13 @@
 use super::fetch;
 use crate::error::{Result, TantivyBindingError};
 use lindera_dictionary::dictionary_builder::ipadic::IpadicBuilder;
-use tokio::runtime::Runtime;
 
 #[cfg(feature = "lindera-ipadic")]
 use lindera::dictionary::{load_dictionary_from_kind, DictionaryKind};
 
 #[cfg(not(feature = "lindera-ipadic"))]
-async fn download(params: &fetch::FetchParams) -> Result<()> {
-    fetch::fetch(params, IpadicBuilder::new())
-        .await
-        .map_err(|e| {
+fn download(params: &fetch::FetchParams) -> Result<()> {
+    fetch::fetch(params, IpadicBuilder::new()).map_err(|e| {
         TantivyBindingError::InternalError(format!(
             "fetch ipadic failed with error: {}",
             e.to_string()

@@ -40,8 +37,7 @@ pub fn load_ipadic(
         params.download_urls = download_url
     }
 
-    let rt = Runtime::new().unwrap();
-    rt.block_on(download(&params))?;
+    download(&params)?;
     fetch::load(&params)
 }

View File

@@ -1,16 +1,12 @@
 use super::fetch;
 use crate::error::{Result, TantivyBindingError};
-use lindera_dictionary::dictionary_builder::ipadic_neologd::IpadicNeologdBuilder;
-use tokio::runtime::Runtime;
 
 #[cfg(feature = "lindera-ipadic-neologd")]
 use lindera::dictionary::{load_dictionary_from_kind, DictionaryKind};
+use lindera_dictionary::dictionary_builder::ipadic_neologd::IpadicNeologdBuilder;
 
 #[cfg(not(feature = "lindera-ipadic-neologd"))]
-async fn download(params: &fetch::FetchParams) -> Result<()> {
-    fetch::fetch(params, IpadicNeologdBuilder::new())
-        .await
-        .map_err(|e| {
+fn download(params: &fetch::FetchParams) -> Result<()> {
+    fetch::fetch(params, IpadicNeologdBuilder::new()).map_err(|e| {
         TantivyBindingError::InternalError(format!(
             "fetch ipadic-neologd failed with error: {}",
             e.to_string()

@@ -39,8 +35,7 @@ pub fn load_ipadic_neologd(
         params.download_urls = download_url
     }
 
-    let rt = Runtime::new().unwrap();
-    rt.block_on(download(&params))?;
+    download(&params)?;
     fetch::load(&params)
 }

View File

@@ -1,16 +1,13 @@
 use super::fetch;
 use crate::error::{Result, TantivyBindingError};
 use lindera_dictionary::dictionary_builder::ko_dic::KoDicBuilder;
-use tokio::runtime::Runtime;
 
 #[cfg(feature = "lindera-ko-dic")]
 use lindera::dictionary::{load_dictionary_from_kind, DictionaryKind};
 
 #[cfg(not(feature = "lindera-ko-dic"))]
-async fn download(params: &fetch::FetchParams) -> Result<()> {
-    fetch::fetch(params, KoDicBuilder::new())
-        .await
-        .map_err(|e| {
+fn download(params: &fetch::FetchParams) -> Result<()> {
+    fetch::fetch(params, KoDicBuilder::new()).map_err(|e| {
         TantivyBindingError::InternalError(format!(
             "fetch ko-dic failed with error: {}",
             e.to_string()

@@ -40,8 +37,7 @@ pub fn load_ko_dic(
         params.download_urls = download_url
     }
 
-    let rt = Runtime::new().unwrap();
-    rt.block_on(download(&params))?;
+    download(&params)?;
     fetch::load(&params)
 }

View File

@@ -1,16 +1,13 @@
 use super::fetch;
 use crate::error::{Result, TantivyBindingError};
 use lindera_dictionary::dictionary_builder::unidic::UnidicBuilder;
-use tokio::runtime::Runtime;
 
 #[cfg(feature = "lindera-unidic")]
 use lindera::dictionary::{load_dictionary_from_kind, DictionaryKind};
 
 #[cfg(not(feature = "lindera-unidic"))]
-async fn download(params: &fetch::FetchParams) -> Result<()> {
-    fetch::fetch(params, UnidicBuilder::new())
-        .await
-        .map_err(|e| {
+fn download(params: &fetch::FetchParams) -> Result<()> {
+    fetch::fetch(params, UnidicBuilder::new()).map_err(|e| {
         TantivyBindingError::InternalError(format!(
             "fetch unidic failed with error: {}",
             e.to_string()

@@ -39,8 +36,7 @@ pub fn load_unidic(
         params.download_urls = download_url
     }
 
-    let rt = Runtime::new().unwrap();
-    rt.block_on(download(&params))?;
+    download(&params)?;
     fetch::load(&params)
 }

View File

@@ -3,7 +3,7 @@ use serde_json as json;
 use super::stop_words;
 use crate::error::{Result, TantivyBindingError};
 
-pub(crate) fn get_string_list(value: &json::Value, label: &str) -> Result<Vec<String>> {
+pub fn get_string_list(value: &json::Value, label: &str) -> Result<Vec<String>> {
     if !value.is_array() {
         return Err(TantivyBindingError::InternalError(
             format!("{} should be array", label).to_string(),

View File

@@ -16,6 +16,7 @@ use lindera::token_filter::korean_stop_tags::KoreanStopTagsTokenFilter;
 use lindera::token_filter::BoxTokenFilter as LTokenFilter;
 
 use crate::analyzer::dict::lindera::load_dictionary_from_kind;
+use crate::analyzer::filter::get_string_list;
 use crate::error::{Result, TantivyBindingError};
 use serde_json as json;

@@ -25,6 +26,8 @@ pub struct LinderaTokenStream<'a> {
 }
 
 const DICTKINDKEY: &str = "dict_kind";
+const DICTBUILDDIRKEY: &str = "dict_build_dir";
+const DICTDOWNLOADURLKEY: &str = "download_urls";
 const FILTERKEY: &str = "filter";
 
 impl<'a> TokenStream for LinderaTokenStream<'a> {
@@ -62,8 +65,12 @@ impl LinderaTokenizer {
     /// This function will create a new `LinderaTokenizer` with json parameters.
     pub fn from_json(params: &json::Map<String, json::Value>) -> Result<LinderaTokenizer> {
         let kind: DictionaryKind = fetch_lindera_kind(params)?;
-        let dictionary =
-            load_dictionary_from_kind(&kind, "/var/lib/milvus/dict/lindera".to_string(), vec![])?;
+
+        // for downloading the dict online
+        let build_dir = fetch_dict_build_dir(params)?;
+        let download_urls = fetch_dict_download_urls(params)?;
+        let dictionary = load_dictionary_from_kind(&kind, build_dir, download_urls)?;
+
         let segmenter = Segmenter::new(Mode::Normal, dictionary, None);
         let mut tokenizer = LinderaTokenizer::from_segmenter(segmenter);
@@ -126,18 +133,34 @@ impl DictionaryKindParser for &str {
 
 fn fetch_lindera_kind(params: &json::Map<String, json::Value>) -> Result<DictionaryKind> {
     params
         .get(DICTKINDKEY)
-        .ok_or_else(|| {
-            TantivyBindingError::InvalidArgument(format!("lindera tokenizer dict_kind must be set"))
-        })?
+        .ok_or(TantivyBindingError::InvalidArgument(format!(
+            "lindera tokenizer dict_kind must be set"
+        )))?
         .as_str()
-        .ok_or_else(|| {
-            TantivyBindingError::InvalidArgument(format!(
-                "lindera tokenizer dict kind should be string"
-            ))
-        })?
+        .ok_or(TantivyBindingError::InvalidArgument(format!(
+            "lindera tokenizer dict kind should be string"
+        )))?
         .into_dict_kind()
 }
 
+fn fetch_dict_build_dir(params: &json::Map<String, json::Value>) -> Result<String> {
+    params
+        .get(DICTBUILDDIRKEY)
+        .map_or(Ok("/var/lib/milvus/dict/lindera".to_string()), |v| {
+            v.as_str()
+                .ok_or(TantivyBindingError::InvalidArgument(format!(
+                    "dict build dir must be string"
+                )))
+                .map(|s| s.to_string())
+        })
+}
+
+fn fetch_dict_download_urls(params: &json::Map<String, json::Value>) -> Result<Vec<String>> {
+    params.get(DICTDOWNLOADURLKEY).map_or(Ok(vec![]), |v| {
+        get_string_list(v, "lindera dict download urls")
+    })
+}
+
 fn fetch_lindera_tags_from_params(
     params: &json::Map<String, json::Value>,
 ) -> Result<HashSet<String>> {
@@ -330,7 +353,6 @@ mod tests {
     use tantivy::tokenizer::Tokenizer;
 
     #[test]
-    #[cfg(feature = "lindera-ipadic")]
     fn test_lindera_tokenizer() {
         let params = r#"{
             "type": "lindera",

@@ -358,7 +380,6 @@ mod tests {
     }
 
     #[test]
-    #[cfg(feature = "lindera-cc-cedict")]
     fn test_lindera_tokenizer_cc() {
         let params = r#"{
             "type": "lindera",

View File

@@ -89,7 +89,7 @@ pub fn get_builder_with_tokenizer(
         }
         _ => {
             return Err(TantivyBindingError::InvalidArgument(format!(
-                "costum tokenizer must set type"
+                "customized tokenizer must set type"
             )))
         }
     }

View File

@@ -82,9 +82,9 @@ impl AnalyzerBuilder<'_> {
         for filter in filters {
             if filter.is_string() {
                 let filter_name = filter.as_str().unwrap();
-                let costum = self.filters.remove(filter_name);
-                if !costum.is_none() {
-                    builder = costum.unwrap().transform(builder);
+                let customize = self.filters.remove(filter_name);
+                if !customize.is_none() {
+                    builder = customize.unwrap().transform(builder);
                     continue;
                 }

View File

@@ -51,7 +51,7 @@ pub fn get_builder_with_tokenizer(params: &json::Value) -> Result<TextAnalyzerBu
         }
         _ => {
             return Err(TantivyBindingError::InvalidArgument(
-                "costum tokenizer must set type".to_string(),
+                "customized tokenizer must set type".to_string(),
             ))
         }
     }

View File

@@ -9,12 +9,22 @@ package ctokenizer
 import "C"
 
 import (
+    "encoding/json"
+    "fmt"
+    "path"
     "unsafe"
 
     "github.com/milvus-io/milvus/internal/util/tokenizerapi"
+    "github.com/milvus-io/milvus/pkg/v2/util/merr"
+    "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 )
 
 func NewTokenizer(param string) (tokenizerapi.Tokenizer, error) {
+    param, err := CheckAndFillParams(param)
+    if err != nil {
+        return nil, err
+    }
+
     paramPtr := C.CString(param)
     defer C.free(unsafe.Pointer(paramPtr))
@@ -27,7 +37,100 @@ func NewTokenizer(param string) (tokenizerapi.Tokenizer, error) {
     return NewCTokenizer(ptr), nil
 }
 
+func CheckAndFillParams(params string) (string, error) {
+    if len(params) == 0 {
+        return "", nil
+    }
+
+    var paramMaps map[string]any
+    flag := false
+    err := json.Unmarshal([]byte(params), &paramMaps)
+    if err != nil {
+        return "", merr.WrapErrAsInputError(fmt.Errorf("unmarshal analyzer params failed with json error: %s", err.Error()))
+    }
+
+    tokenizer, ok := paramMaps["tokenizer"]
+    if !ok {
+        // skip the check if there are no tokenizer params
+        return params, nil
+    }
+
+    switch value := tokenizer.(type) {
+    case string:
+        // return as-is if a built-in tokenizer is used
+        return params, nil
+    case map[string]any:
+        flag, err = CheckAndFillTokenizerParams(value)
+        if err != nil {
+            return "", err
+        }
+    default:
+        return "", merr.WrapErrAsInputError(fmt.Errorf("analyzer params set tokenizer with unknown type"))
+    }
+
+    // re-marshal the json params if the params map was changed
+    if flag {
+        bytes, err := json.Marshal(paramMaps)
+        if err != nil {
+            return "", merr.WrapErrAsInputError(fmt.Errorf("marshal analyzer params failed with json error: %s", err.Error()))
+        }
+        return string(bytes), nil
+    }
+    return params, nil
+}
+
+// fill some milvus system params into the tokenizer params
+func CheckAndFillTokenizerParams(params map[string]any) (bool, error) {
+    v, ok := params["type"]
+    if !ok {
+        return false, merr.WrapErrAsInputError(fmt.Errorf("custom tokenizer must set type"))
+    }
+
+    tokenizerType, ok := v.(string)
+    if !ok {
+        return false, merr.WrapErrAsInputError(fmt.Errorf("custom tokenizer type must be string"))
+    }
+
+    switch tokenizerType {
+    case "lindera":
+        cfg := paramtable.Get()
+        if _, ok := params["dict_build_dir"]; ok {
+            return false, merr.WrapErrAsInputError(fmt.Errorf("custom tokenizer dict_build_dir is a system param and should not be set"))
+        }
+        // build lindera dict under LocalResourcePath/lindera/<dict_kind>
+        params["dict_build_dir"] = path.Join(cfg.FunctionCfg.LocalResourcePath.GetValue(), "lindera")
+
+        v, ok := params["dict_kind"]
+        if !ok {
+            return false, merr.WrapErrAsInputError(fmt.Errorf("lindera tokenizer must set dict_kind"))
+        }
+        dictKind, ok := v.(string)
+        if !ok {
+            return false, merr.WrapErrAsInputError(fmt.Errorf("lindera tokenizer dict kind must be string"))
+        }
+
+        dictUrlsMap := cfg.FunctionCfg.LinderaDownloadUrls.GetValue()
+        if _, ok := params["download_urls"]; ok {
+            return false, merr.WrapErrAsInputError(fmt.Errorf("custom tokenizer download_urls is a system param and should not be set"))
+        }
+        if value, ok := dictUrlsMap["."+dictKind]; ok {
+            // use the download urls set in milvus yaml
+            params["download_urls"] = paramtable.ParseAsStings(value)
+        }
+        return true, nil
+    default:
+        return false, nil
+    }
+}
+
 func ValidateTokenizer(param string) error {
+    param, err := CheckAndFillParams(param)
+    if err != nil {
+        return err
+    }
+
     paramPtr := C.CString(param)
     defer C.free(unsafe.Pointer(paramPtr))
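
A minimal usage sketch of the flow above (assumes paramtable has been initialized; the mirror URL is a placeholder, and the SaveGroup call follows the unit test later in this diff):

package ctokenizer

import (
    "fmt"

    "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)

func ExampleCheckAndFillParams() {
    paramtable.Init()
    // Configure a download mirror for ipadic, as the unit test below does.
    paramtable.Get().SaveGroup(map[string]string{
        "function.analyzer.lindera.download_urls.ipadic": "https://example.com/ipadic.tar.gz",
    })

    in := `{"tokenizer": {"type": "lindera", "dict_kind": "ipadic"}}`
    out, err := CheckAndFillParams(in)
    if err != nil {
        panic(err)
    }
    // out now also carries dict_build_dir (LocalResourcePath + "/lindera") and
    // the configured download_urls; users must not set these fields themselves.
    fmt.Println(out)
}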

View File

@@ -5,14 +5,19 @@ import (
     "testing"
 
     "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+
+    "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 )
 
 func TestTokenizer(t *testing.T) {
+    paramtable.Init()
+
     // default tokenizer.
     {
         m := "{\"tokenizer\": \"standard\"}"
         tokenizer, err := NewTokenizer(m)
-        assert.NoError(t, err)
+        require.NoError(t, err)
         defer tokenizer.Destroy()
 
         tokenStream := tokenizer.NewTokenStream("football, basketball, pingpang")

@@ -26,7 +31,7 @@ func TestTokenizer(t *testing.T) {
     {
         m := "{\"tokenizer\": \"jieba\"}"
         tokenizer, err := NewTokenizer(m)
-        assert.NoError(t, err)
+        require.NoError(t, err)
         defer tokenizer.Destroy()
 
         tokenStream := tokenizer.NewTokenStream("张华考上了北京大学;李萍进了中等技术学校;我在百货公司当售货员:我们都有光明的前途")

@@ -35,6 +40,20 @@ func TestTokenizer(t *testing.T) {
             fmt.Println(tokenStream.Token())
         }
     }
+
+    // lindera tokenizer.
+    {
+        m := "{\"tokenizer\": {\"type\":\"lindera\", \"dict_kind\": \"ipadic\"}}"
+        tokenizer, err := NewTokenizer(m)
+        require.NoError(t, err)
+        defer tokenizer.Destroy()
+
+        tokenStream := tokenizer.NewTokenStream("東京スカイツリーの最寄り駅はとうきょうスカイツリー駅です")
+        defer tokenStream.Destroy()
+        for tokenStream.Advance() {
+            fmt.Println(tokenStream.Token())
+        }
+    }
 }
 
 func TestValidateTokenizer(t *testing.T) {
@@ -58,3 +77,71 @@ func TestValidateTokenizer(t *testing.T) {
         assert.Error(t, err)
     }
 }
+
+func TestCheckAndFillParams(t *testing.T) {
+    paramtable.Init()
+    paramtable.Get().SaveGroup(map[string]string{"function.analyzer.lindera.download_urls.ipadic": "/test/url"})
+
+    // normal case
+    {
+        m := "{\"tokenizer\": {\"type\":\"jieba\"}}"
+        _, err := CheckAndFillParams(m)
+        assert.NoError(t, err)
+    }
+
+    // fill lindera tokenizer download urls and dict local path
+    {
+        m := "{\"tokenizer\": {\"type\":\"lindera\", \"dict_kind\": \"ipadic\"}}"
+        _, err := CheckAndFillParams(m)
+        assert.NoError(t, err)
+    }
+
+    // error with invalid json
+    {
+        m := "{invalid json"
+        _, err := CheckAndFillParams(m)
+        assert.Error(t, err)
+    }
+
+    // skip if the default analyzer is used
+    {
+        m := "{}"
+        _, err := CheckAndFillParams(m)
+        assert.NoError(t, err)
+    }
+
+    // error: tokenizer without type
+    {
+        m := "{\"tokenizer\": {\"dict_kind\": \"ipadic\"}}"
+        _, err := CheckAndFillParams(m)
+        assert.Error(t, err)
+    }
+
+    // error: tokenizer type is not a string
+    {
+        m := "{\"tokenizer\": {\"type\": 1, \"dict_kind\": \"ipadic\"}}"
+        _, err := CheckAndFillParams(m)
+        assert.Error(t, err)
+    }
+
+    // error: wrong tokenizer params type
+    {
+        m := "{\"tokenizer\": 1}"
+        _, err := CheckAndFillParams(m)
+        assert.Error(t, err)
+    }
+
+    // error: dict_build_dir set by the user
+    {
+        m := "{\"tokenizer\": {\"type\": \"lindera\", \"dict_kind\": \"ipadic\", \"dict_build_dir\": \"/tmp/milvus\"}}"
+        _, err := CheckAndFillParams(m)
+        assert.Error(t, err)
+    }
+
+    // error: lindera dict_kind not set
+    {
+        m := "{\"tokenizer\": {\"type\": \"lindera\"}}"
+        _, err := CheckAndFillParams(m)
+        assert.Error(t, err)
+    }
+}

View File

@@ -23,6 +23,8 @@ import (
 type functionConfig struct {
     TextEmbeddingProviders ParamGroup `refreshable:"true"`
     RerankModelProviders   ParamGroup `refreshable:"true"`
+    LocalResourcePath      ParamItem  `refreshable:"true"`
+    LinderaDownloadUrls    ParamGroup `refreshable:"true"`
 }
 
 func (p *functionConfig) init(base *BaseTable) {

@@ -91,6 +93,20 @@ func (p *functionConfig) init(base *BaseTable) {
         },
     }
     p.RerankModelProviders.Init(base.mgr)
+
+    p.LocalResourcePath = ParamItem{
+        Key:          "function.analyzer.local_resource_path",
+        Version:      "2.5.16",
+        Export:       true,
+        DefaultValue: "/var/lib/milvus/analyzer",
+    }
+    p.LocalResourcePath.Init(base.mgr)
+
+    p.LinderaDownloadUrls = ParamGroup{
+        KeyPrefix: "function.analyzer.lindera.download_urls",
+        Version:   "2.5.16",
+    }
+    p.LinderaDownloadUrls.Init(base.mgr)
 }
 
 const (
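
For reference, a short sketch of how these two settings are read back, matching the cfg.FunctionCfg usage in CheckAndFillTokenizerParams above (group values arrive keyed by the suffix after the prefix, e.g. ".ipadic"):

package main

import "github.com/milvus-io/milvus/pkg/v2/util/paramtable"

func main() {
    paramtable.Init()
    cfg := paramtable.Get()
    // Defaults to "/var/lib/milvus/analyzer" unless overridden in milvus.yaml.
    dir := cfg.FunctionCfg.LocalResourcePath.GetValue()
    // Map keyed by ".<dict_kind>", matching the dictUrlsMap["."+dictKind] lookup above.
    urls := cfg.FunctionCfg.LinderaDownloadUrls.GetValue()
    _, _ = dir, urls
}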

View File

@@ -1218,7 +1218,7 @@ func TestRunAnalyzer(t *testing.T) {
 
     // run analyzer with invalid params
     _, err = mc.RunAnalyzer(ctx, client.NewRunAnalyzerOption("text doc").WithAnalyzerParamsStr("invalid params}"))
-    common.CheckErr(t, err, false, "JsonError")
+    common.CheckErr(t, err, false, "json error")
 
     // run analyzer with custom analyzer
     tokens, err = mc.RunAnalyzer(ctx, client.NewRunAnalyzerOption("test doc").