enhance: process tantivy document add by batch (#40124)

issue: https://github.com/milvus-io/milvus/issues/40006

This PR makes tantivy add documents in batches. Adding documents in batches
can greatly reduce the latency of scheduling the document add operation
(calling tantivy `add_document` only schedules the add operation and
returns immediately once it is scheduled), because each call involves a tokio
block_on, which is relatively heavy.

Reducing the scheduling cost does not necessarily reduce the overall latency
if the index writer threads do not process indexing quickly enough.
But if scheduling itself is slow, then even when the index writer threads
process indexing very fast (by increasing the thread number), the overall
performance can still be limited.

The following code benchmarks the PR (note: the duration only covers
scheduling and does not include the commit)
```
// Benchmark for the batched add path: times a single `add_data_by_batch`
// call over one million short strings. Only scheduling is measured; the
// writer is never committed.
fn test_performance() {
    let field_name = "text";
    let dir = TempDir::new().unwrap();
    let index_path = dir.path().to_str().unwrap();
    let mut index_wrapper = IndexWriterWrapper::create_text_writer(
        field_name,
        index_path,
        "default",
        "",
        1,
        50_000_000,
        false,
        TantivyIndexVersion::V7,
    )
    .unwrap();

    // Build the owned strings first, then borrow them as `&str` for the call.
    let batch: Vec<String> = (0..1_000_000)
        .map(|i| format!("hello{:04}", i))
        .collect();
    let batch_ref: Vec<&str> = batch.iter().map(String::as_str).collect();

    let now = std::time::Instant::now();
    index_wrapper
        .add_data_by_batch(&batch_ref, Some(0))
        .unwrap();
    let elapsed = now.elapsed();
    println!("add_data_by_batch elapsed: {:?}", elapsed);
}
```
Latency drops from roughly 1.4s to 558ms.

---------

Signed-off-by: SpadeA <tangchenjie1210@gmail.com>
This commit is contained in:
Spade A 2025-04-08 19:50:24 +08:00 committed by GitHub
parent da21640ac3
commit c6a0c2ab64
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 217 additions and 214 deletions

View File

@ -83,8 +83,8 @@ mod tests {
)
.unwrap();
writer.add("网球和滑雪", Some(0)).unwrap();
writer.add("网球以及滑雪", Some(1)).unwrap();
writer.add_data_by_batch(&["网球和滑雪"], Some(0)).unwrap();
writer.add_data_by_batch(&["网球以及滑雪"], Some(1)).unwrap();
writer.commit().unwrap();
@ -115,7 +115,7 @@ mod tests {
.unwrap();
for i in 0..10000 {
writer.add("hello world", Some(i)).unwrap();
writer.add_data_by_batch(&["hello world"], Some(i)).unwrap();
}
writer.commit().unwrap();

View File

@ -21,10 +21,8 @@ pub extern "C" fn tantivy_phrase_match_query(
slop: u32,
) -> RustResult {
let real = ptr as *mut IndexReaderWrapper;
unsafe {
let query = cstr_to_str!(query);
(*real).phrase_match_query(query, slop).into()
}
let query = cstr_to_str!(query);
unsafe { (*real).phrase_match_query(query, slop).into() }
}
#[no_mangle]

View File

@ -91,13 +91,13 @@ impl IndexWriterWrapper {
}
}
pub fn add<T>(&mut self, data: T, offset: Option<i64>) -> Result<()>
pub fn add_data_by_batch<T>(&mut self, data: &[T], offset: Option<i64>) -> Result<()>
where
T: TantivyValue<TantivyDocumentV5> + TantivyValue<TantivyDocumentV7>,
{
match self {
IndexWriterWrapper::V5(writer) => writer.add(data, offset),
IndexWriterWrapper::V7(writer) => writer.add(data, offset),
IndexWriterWrapper::V5(writer) => writer.add_data_by_batch(data, offset),
IndexWriterWrapper::V7(writer) => writer.add_data_by_batch(data, offset),
}
}
@ -187,7 +187,9 @@ mod tests {
.unwrap();
for i in 0..10 {
index_wrapper.add::<i64>(i, Some(i as i64)).unwrap();
index_wrapper
.add_data_by_batch::<i64>(&[i], Some(i as i64))
.unwrap();
}
index_wrapper.commit().unwrap();
}
@ -226,7 +228,7 @@ mod tests {
.unwrap();
for i in 0..10 {
index_wrapper.add::<i64>(i, None).unwrap();
index_wrapper.add_data_by_batch::<i64>(&[i], None).unwrap();
}
index_wrapper.finish().unwrap();
}
@ -269,7 +271,9 @@ mod tests {
.unwrap();
for i in 0..10 {
index_wrapper.add("hello", Some(i as i64)).unwrap();
index_wrapper
.add_data_by_batch(&["hello"], Some(i as i64))
.unwrap();
}
index_wrapper.commit().unwrap();
}

View File

@ -5,7 +5,6 @@ use crate::{
array::RustResult,
cstr_to_str,
data_type::TantivyDataType,
error::Result,
index_writer::IndexWriterWrapper,
util::{create_binding, free_binding},
TantivyIndexVersion,
@ -107,35 +106,6 @@ pub extern "C" fn tantivy_create_reader_from_writer(ptr: *mut c_void) -> RustRes
}
// -------------------------build--------------------
/// Applies `e` to every element of `arr`, pairing the element at position
/// `i` with the document offset `offset + i`.
///
/// Stops at, and returns, the first error reported by `e`.
fn execute<T: Copy, I>(
    arr: I,
    offset: i64,
    e: fn(&mut IndexWriterWrapper, T, Option<i64>) -> Result<()>,
    w: &mut IndexWriterWrapper,
) -> Result<()>
where
    I: IntoIterator<Item = T>,
{
    arr.into_iter()
        .enumerate()
        .try_for_each(|(position, value)| e(&mut *w, value, Some(offset + (position as i64))))
}
/// Applies `e` to every element of `arr` without a document offset
/// (`None` is passed for each element).
///
/// Stops at, and returns, the first error reported by `e`.
fn execute_by_single_segment_writer<T: Copy, I>(
    arr: I,
    e: fn(&mut IndexWriterWrapper, T, Option<i64>) -> Result<()>,
    w: &mut IndexWriterWrapper,
) -> Result<()>
where
    I: IntoIterator<Item = T>,
{
    arr.into_iter()
        .try_for_each(|value| e(&mut *w, value, None))
}
#[no_mangle]
pub extern "C" fn tantivy_index_add_int8s(
ptr: *mut c_void,
@ -146,13 +116,9 @@ pub extern "C" fn tantivy_index_add_int8s(
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
execute(
arr.into_iter().map(|num| *num as i64),
offset_begin,
IndexWriterWrapper::add::<i64>,
&mut (*real),
)
.into()
(*real)
.add_data_by_batch::<i8>(arr, Some(offset_begin))
.into()
}
}
@ -164,14 +130,7 @@ pub extern "C" fn tantivy_index_add_int8s_by_single_segment_writer(
) -> RustResult {
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
execute_by_single_segment_writer(
arr.into_iter().map(|num| *num as i64),
IndexWriterWrapper::add::<i64>,
&mut (*real),
)
.into()
}
unsafe { (*real).add_data_by_batch::<i8>(arr, None).into() }
}
#[no_mangle]
@ -184,13 +143,9 @@ pub extern "C" fn tantivy_index_add_int16s(
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
execute(
arr.into_iter().map(|num| *num as i64),
offset_begin,
IndexWriterWrapper::add::<i64>,
&mut (*real),
)
.into()
(*real)
.add_data_by_batch::<i16>(arr, Some(offset_begin))
.into()
}
}
@ -202,14 +157,7 @@ pub extern "C" fn tantivy_index_add_int16s_by_single_segment_writer(
) -> RustResult {
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
execute_by_single_segment_writer(
arr.into_iter().map(|num| *num as i64),
IndexWriterWrapper::add::<i64>,
&mut (*real),
)
.into()
}
unsafe { (*real).add_data_by_batch::<i16>(arr, None).into() }
}
#[no_mangle]
@ -222,13 +170,9 @@ pub extern "C" fn tantivy_index_add_int32s(
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
execute(
arr.into_iter().map(|num| *num as i64),
offset_begin,
IndexWriterWrapper::add::<i64>,
&mut (*real),
)
.into()
(*real)
.add_data_by_batch::<i32>(arr, Some(offset_begin))
.into()
}
}
@ -240,14 +184,7 @@ pub extern "C" fn tantivy_index_add_int32s_by_single_segment_writer(
) -> RustResult {
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
execute_by_single_segment_writer(
arr.into_iter().map(|num| *num as i64),
IndexWriterWrapper::add::<i64>,
&mut (*real),
)
.into()
}
unsafe { (*real).add_data_by_batch::<i32>(arr, None).into() }
}
#[no_mangle]
@ -259,15 +196,10 @@ pub extern "C" fn tantivy_index_add_int64s(
) -> RustResult {
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
execute(
arr.iter().copied(),
offset_begin,
IndexWriterWrapper::add::<i64>,
&mut (*real),
)
.into()
(*real)
.add_data_by_batch::<i64>(arr, Some(offset_begin))
.into()
}
}
@ -279,15 +211,7 @@ pub extern "C" fn tantivy_index_add_int64s_by_single_segment_writer(
) -> RustResult {
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
execute_by_single_segment_writer(
arr.iter().copied(),
IndexWriterWrapper::add::<i64>,
&mut (*real),
)
.into()
}
unsafe { (*real).add_data_by_batch::<i64>(arr, None).into() }
}
#[no_mangle]
@ -300,13 +224,9 @@ pub extern "C" fn tantivy_index_add_f32s(
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
execute(
arr.into_iter().map(|num| *num as f64),
offset_begin,
IndexWriterWrapper::add::<f64>,
&mut (*real),
)
.into()
(*real)
.add_data_by_batch::<f32>(arr, Some(offset_begin))
.into()
}
}
@ -318,14 +238,7 @@ pub extern "C" fn tantivy_index_add_f32s_by_single_segment_writer(
) -> RustResult {
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
execute_by_single_segment_writer(
arr.into_iter().map(|num| *num as f64),
IndexWriterWrapper::add::<f64>,
&mut (*real),
)
.into()
}
unsafe { (*real).add_data_by_batch::<f32>(arr, None).into() }
}
#[no_mangle]
@ -338,13 +251,9 @@ pub extern "C" fn tantivy_index_add_f64s(
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
execute(
arr.iter().copied(),
offset_begin,
IndexWriterWrapper::add::<f64>,
&mut (*real),
)
.into()
(*real)
.add_data_by_batch::<f64>(arr, Some(offset_begin))
.into()
}
}
@ -356,14 +265,7 @@ pub extern "C" fn tantivy_index_add_f64s_by_single_segment_writer(
) -> RustResult {
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
execute_by_single_segment_writer(
arr.into_iter().map(|num| *num as f64),
IndexWriterWrapper::add::<f64>,
&mut (*real),
)
.into()
}
unsafe { (*real).add_data_by_batch::<f64>(arr, None).into() }
}
#[no_mangle]
@ -376,13 +278,9 @@ pub extern "C" fn tantivy_index_add_bools(
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
execute(
arr.iter().copied(),
offset_begin,
IndexWriterWrapper::add::<bool>,
&mut (*real),
)
.into()
(*real)
.add_data_by_batch::<bool>(arr, Some(offset_begin))
.into()
}
}
@ -394,14 +292,7 @@ pub extern "C" fn tantivy_index_add_bools_by_single_segment_writer(
) -> RustResult {
let real = ptr as *mut IndexWriterWrapper;
let arr = unsafe { slice::from_raw_parts(array, len) };
unsafe {
execute_by_single_segment_writer(
arr.iter().copied(),
IndexWriterWrapper::add::<bool>,
&mut (*real),
)
.into()
}
unsafe { (*real).add_data_by_batch::<bool>(arr, None).into() }
}
#[no_mangle]

View File

@ -8,7 +8,9 @@ use log::info;
use tantivy_5::schema::{
Field, IndexRecordOption, Schema, SchemaBuilder, TextFieldIndexing, TextOptions, FAST, INDEXED,
};
use tantivy_5::{doc, Document as TantivyDocument, Index, IndexWriter, SingleSegmentIndexWriter, UserOperation};
use tantivy_5::{
doc, Document as TantivyDocument, Index, IndexWriter, SingleSegmentIndexWriter, UserOperation,
};
use crate::data_type::TantivyDataType;
@ -48,15 +50,33 @@ fn schema_builder_add_field(
}
}
// Signed integers narrower than 64 bits are widened losslessly and stored
// through the document's `add_i64`.
impl TantivyValue<TantivyDocument> for i8 {
    fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
        let target = Field::from_field_id(field);
        document.add_i64(target, i64::from(*self));
    }
}

impl TantivyValue<TantivyDocument> for i16 {
    fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
        let target = Field::from_field_id(field);
        document.add_i64(target, i64::from(*self));
    }
}

impl TantivyValue<TantivyDocument> for i32 {
    fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
        let target = Field::from_field_id(field);
        document.add_i64(target, i64::from(*self));
    }
}

// `i64` is stored as-is.
impl TantivyValue<TantivyDocument> for i64 {
    fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
        let target = Field::from_field_id(field);
        document.add_i64(target, *self);
    }
}
impl TantivyValue<TantivyDocument> for u64 {
impl TantivyValue<TantivyDocument> for f32 {
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_u64(Field::from_field_id(field), *self);
document.add_f64(Field::from_field_id(field), *self as f64);
}
}
@ -146,15 +166,58 @@ impl IndexWriterWrapperImpl {
Ok(())
}
pub fn add<T: TantivyValue<TantivyDocument>>(
pub fn add_data_by_batch<T: TantivyValue<TantivyDocument>>(
&mut self,
data: T,
batch_data: &[T],
offset: Option<i64>,
) -> Result<()> {
let mut document = TantivyDocument::default();
data.add_to_document(self.field.field_id(), &mut document);
match &self.index_writer {
Either::Left(_) => self.add_datas(batch_data, offset.unwrap()),
Either::Right(_) => self.add_datas_by_single_segment(batch_data),
}
}
self.add_document(document, offset)
/// Schedules one document per element of `batch_data` on the writer held in
/// the `Left` side of `self.index_writer`.
///
/// The element at position `i` is written to `self.field` and tagged with
/// the id `offset_begin + i` in the id field. Documents are handed to
/// `writer.run` in groups of `BATCH_SIZE` operations, with one final call
/// for any remainder, instead of one call per document.
///
/// # Errors
///
/// Returns the first error produced by `writer.run`.
///
/// # Panics
///
/// Panics if `self.index_writer` is not the `Left` variant or if
/// `self.id_field` is `None`.
fn add_datas<T: TantivyValue<TantivyDocument>>(
    &mut self,
    batch_data: &[T],
    offset_begin: i64,
) -> Result<()> {
    let writer = self.index_writer.as_ref().left().unwrap();
    let id_field = self.id_field.unwrap();
    // The target field does not change while iterating; look it up once.
    let field_id = self.field.field_id();
    let mut batch = Vec::with_capacity(BATCH_SIZE);
    for (idx, data) in batch_data.iter().enumerate() {
        let offset = offset_begin + idx as i64;
        let mut doc = TantivyDocument::default();
        data.add_to_document(field_id, &mut doc);
        doc.add_i64(id_field, offset);
        batch.push(UserOperation::Add(doc));

        // Flush a full batch and start a fresh, pre-sized one.
        if batch.len() >= BATCH_SIZE {
            writer.run(std::mem::replace(
                &mut batch,
                Vec::with_capacity(BATCH_SIZE),
            ))?;
        }
    }

    // Submit whatever is left over after the last full batch.
    if !batch.is_empty() {
        writer.run(batch)?;
    }

    Ok(())
}
/// Adds one document per element of `batch_data` through
/// `self.add_document`, passing `None` as the offset for each.
///
/// Stops at, and returns, the first error from `add_document`.
fn add_datas_by_single_segment<T: TantivyValue<TantivyDocument>>(
    &mut self,
    batch_data: &[T],
) -> Result<()> {
    let field_id = self.field.field_id();
    batch_data.iter().try_for_each(|value| {
        let mut doc = TantivyDocument::default();
        value.add_to_document(field_id, &mut doc);
        self.add_document(doc, None)
    })
}
pub fn add_array<T: TantivyValue<TantivyDocument>, I>(
@ -201,32 +264,25 @@ impl IndexWriterWrapperImpl {
let writer = self.index_writer.as_ref().left().unwrap();
let id_field = self.id_field.unwrap();
let mut batch = Vec::with_capacity(BATCH_SIZE);
data.iter()
.enumerate()
.try_for_each(|(idx, key)| -> Result<()> {
let key = unsafe { CStr::from_ptr(*key) }
.to_str()
.map_err(|e| TantivyBindingError::InternalError(e.to_string()))?;
let key_offset = offset + idx as i64;
batch.push(UserOperation::Add(doc!(
id_field => key_offset,
self.field => key,
)));
if batch.len() >= BATCH_SIZE {
writer.run(std::mem::replace(
&mut batch,
Vec::with_capacity(BATCH_SIZE),
))?;
}
Ok(())
})?;
for (idx, key) in data.into_iter().enumerate() {
let key = unsafe { CStr::from_ptr(*key) }
.to_str()
.map_err(|e| TantivyBindingError::InternalError(e.to_string()))?;
let key_offset = offset + idx as i64;
batch.push(UserOperation::Add(doc!(
id_field => key_offset,
self.field => key,
)));
if batch.len() >= BATCH_SIZE {
writer.run(std::mem::replace(
&mut batch,
Vec::with_capacity(BATCH_SIZE),
))?;
}
}
if !batch.is_empty() {
writer.run(std::mem::replace(
&mut batch,
Vec::with_capacity(BATCH_SIZE),
))?;
writer.run(batch)?;
}
Ok(())

View File

@ -43,15 +43,33 @@ fn schema_builder_add_field(
}
}
// Signed integers narrower than 64 bits are widened losslessly and stored
// through the document's `add_i64`.
impl TantivyValue<TantivyDocument> for i8 {
    fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
        let target = Field::from_field_id(field);
        document.add_i64(target, i64::from(*self));
    }
}

impl TantivyValue<TantivyDocument> for i16 {
    fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
        let target = Field::from_field_id(field);
        document.add_i64(target, i64::from(*self));
    }
}

impl TantivyValue<TantivyDocument> for i32 {
    fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
        let target = Field::from_field_id(field);
        document.add_i64(target, i64::from(*self));
    }
}

// `i64` is stored as-is.
impl TantivyValue<TantivyDocument> for i64 {
    fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
        let target = Field::from_field_id(field);
        document.add_i64(target, *self);
    }
}
impl TantivyValue<TantivyDocument> for u64 {
impl TantivyValue<TantivyDocument> for f32 {
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
document.add_u64(Field::from_field_id(field), *self);
document.add_f64(Field::from_field_id(field), *self as f64);
}
}
@ -152,15 +170,58 @@ impl IndexWriterWrapperImpl {
Ok(())
}
pub fn add<T: TantivyValue<TantivyDocument>>(
pub fn add_data_by_batch<T: TantivyValue<TantivyDocument>>(
&mut self,
data: T,
batch_data: &[T],
offset: Option<i64>,
) -> Result<()> {
let mut document = TantivyDocument::default();
data.add_to_document(self.field.field_id(), &mut document);
match &self.index_writer {
Either::Left(_) => self.add_datas(batch_data, offset.unwrap()),
Either::Right(_) => self.add_datas_by_single_segment(batch_data),
}
}
self.add_document(document, offset)
/// Schedules one document per element of `batch_data` on the writer held in
/// the `Left` side of `self.index_writer`.
///
/// The element at position `i` is written to `self.field` and tagged with
/// the id `offset_begin + i` in the id field. Documents are handed to
/// `writer.run` in groups of `BATCH_SIZE` operations, with one final call
/// for any remainder, instead of one call per document.
///
/// # Errors
///
/// Returns the first error produced by `writer.run`.
///
/// # Panics
///
/// Panics if `self.index_writer` is not the `Left` variant or if
/// `self.id_field` is `None`.
fn add_datas<T: TantivyValue<TantivyDocument>>(
    &mut self,
    batch_data: &[T],
    offset_begin: i64,
) -> Result<()> {
    let writer = self.index_writer.as_ref().left().unwrap();
    let id_field = self.id_field.unwrap();
    // The target field does not change while iterating; look it up once.
    let field_id = self.field.field_id();
    let mut batch = Vec::with_capacity(BATCH_SIZE);
    for (idx, data) in batch_data.iter().enumerate() {
        let offset = offset_begin + idx as i64;
        let mut doc = TantivyDocument::default();
        data.add_to_document(field_id, &mut doc);
        doc.add_i64(id_field, offset);
        batch.push(UserOperation::Add(doc));

        // Flush a full batch and start a fresh, pre-sized one.
        if batch.len() >= BATCH_SIZE {
            writer.run(std::mem::replace(
                &mut batch,
                Vec::with_capacity(BATCH_SIZE),
            ))?;
        }
    }

    // Submit whatever is left over after the last full batch.
    if !batch.is_empty() {
        writer.run(batch)?;
    }

    Ok(())
}
/// Adds one document per element of `batch_data` through
/// `self.add_document`, passing `None` as the offset for each.
///
/// Stops at, and returns, the first error from `add_document`.
fn add_datas_by_single_segment<T: TantivyValue<TantivyDocument>>(
    &mut self,
    batch_data: &[T],
) -> Result<()> {
    let field_id = self.field.field_id();
    batch_data.iter().try_for_each(|value| {
        let mut doc = TantivyDocument::default();
        value.add_to_document(field_id, &mut doc);
        self.add_document(doc, None)
    })
}
pub fn add_array<T: TantivyValue<TantivyDocument>, I>(
@ -207,32 +268,25 @@ impl IndexWriterWrapperImpl {
let writer = self.index_writer.as_ref().left().unwrap();
let id_field = self.id_field.unwrap();
let mut batch = Vec::with_capacity(BATCH_SIZE);
data.iter()
.enumerate()
.try_for_each(|(idx, key)| -> Result<()> {
let key = unsafe { CStr::from_ptr(*key) }
.to_str()
.map_err(|e| TantivyBindingError::InternalError(e.to_string()))?;
let key_offset = offset + idx as i64;
batch.push(UserOperation::Add(doc!(
id_field => key_offset,
self.field => key,
)));
if batch.len() >= BATCH_SIZE {
writer.run(std::mem::replace(
&mut batch,
Vec::with_capacity(BATCH_SIZE),
))?;
}
Ok(())
})?;
for (idx, key) in data.into_iter().enumerate() {
let key = unsafe { CStr::from_ptr(*key) }
.to_str()
.map_err(|e| TantivyBindingError::InternalError(e.to_string()))?;
let key_offset = offset + idx as i64;
batch.push(UserOperation::Add(doc!(
id_field => key_offset,
self.field => key,
)));
if batch.len() >= BATCH_SIZE {
writer.run(std::mem::replace(
&mut batch,
Vec::with_capacity(BATCH_SIZE),
))?;
}
}
if !batch.is_empty() {
writer.run(std::mem::replace(
&mut batch,
Vec::with_capacity(BATCH_SIZE),
))?;
writer.run(batch)?;
}
Ok(())

View File

@ -1069,4 +1069,4 @@ TEST(TextMatch, ConcurrentReadWriteWithNull) {
writer.join();
reader.join();
}
}