enhance: update tantivy for removing "doc_id" fast field (#41198)

Issue: #41210

After https://github.com/zilliztech/tantivy/pull/5, we can provide
milvus row id directly to tantivy rather than record it in the fast
field "doc_id".
So rather than searching for the tantivy doc id and then fetching the milvus
row id from "doc_id", the searched tantivy doc id now is the milvus row id
itself, eliminating the expensive row-id acquisition phase.

The following shows a simple benchmark that inserts **1M** docs in which all
rows are "hello"; the latency is measured at the **segcore** level, on a 9900K CPU:

![image](https://github.com/user-attachments/assets/d8e72134-56b5-430b-8628-36c3bed8eaad)
**The latency improves by 2.02x and 2.1x, respectively.**

Benchmark code:
```
TEST(TextMatch, TestPerf) {
    // Benchmark: build a 1M-row sealed segment whose text column is (nearly)
    // all "hello", create a text index, then measure segcore-level TextMatch
    // query latency.
    auto schema = GenTestSchema({}, true);
    auto seg = CreateSealedSegment(schema, empty_index_meta);
    int64_t N = 1000000;
    uint64_t seed = 19190504;
    auto raw_data = DataGen(schema, N, seed);
    auto str_col = raw_data.raw_->mutable_fields_data()
                       ->at(1)
                       .mutable_scalars()
                       ->mutable_string_data()
                       ->mutable_data();
    // NOTE(review): the last row (index N - 1) keeps its generated value while
    // the prose claims "all rows" — confirm whether this is intentional.
    for (int64_t i = 0; i < N - 1; i++) {
        str_col->at(i) = "hello";
    }
    SealedLoadFieldData(raw_data, *seg);
    seg->CreateTextIndex(FieldId(101));

    auto now = std::chrono::high_resolution_clock::now();
    auto expr = GetMatchExpr(schema, "hello", OpType::TextMatch);
    auto final = ExecuteQueryExpr(expr, seg.get(), N, MAX_TIMESTAMP);
    auto end = std::chrono::high_resolution_clock::now();
    auto duration =
        std::chrono::duration_cast<std::chrono::microseconds>(end - now);
    // Bug fix: the duration is cast to microseconds but was labeled "ms";
    // label the printed value with the correct unit.
    std::cout << "TextMatch query time: " << duration.count() << "us"
              << std::endl;
}
```

---------

Signed-off-by: SpadeA <tangchenjie1210@gmail.com>
This commit is contained in:
Spade A 2025-04-15 20:20:32 +08:00 committed by GitHub
parent a953eaeaf0
commit 70d13dcf61
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 500 additions and 150 deletions

View File

@ -274,14 +274,11 @@ JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex(
boost::filesystem::create_directories(path_);
std::string field_name =
std::to_string(disk_file_manager_->GetFieldDataMeta().field_id);
d_type_ = TantivyDataType::Keyword;
wrapper_ = std::make_shared<TantivyIndexWrapper>(
field_name.c_str(),
d_type_,
path_.c_str(),
tantivy_index_version,
false,
false,
false, /* in_ram */
1,
json_stats_tantivy_memory_budget);
}
@ -291,9 +288,8 @@ JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex(
int64_t commit_interval_in_ms, const char* unique_id)
: commit_interval_in_ms_(commit_interval_in_ms),
last_commit_time_(stdclock::now()) {
d_type_ = TantivyDataType::Keyword;
wrapper_ = std::make_shared<TantivyIndexWrapper>(
unique_id, d_type_, "", TANTIVY_INDEX_LATEST_VERSION, false, true);
unique_id, "", TANTIVY_INDEX_LATEST_VERSION, true /* in_ram */);
}
JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex(
@ -306,9 +302,8 @@ JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex(
boost::filesystem::path sub_path = unique_id;
path_ = (prefix / sub_path).string();
boost::filesystem::create_directories(path_);
d_type_ = TantivyDataType::Keyword;
wrapper_ = std::make_shared<TantivyIndexWrapper>(
unique_id, d_type_, path_.c_str(), TANTIVY_INDEX_LATEST_VERSION);
unique_id, path_.c_str(), TANTIVY_INDEX_LATEST_VERSION);
}
IndexStatsPtr

View File

@ -1782,7 +1782,7 @@ dependencies = [
[[package]]
name = "ownedbytes"
version = "0.7.0"
source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88"
source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402"
dependencies = [
"stable_deref_trait",
]
@ -2523,7 +2523,7 @@ dependencies = [
[[package]]
name = "tantivy"
version = "0.23.0"
source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88"
source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402"
dependencies = [
"aho-corasick",
"arc-swap",
@ -2609,7 +2609,7 @@ dependencies = [
[[package]]
name = "tantivy-bitpacker"
version = "0.6.0"
source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88"
source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402"
dependencies = [
"bitpacking 0.9.2",
]
@ -2632,7 +2632,7 @@ dependencies = [
[[package]]
name = "tantivy-columnar"
version = "0.3.0"
source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88"
source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402"
dependencies = [
"downcast-rs",
"fastdivide",
@ -2659,7 +2659,7 @@ dependencies = [
[[package]]
name = "tantivy-common"
version = "0.7.0"
source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88"
source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402"
dependencies = [
"async-trait",
"byteorder",
@ -2702,7 +2702,7 @@ dependencies = [
[[package]]
name = "tantivy-query-grammar"
version = "0.22.0"
source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88"
source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402"
dependencies = [
"nom",
]
@ -2720,7 +2720,7 @@ dependencies = [
[[package]]
name = "tantivy-sstable"
version = "0.3.0"
source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88"
source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402"
dependencies = [
"futures-util",
"itertools 0.14.0",
@ -2742,7 +2742,7 @@ dependencies = [
[[package]]
name = "tantivy-stacker"
version = "0.3.0"
source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88"
source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402"
dependencies = [
"murmurhash32",
"rand_distr",
@ -2760,7 +2760,7 @@ dependencies = [
[[package]]
name = "tantivy-tokenizer-api"
version = "0.3.0"
source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88"
source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402"
dependencies = [
"serde",
]
@ -3245,7 +3245,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.59.0",
"windows-sys 0.48.0",
]
[[package]]

View File

@ -103,6 +103,13 @@ void hashmap_set_value(void *map, const char *key, const char *value);
void free_hashmap(void *map);
RustResult tantivy_create_json_key_stats_writer(const char *field_name,
const char *path,
uint32_t tantivy_index_version,
uintptr_t num_threads,
uintptr_t overall_memory_budget_in_bytes,
bool in_ram);
RustResult tantivy_load_index(const char *path);
void tantivy_free_index_reader(void *ptr);
@ -182,8 +189,7 @@ RustResult tantivy_create_index(const char *field_name,
const char *path,
uint32_t tantivy_index_version,
uintptr_t num_threads,
uintptr_t overall_memory_budget_in_bytes,
bool in_ram);
uintptr_t overall_memory_budget_in_bytes);
RustResult tantivy_create_index_with_single_segment(const char *field_name,
TantivyDataType data_type,

View File

@ -0,0 +1,35 @@
use crate::error::Result;
use crate::index_writer::IndexWriterWrapper;
use crate::{index_writer_v5, index_writer_v7, TantivyIndexVersion};
impl IndexWriterWrapper {
    /// Creates an index writer dedicated to JSON key stats.
    ///
    /// Dispatches to the tantivy v5 or v7 implementation according to
    /// `tantivy_index_version` and wraps the result in the matching enum
    /// variant. All other parameters are forwarded unchanged.
    ///
    /// Note: the parameter was previously misspelled `tanviy_index_version`;
    /// renaming it is call-compatible since Rust arguments are positional.
    pub fn create_json_key_stats_writer(
        field_name: &str,
        path: &str,
        num_threads: usize,
        overall_memory_budget_in_bytes: usize,
        tantivy_index_version: TantivyIndexVersion,
        in_ram: bool,
    ) -> Result<IndexWriterWrapper> {
        match tantivy_index_version {
            TantivyIndexVersion::V5 => Ok(IndexWriterWrapper::V5(
                index_writer_v5::IndexWriterWrapperImpl::create_json_key_stats_writer(
                    field_name,
                    path,
                    num_threads,
                    overall_memory_budget_in_bytes,
                    in_ram,
                )?,
            )),
            TantivyIndexVersion::V7 => Ok(IndexWriterWrapper::V7(
                index_writer_v7::IndexWriterWrapperImpl::create_json_key_stats_writer(
                    field_name,
                    path,
                    num_threads,
                    overall_memory_budget_in_bytes,
                    in_ram,
                )?,
            )),
        }
    }
}

View File

@ -0,0 +1,43 @@
use std::ffi::c_char;
use std::ffi::CStr;
use crate::array::RustResult;
use crate::cstr_to_str;
use crate::index_writer::IndexWriterWrapper;
use crate::log::init_log;
use crate::util::create_binding;
use crate::TantivyIndexVersion;
#[no_mangle]
pub extern "C" fn tantivy_create_json_key_stats_writer(
    field_name: *const c_char,
    path: *const c_char,
    tantivy_index_version: u32,
    num_threads: usize,
    overall_memory_budget_in_bytes: usize,
    in_ram: bool,
) -> RustResult {
    // FFI entry point: builds a json-key-stats index writer and hands back an
    // opaque binding pointer (or an error) inside a RustResult.
    init_log();
    let name = cstr_to_str!(field_name);
    let dir = cstr_to_str!(path);
    let version = match TantivyIndexVersion::from_u32(tantivy_index_version) {
        Ok(v) => v,
        Err(e) => return RustResult::from_error(e.to_string()),
    };
    let created = IndexWriterWrapper::create_json_key_stats_writer(
        name,
        dir,
        num_threads,
        overall_memory_budget_in_bytes,
        version,
        in_ram,
    );
    match created {
        Ok(wrapper) => RustResult::from_ptr(create_binding(wrapper)),
        Err(err) => RustResult::from_error(format!(
            "create json key stats writer failed with error: {}",
            err
        )),
    }
}

View File

@ -7,6 +7,7 @@ use tantivy::{Index, IndexReader, ReloadPolicy, Term};
use crate::docid_collector::DocIdCollector;
use crate::log::init_log;
use crate::milvus_id_collector::MilvusIdCollector;
use crate::util::make_bounds;
use crate::vec_collector::VecCollector;
@ -19,6 +20,7 @@ pub(crate) struct IndexReaderWrapper {
pub(crate) reader: IndexReader,
pub(crate) index: Arc<Index>,
pub(crate) id_field: Option<Field>,
pub(crate) user_specified_doc_id: bool,
}
impl IndexReaderWrapper {
@ -39,6 +41,8 @@ impl IndexReaderWrapper {
Err(_) => None,
};
assert!(!schema.user_specified_doc_id() || id_field.is_none());
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::OnCommitWithDelay) // OnCommitWithDelay serve for growing segment.
@ -51,6 +55,7 @@ impl IndexReaderWrapper {
reader,
index,
id_field,
user_specified_doc_id: schema.user_specified_doc_id(),
})
}
@ -63,7 +68,11 @@ impl IndexReaderWrapper {
let metas = self.index.searchable_segment_metas()?;
let mut sum: u32 = 0;
for meta in metas {
sum += meta.max_doc();
if self.user_specified_doc_id {
sum = std::cmp::max(sum, meta.max_doc());
} else {
sum += meta.max_doc();
}
}
Ok(sum)
}
@ -78,16 +87,23 @@ impl IndexReaderWrapper {
.map_err(TantivyBindingError::TantivyError)
}
None => {
// older version without doc_id, only one segment.
searcher
.search(q, &VecCollector {})
.map_err(TantivyBindingError::TantivyError)
if self.user_specified_doc_id {
// newer version with user specified doc id.
searcher
.search(q, &MilvusIdCollector::default())
.map_err(TantivyBindingError::TantivyError)
} else {
// older version without doc_id, only one segment.
searcher
.search(q, &VecCollector {})
.map_err(TantivyBindingError::TantivyError)
}
}
}
}
// Generally, we should use [`crate::search`], except for some special scenarios where the doc_id could be beyond
// the score of u32.
// the scope of u32 such as json key stats offset.
#[allow(dead_code)]
pub(crate) fn search_i64(&self, q: &dyn Query) -> Result<Vec<i64>> {
assert!(self.id_field.is_some());
@ -298,7 +314,7 @@ mod test {
use tantivy::{
doc,
schema::{Schema, STORED, STRING},
schema::{Schema, STORED, STRING, TEXT_WITH_DOC_ID},
Index,
};
@ -326,4 +342,40 @@ mod test {
res = index_reader_wrapper.prefix_query_keyword("$").unwrap();
assert_eq!(res.len(), 1);
}
#[test]
fn test_count() {
    // Verifies IndexReaderWrapper::count() on an index with user-specified
    // doc ids: per the count() implementation, segments are combined with
    // max(max_doc) instead of summing, so the caller-assigned ids govern
    // the reported total.
    let mut builder = Schema::builder();
    builder.add_text_field("title", TEXT_WITH_DOC_ID);
    builder.enable_user_specified_doc_id();
    let schema = builder.build();
    let title = schema.get_field("title").unwrap();

    let index = Index::create_in_ram(schema.clone());
    let mut writer = index.writer(50000000).unwrap();
    for doc_id in 0..10_000 {
        writer
            .add_document_with_doc_id(doc_id, doc!(title => format!("abc{}", doc_id)))
            .unwrap();
    }
    writer.commit().unwrap();

    let wrapper = IndexReaderWrapper::from_index(Arc::new(index)).unwrap();
    assert_eq!(wrapper.count().unwrap(), 10000);

    // Add a second batch starting at doc id 10_000 and confirm the count
    // grows accordingly after a reload.
    let second_batch: Vec<_> = (0..10_000)
        .map(|i| doc!(title => format!("hello{}", i)))
        .collect();
    writer
        .add_documents_with_doc_id(10_000, second_batch)
        .unwrap();
    writer.commit().unwrap();
    wrapper.reload().unwrap();
    assert_eq!(wrapper.count().unwrap(), 20000);
}
}

View File

@ -85,7 +85,9 @@ mod tests {
.unwrap();
writer.add_data_by_batch(&["网球和滑雪"], Some(0)).unwrap();
writer.add_data_by_batch(&["网球以及滑雪"], Some(1)).unwrap();
writer
.add_data_by_batch(&["网球以及滑雪"], Some(1))
.unwrap();
writer.commit().unwrap();
@ -129,7 +131,5 @@ mod tests {
let res = reader.search(&query).unwrap();
assert_eq!(res, (0..10000).collect::<Vec<u32>>());
let res = reader.search_i64(&query).unwrap();
assert_eq!(res, (0..10000).collect::<Vec<i64>>());
}
}

View File

@ -29,7 +29,6 @@ impl IndexWriterWrapper {
num_threads: usize,
overall_memory_budget_in_bytes: usize,
tanviy_index_version: TantivyIndexVersion,
in_ram: bool,
) -> Result<IndexWriterWrapper> {
init_log();
match tanviy_index_version {
@ -40,7 +39,6 @@ impl IndexWriterWrapper {
path,
num_threads,
overall_memory_budget_in_bytes,
in_ram,
)?;
Ok(IndexWriterWrapper::V5(writer))
}
@ -51,7 +49,6 @@ impl IndexWriterWrapper {
path,
num_threads,
overall_memory_budget_in_bytes,
in_ram,
)?;
Ok(IndexWriterWrapper::V7(writer))
}
@ -88,7 +85,9 @@ impl IndexWriterWrapper {
{
match self {
IndexWriterWrapper::V5(writer) => writer.add_data_by_batch(data, offset),
IndexWriterWrapper::V7(writer) => writer.add_data_by_batch(data, offset.unwrap()),
IndexWriterWrapper::V7(writer) => {
writer.add_data_by_batch(data, offset.unwrap() as u32)
}
}
}
@ -99,7 +98,7 @@ impl IndexWriterWrapper {
{
match self {
IndexWriterWrapper::V5(writer) => writer.add_array(data, offset),
IndexWriterWrapper::V7(writer) => writer.add_array(data, offset.unwrap()),
IndexWriterWrapper::V7(writer) => writer.add_array(data, offset.unwrap() as u32),
}
}
@ -110,7 +109,9 @@ impl IndexWriterWrapper {
) -> Result<()> {
match self {
IndexWriterWrapper::V5(writer) => writer.add_string_by_batch(data, offset),
IndexWriterWrapper::V7(writer) => writer.add_string_by_batch(data, offset.unwrap()),
IndexWriterWrapper::V7(writer) => {
writer.add_string_by_batch(data, offset.unwrap() as u32)
}
}
}
@ -121,7 +122,9 @@ impl IndexWriterWrapper {
) -> Result<()> {
match self {
IndexWriterWrapper::V5(writer) => writer.add_array_keywords(datas, offset),
IndexWriterWrapper::V7(writer) => writer.add_array_keywords(datas, offset.unwrap()),
IndexWriterWrapper::V7(writer) => {
writer.add_array_keywords(datas, offset.unwrap() as u32)
}
}
}
@ -193,7 +196,6 @@ mod tests {
1,
50_000_000,
TantivyIndexVersion::V5,
false,
)
.unwrap();
@ -309,14 +311,12 @@ mod tests {
#[test]
pub fn test_add_json_key_stats() {
use crate::data_type::TantivyDataType;
use crate::index_writer::IndexWriterWrapper;
let temp_dir = tempdir().unwrap();
let mut index_writer = IndexWriterWrapper::new(
let mut index_writer = IndexWriterWrapper::create_json_key_stats_writer(
"test",
TantivyDataType::Keyword,
temp_dir.path().to_str().unwrap().to_string(),
temp_dir.path().to_str().unwrap(),
1,
15 * 1024 * 1024,
TantivyIndexVersion::V7,
@ -352,4 +352,73 @@ mod tests {
let count = index_writer.create_reader().unwrap().count().unwrap();
assert_eq!(count, total_count);
}
#[test]
pub fn test_add_strings_by_batch() {
    use crate::data_type::TantivyDataType;
    use crate::index_writer::IndexWriterWrapper;

    // Exercise batched keyword ingestion: 10k unique keys written through a
    // single add_string_by_batch call, then verified via reader.count().
    let temp_dir = tempdir().unwrap();
    let mut writer = IndexWriterWrapper::new(
        "test",
        TantivyDataType::Keyword,
        temp_dir.path().to_str().unwrap().to_string(),
        1,
        15 * 1024 * 1024,
        TantivyIndexVersion::V7,
    )
    .unwrap();

    let c_keys: Vec<CString> = (0..10000)
        .map(|i| CString::new(format!("key{:05}", i)).unwrap())
        .collect();
    let key_ptrs: Vec<*const libc::c_char> =
        c_keys.iter().map(|cs| cs.as_ptr()).collect();

    writer.add_string_by_batch(&key_ptrs, Some(0)).unwrap();
    writer.commit().unwrap();

    let reader = writer.create_reader().unwrap();
    let total: u32 = reader.count().unwrap();
    assert_eq!(total, 10000);
}
#[test]
pub fn test_add_data_by_batch() {
    use crate::data_type::TantivyDataType;
    use crate::index_writer::IndexWriterWrapper;

    // Ingest 10k i64 values one at a time, committing every 1000 adds so the
    // index is spread over multiple segments, then verify the total count.
    let temp_dir = tempdir().unwrap();
    let mut writer = IndexWriterWrapper::new(
        "test",
        TantivyDataType::I64,
        temp_dir.path().to_str().unwrap().to_string(),
        1,
        15 * 1024 * 1024,
        TantivyIndexVersion::V7,
    )
    .unwrap();

    for value in 0..10000 {
        writer
            .add_data_by_batch::<i64>(&[value], Some(value as i64))
            .unwrap();
        // Same cadence as counting adds 1-based and committing at each
        // multiple of 1000.
        if (value + 1) % 1000 == 0 {
            writer.commit().unwrap();
        }
    }
    writer.commit().unwrap();

    let reader = writer.create_reader().unwrap();
    let total: u32 = reader.count().unwrap();
    assert_eq!(total, 10000);
}
}

View File

@ -28,7 +28,6 @@ pub extern "C" fn tantivy_create_index(
tantivy_index_version: u32,
num_threads: usize,
overall_memory_budget_in_bytes: usize,
in_ram : bool,
) -> RustResult {
let field_name_str = cstr_to_str!(field_name);
let path_str = cstr_to_str!(path);
@ -45,7 +44,6 @@ pub extern "C" fn tantivy_create_index(
num_threads,
overall_memory_budget_in_bytes,
tantivy_index_version,
in_ram,
) {
Ok(wrapper) => RustResult::from_ptr(create_binding(wrapper)),
Err(e) => RustResult::from_error(e.to_string()),

View File

@ -27,7 +27,7 @@ pub(crate) struct IndexWriterWrapperImpl {
}
#[inline]
fn schema_builder_add_field(
pub(crate) fn schema_builder_add_field(
schema_builder: &mut SchemaBuilder,
field_name: &str,
data_type: TantivyDataType,
@ -50,48 +50,56 @@ fn schema_builder_add_field(
}
impl TantivyValue<TantivyDocument> for i8 {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_i64(Field::from_field_id(field), *self as i64);
}
}
impl TantivyValue<TantivyDocument> for i16 {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_i64(Field::from_field_id(field), *self as i64);
}
}
impl TantivyValue<TantivyDocument> for i32 {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_i64(Field::from_field_id(field), *self as i64);
}
}
impl TantivyValue<TantivyDocument> for i64 {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_i64(Field::from_field_id(field), *self);
}
}
impl TantivyValue<TantivyDocument> for f32 {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_f64(Field::from_field_id(field), *self as f64);
}
}
impl TantivyValue<TantivyDocument> for f64 {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_f64(Field::from_field_id(field), *self);
}
}
impl TantivyValue<TantivyDocument> for &str {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_text(Field::from_field_id(field), *self);
}
}
impl TantivyValue<TantivyDocument> for bool {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_bool(Field::from_field_id(field), *self);
}
@ -104,7 +112,6 @@ impl IndexWriterWrapperImpl {
path: String,
num_threads: usize,
overall_memory_budget_in_bytes: usize,
in_ram: bool,
) -> Result<IndexWriterWrapperImpl> {
info!(
"create index writer, field_name: {}, data_type: {:?}, tantivy_index_version 5",
@ -115,11 +122,7 @@ impl IndexWriterWrapperImpl {
// We cannot build direct connection from rows in multi-segments to milvus row data. So we have this doc_id field.
let id_field = schema_builder.add_i64_field("doc_id", FAST);
let schema = schema_builder.build();
let index = if in_ram {
Index::create_in_ram(schema)
} else {
Index::create_in_dir(path.clone(), schema)?
};
let index = Index::create_in_dir(path.clone(), schema)?;
let index_writer =
index.writer_with_num_threads(num_threads, overall_memory_budget_in_bytes)?;
Ok(IndexWriterWrapperImpl {

View File

@ -0,0 +1,43 @@
use either::Either;
use log::info;
use std::sync::Arc;
use tantivy_5::{
schema::{Schema, FAST},
Index,
};
use crate::data_type::TantivyDataType;
use crate::error::Result;
use crate::index_writer_v5::index_writer::schema_builder_add_field;
use super::IndexWriterWrapperImpl;
impl IndexWriterWrapperImpl {
    /// Builds a tantivy-5 index writer for JSON key stats.
    ///
    /// The schema registers the legacy "doc_id" FAST field, which v5 indexes
    /// use to map tantivy docs back to milvus offsets, and keeps its handle
    /// in `id_field`.
    pub(crate) fn create_json_key_stats_writer(
        field_name: &str,
        path: &str,
        num_threads: usize,
        overall_memory_budget_in_bytes: usize,
        in_ram: bool,
    ) -> Result<IndexWriterWrapperImpl> {
        info!("create json key stats writer, field_name: {}", field_name);

        let mut builder = Schema::builder();
        let field =
            schema_builder_add_field(&mut builder, field_name, TantivyDataType::Keyword);
        let id_field = builder.add_i64_field("doc_id", FAST);
        let schema = builder.build();

        // In-memory index for transient use, on-disk otherwise.
        let index = match in_ram {
            true => Index::create_in_ram(schema),
            false => Index::create_in_dir(path, schema)?,
        };
        let writer =
            index.writer_with_num_threads(num_threads, overall_memory_budget_in_bytes)?;

        Ok(IndexWriterWrapperImpl {
            field,
            index_writer: Either::Left(writer),
            id_field: Some(id_field),
            _index: Arc::new(index),
        })
    }
}

View File

@ -1,3 +1,4 @@
use log::info;
use std::sync::Arc;
use either::Either;
@ -5,7 +6,6 @@ use tantivy_5::schema::{Field, IndexRecordOption, Schema, TextFieldIndexing, Tex
use tantivy_5::Index;
use crate::error::Result;
use crate::log::init_log;
use super::analyzer::create_analyzer;
use super::IndexWriterWrapperImpl;
@ -32,7 +32,10 @@ impl IndexWriterWrapperImpl {
overall_memory_budget_in_bytes: usize,
in_ram: bool,
) -> Result<IndexWriterWrapperImpl> {
init_log();
info!(
"create text index writer, field_name: {}, tantivy_index_version 5",
field_name
);
let tokenizer = create_analyzer(tokenizer_params)?;

View File

@ -5,6 +5,7 @@
mod analyzer;
pub(crate) mod index_writer;
pub(crate) mod index_writer_json_key_stats;
pub(crate) mod index_writer_text;
pub(crate) use index_writer::IndexWriterWrapperImpl;

View File

@ -6,7 +6,7 @@ use libc::c_char;
use log::info;
use tantivy::indexer::UserOperation;
use tantivy::schema::{
Field, IndexRecordOption, Schema, SchemaBuilder, TextFieldIndexing, TextOptions, FAST, INDEXED,
Field, IndexRecordOption, NumericOptions, Schema, SchemaBuilder, TextFieldIndexing, TextOptions,
};
use tantivy::{doc, Index, IndexWriter, TantivyDocument};
@ -19,18 +19,25 @@ use crate::index_writer::TantivyValue;
const BATCH_SIZE: usize = 4096;
#[inline]
fn schema_builder_add_field(
pub(crate) fn schema_builder_add_field(
schema_builder: &mut SchemaBuilder,
field_name: &str,
data_type: TantivyDataType,
) -> Field {
match data_type {
TantivyDataType::I64 => schema_builder.add_i64_field(field_name, INDEXED),
TantivyDataType::F64 => schema_builder.add_f64_field(field_name, INDEXED),
TantivyDataType::Bool => schema_builder.add_bool_field(field_name, INDEXED),
TantivyDataType::I64 => {
schema_builder.add_i64_field(field_name, NumericOptions::default().set_indexed())
}
TantivyDataType::F64 => {
schema_builder.add_f64_field(field_name, NumericOptions::default().set_indexed())
}
TantivyDataType::Bool => {
schema_builder.add_bool_field(field_name, NumericOptions::default().set_indexed())
}
TantivyDataType::Keyword => {
let text_field_indexing = TextFieldIndexing::default()
.set_tokenizer("raw")
.set_fieldnorms(false)
.set_index_option(IndexRecordOption::Basic);
let text_options = TextOptions::default().set_indexing_options(text_field_indexing);
schema_builder.add_text_field(field_name, text_options)
@ -42,48 +49,56 @@ fn schema_builder_add_field(
}
impl TantivyValue<TantivyDocument> for i8 {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_i64(Field::from_field_id(field), *self as i64);
}
}
impl TantivyValue<TantivyDocument> for i16 {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_i64(Field::from_field_id(field), *self as i64);
}
}
impl TantivyValue<TantivyDocument> for i32 {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_i64(Field::from_field_id(field), *self as i64);
}
}
impl TantivyValue<TantivyDocument> for i64 {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_i64(Field::from_field_id(field), *self);
}
}
impl TantivyValue<TantivyDocument> for f32 {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_f64(Field::from_field_id(field), *self as f64);
}
}
impl TantivyValue<TantivyDocument> for f64 {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_f64(Field::from_field_id(field), *self);
}
}
impl TantivyValue<TantivyDocument> for &str {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_text(Field::from_field_id(field), *self);
}
}
impl TantivyValue<TantivyDocument> for bool {
#[inline]
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_bool(Field::from_field_id(field), *self);
}
@ -92,7 +107,6 @@ impl TantivyValue<TantivyDocument> for bool {
pub struct IndexWriterWrapperImpl {
pub(crate) field: Field,
pub(crate) index_writer: IndexWriter,
pub(crate) id_field: Field,
pub(crate) index: Arc<Index>,
}
@ -103,7 +117,6 @@ impl IndexWriterWrapperImpl {
path: String,
num_threads: usize,
overall_memory_budget_in_bytes: usize,
in_ram: bool,
) -> Result<IndexWriterWrapperImpl> {
info!(
"create index writer, field_name: {}, data_type: {:?}, tantivy_index_version 7",
@ -111,20 +124,14 @@ impl IndexWriterWrapperImpl {
);
let mut schema_builder = Schema::builder();
let field = schema_builder_add_field(&mut schema_builder, field_name, data_type);
// We cannot build direct connection from rows in multi-segments to milvus row data. So we have this doc_id field.
let id_field = schema_builder.add_i64_field("doc_id", FAST);
schema_builder.enable_user_specified_doc_id();
let schema = schema_builder.build();
let index = if in_ram {
Index::create_in_ram(schema)
} else {
Index::create_in_dir(path.clone(), schema)?
};
let index = Index::create_in_dir(path.clone(), schema)?;
let index_writer =
index.writer_with_num_threads(num_threads, overall_memory_budget_in_bytes)?;
Ok(IndexWriterWrapperImpl {
field,
index_writer,
id_field,
index: Arc::new(index),
})
}
@ -134,36 +141,34 @@ impl IndexWriterWrapperImpl {
}
#[inline]
fn add_document(&mut self, mut document: TantivyDocument, offset: i64) -> Result<()> {
document.add_i64(self.id_field, offset);
self.index_writer.add_document(document)?;
fn add_document(&mut self, document: TantivyDocument, offset: u32) -> Result<()> {
self.index_writer
.add_document_with_doc_id(offset, document)?;
Ok(())
}
pub fn add_data_by_batch<T: TantivyValue<TantivyDocument>>(
&mut self,
batch_data: &[T],
offset_begin: i64,
mut offset: u32,
) -> Result<()> {
let mut batch = Vec::with_capacity(BATCH_SIZE);
for (idx, data) in batch_data.into_iter().enumerate() {
let offset = offset_begin + idx as i64;
for data in batch_data.into_iter() {
let mut doc = TantivyDocument::default();
data.add_to_document(self.field.field_id(), &mut doc);
doc.add_i64(self.id_field, offset);
batch.push(UserOperation::Add(doc));
batch.push(doc);
if batch.len() == BATCH_SIZE {
self.index_writer.run(std::mem::replace(
&mut batch,
Vec::with_capacity(BATCH_SIZE),
))?;
self.index_writer.add_documents_with_doc_id(
offset,
std::mem::replace(&mut batch, Vec::with_capacity(BATCH_SIZE)),
)?;
offset += BATCH_SIZE as u32;
}
}
if !batch.is_empty() {
self.index_writer.run(batch)?;
self.index_writer.add_documents_with_doc_id(offset, batch)?;
}
Ok(())
@ -172,7 +177,7 @@ impl IndexWriterWrapperImpl {
pub fn add_array<T: TantivyValue<TantivyDocument>, I>(
&mut self,
data: I,
offset: i64,
offset: u32,
) -> Result<()>
where
I: IntoIterator<Item = T>,
@ -184,7 +189,7 @@ impl IndexWriterWrapperImpl {
self.add_document(document, offset)
}
pub fn add_array_keywords(&mut self, datas: &[*const c_char], offset: i64) -> Result<()> {
pub fn add_array_keywords(&mut self, datas: &[*const c_char], offset: u32) -> Result<()> {
let mut document = TantivyDocument::default();
for element in datas {
let data = unsafe { CStr::from_ptr(*element) };
@ -194,27 +199,26 @@ impl IndexWriterWrapperImpl {
self.add_document(document, offset)
}
pub fn add_string_by_batch(&mut self, data: &[*const c_char], offset: i64) -> Result<()> {
pub fn add_string_by_batch(&mut self, data: &[*const c_char], mut offset: u32) -> Result<()> {
let mut batch = Vec::with_capacity(BATCH_SIZE);
for (idx, key) in data.into_iter().enumerate() {
for key in data.into_iter() {
let key = unsafe { CStr::from_ptr(*key) }
.to_str()
.map_err(|e| TantivyBindingError::InternalError(e.to_string()))?;
let key_offset = offset + idx as i64;
batch.push(UserOperation::Add(doc!(
self.id_field => key_offset,
batch.push(doc!(
self.field => key,
)));
if batch.len() >= BATCH_SIZE {
self.index_writer.run(std::mem::replace(
&mut batch,
Vec::with_capacity(BATCH_SIZE),
))?;
));
if batch.len() == BATCH_SIZE {
self.index_writer.add_documents_with_doc_id(
offset,
std::mem::replace(&mut batch, Vec::with_capacity(BATCH_SIZE)),
)?;
offset += BATCH_SIZE as u32;
}
}
if !batch.is_empty() {
self.index_writer.run(batch)?;
self.index_writer.add_documents_with_doc_id(offset, batch)?;
}
Ok(())
@ -227,6 +231,12 @@ impl IndexWriterWrapperImpl {
json_offsets_len: &[usize],
) -> Result<()> {
let mut batch = Vec::with_capacity(BATCH_SIZE);
let id_field = self
.index_writer
.index()
.schema()
.get_field("doc_id")
.unwrap();
for i in 0..keys.len() {
let key = unsafe { CStr::from_ptr(keys[i]) }
.to_str()
@ -237,7 +247,7 @@ impl IndexWriterWrapperImpl {
for offset in offsets {
batch.push(UserOperation::Add(doc!(
self.id_field => *offset,
id_field => *offset,
self.field => key,
)));
@ -280,45 +290,3 @@ impl IndexWriterWrapperImpl {
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::ffi::CString;
use tempfile::tempdir;
use crate::TantivyIndexVersion;
#[test]
pub fn test_add_json_key_stats() {
use crate::data_type::TantivyDataType;
use crate::index_writer::IndexWriterWrapper;
let temp_dir = tempdir().unwrap();
let mut index_writer = IndexWriterWrapper::new(
"test",
TantivyDataType::Keyword,
temp_dir.path().to_str().unwrap().to_string(),
1,
15 * 1024 * 1024,
TantivyIndexVersion::V7,
false,
)
.unwrap();
let keys = (0..10000)
.map(|i| format!("key{:05}", i))
.collect::<Vec<_>>();
let c_keys: Vec<CString> = keys.into_iter().map(|k| CString::new(k).unwrap()).collect();
let key_ptrs: Vec<*const libc::c_char> = c_keys.iter().map(|cs| cs.as_ptr()).collect();
index_writer
.add_string_by_batch(&key_ptrs, Some(0))
.unwrap();
index_writer.commit().unwrap();
let reader = index_writer.create_reader().unwrap();
let count: u32 = reader.count().unwrap();
assert_eq!(count, 10000);
}
}

View File

@ -0,0 +1,42 @@
use log::info;
use std::sync::Arc;
use tantivy::{
schema::{Schema, FAST},
Index,
};
use crate::{
data_type::TantivyDataType, error::Result,
index_writer_v7::index_writer::schema_builder_add_field,
};
use super::IndexWriterWrapperImpl;
impl IndexWriterWrapperImpl {
    /// Builds a tantivy-7 index writer for JSON key stats.
    ///
    /// A "doc_id" FAST field is still added to the schema (json key stats
    /// offsets are stored explicitly — presumably because they can exceed the
    /// u32 doc-id scope; see the reader-side comment), but its handle is
    /// discarded since this wrapper no longer keeps an `id_field`.
    pub(crate) fn create_json_key_stats_writer(
        field_name: &str,
        path: &str,
        num_threads: usize,
        overall_memory_budget_in_bytes: usize,
        in_ram: bool,
    ) -> Result<IndexWriterWrapperImpl> {
        info!("create json key stats writer, field_name: {}", field_name);

        let mut builder = Schema::builder();
        let field =
            schema_builder_add_field(&mut builder, field_name, TantivyDataType::Keyword);
        let _ = builder.add_i64_field("doc_id", FAST);
        let schema = builder.build();

        // In-memory index for transient use, on-disk otherwise.
        let index = match in_ram {
            true => Index::create_in_ram(schema),
            false => Index::create_in_dir(path, schema)?,
        };
        let writer =
            index.writer_with_num_threads(num_threads, overall_memory_budget_in_bytes)?;

        Ok(IndexWriterWrapperImpl {
            field,
            index_writer: writer,
            index: Arc::new(index),
        })
    }
}

View File

@ -1,24 +1,25 @@
use log::info;
use std::sync::Arc;
use tantivy::schema::{Field, IndexRecordOption, Schema, TextFieldIndexing, TextOptions, FAST};
use tantivy::schema::{Field, IndexRecordOption, Schema, TextFieldIndexing, TextOptions};
use tantivy::Index;
use crate::analyzer::create_analyzer;
use crate::error::Result;
use crate::log::init_log;
use super::IndexWriterWrapperImpl;
fn build_text_schema(field_name: &str, tokenizer_name: &str) -> (Schema, Field, Field) {
fn build_text_schema(field_name: &str, tokenizer_name: &str) -> (Schema, Field) {
let mut schema_builder = Schema::builder();
// Positions are required for phrase matching.
let indexing = TextFieldIndexing::default()
.set_tokenizer(tokenizer_name)
.set_fieldnorms(false)
.set_index_option(IndexRecordOption::WithFreqsAndPositions);
let option = TextOptions::default().set_indexing_options(indexing);
let field = schema_builder.add_text_field(field_name, option);
let id_field = schema_builder.add_i64_field("doc_id", FAST);
(schema_builder.build(), field, id_field)
schema_builder.enable_user_specified_doc_id();
(schema_builder.build(), field)
}
impl IndexWriterWrapperImpl {
@ -31,10 +32,14 @@ impl IndexWriterWrapperImpl {
overall_memory_budget_in_bytes: usize,
in_ram: bool,
) -> Result<IndexWriterWrapperImpl> {
init_log();
info!(
"create text index writer, field_name: {}, tantivy_index_version 7",
field_name
);
let tokenizer = create_analyzer(tokenizer_params)?;
let (schema, field, id_field) = build_text_schema(field_name, tokenizer_name);
let (schema, field) = build_text_schema(field_name, tokenizer_name);
let index = if in_ram {
Index::create_in_ram(schema)
} else {
@ -48,7 +53,6 @@ impl IndexWriterWrapperImpl {
Ok(IndexWriterWrapperImpl {
field,
index_writer,
id_field,
index: Arc::new(index),
})
}

View File

@ -3,6 +3,7 @@
//! in most cases.
pub(crate) mod index_writer;
pub(crate) mod index_writer_json_key_stats;
pub(crate) mod index_writer_text;
pub(crate) use index_writer::IndexWriterWrapperImpl;

View File

@ -7,6 +7,8 @@ mod demo_c;
mod docid_collector;
mod error;
mod hashmap_c;
mod index_json_key_stats_writer;
mod index_json_key_stats_writer_c;
mod index_reader;
mod index_reader_c;
mod index_reader_text;
@ -18,6 +20,7 @@ mod index_writer_text_c;
mod index_writer_v5;
mod index_writer_v7;
mod log;
mod milvus_id_collector;
mod string_c;
mod token_stream_c;
mod tokenizer_c;

View File

@ -0,0 +1,63 @@
use tantivy::{
collector::{Collector, SegmentCollector},
DocId, Score, SegmentOrdinal, SegmentReader,
};
/// Collects matching tantivy doc ids. Since tantivy is now fed
/// user-specified doc ids (see `enable_user_specified_doc_id`), the
/// collected ids are the milvus row ids directly — no fast-field
/// "doc_id" lookup is needed afterwards.
#[derive(Default)]
pub(crate) struct MilvusIdCollector {}

/// Per-segment child of `MilvusIdCollector`: accumulates the doc ids
/// hit within one segment.
pub(crate) struct MilvusIdChildCollector {
    // Doc ids matched in this segment; harvested and merged across
    // segments by the parent collector.
    milvus_doc_ids: Vec<u32>,
}
impl Collector for MilvusIdCollector {
    type Fruit = Vec<u32>;
    type Child = MilvusIdChildCollector;

    /// One empty child collector per segment.
    fn for_segment(
        &self,
        _segment_local_id: SegmentOrdinal,
        _segment: &SegmentReader,
    ) -> tantivy::Result<Self::Child> {
        Ok(MilvusIdChildCollector {
            milvus_doc_ids: Vec::new(),
        })
    }

    /// Scores are never consumed; skipping scoring enables block collection.
    fn requires_scoring(&self) -> bool {
        false
    }

    /// Concatenates the per-segment id lists into one result vector.
    fn merge_fruits(
        &self,
        segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
    ) -> tantivy::Result<Self::Fruit> {
        // Pre-size the output, then bulk-append each segment's ids with
        // `extend` instead of pushing one element at a time.
        let total: usize = segment_fruits.iter().map(|ids| ids.len()).sum();
        let mut merged = Vec::with_capacity(total);
        for ids in segment_fruits {
            merged.extend(ids);
        }
        Ok(merged)
    }
}
impl SegmentCollector for MilvusIdChildCollector {
type Fruit = Vec<u32>;
#[inline]
fn collect_block(&mut self, docs: &[DocId]) {
self.milvus_doc_ids.extend(docs);
}
fn collect(&mut self, doc: DocId, _score: Score) {
// Unreachable code actually
self.collect_block(&[doc]);
}
#[inline]
fn harvest(self) -> Self::Fruit {
self.milvus_doc_ids
}
}

View File

@ -84,7 +84,6 @@ struct TantivyIndexWrapper {
const char* path,
uint32_t tantivy_index_version,
bool inverted_single_semgnent = false,
bool in_ram = false,
uintptr_t num_threads = DEFAULT_NUM_THREADS,
uintptr_t overall_memory_budget_in_bytes =
DEFAULT_OVERALL_MEMORY_BUDGET_IN_BYTES) {
@ -102,8 +101,7 @@ struct TantivyIndexWrapper {
path,
tantivy_index_version,
num_threads,
overall_memory_budget_in_bytes,
in_ram));
overall_memory_budget_in_bytes));
}
AssertInfo(res.result_->success,
"failed to create index: {}",
@ -148,6 +146,29 @@ struct TantivyIndexWrapper {
writer_ = res.result_->value.ptr._0;
path_ = std::string(path);
}
// Create an index writer dedicated to json key stats.
//
// field_name: name of the keyword field holding the json keys.
// path: on-disk index directory (ignored when in_ram is true).
// tantivy_index_version: tantivy index format version to write.
// in_ram: build the index in memory instead of under `path`.
//
// Fails via AssertInfo when the underlying Rust writer cannot be created.
TantivyIndexWrapper(const char* field_name,
                    const char* path,
                    uint32_t tantivy_index_version,
                    bool in_ram = false,
                    uintptr_t num_threads = DEFAULT_NUM_THREADS,
                    uintptr_t overall_memory_budget_in_bytes =
                        DEFAULT_OVERALL_MEMORY_BUDGET_IN_BYTES) {
    auto res = RustResultWrapper(
        tantivy_create_json_key_stats_writer(field_name,
                                             path,
                                             tantivy_index_version,
                                             num_threads,
                                             overall_memory_budget_in_bytes,
                                             in_ram));
    // Fixed copy-pasted message: this is the json key stats writer,
    // not the text writer.
    AssertInfo(res.result_->success,
               "failed to create json key stats writer: {}",
               res.result_->error);
    writer_ = res.result_->value.ptr._0;
    path_ = std::string(path);
}
// create reader.
void
create_reader() {