enhance: Make user-specified doc id configurable for the tantivy index writer (#41528)

issue: https://github.com/milvus-io/milvus/issues/41527

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
Bingyi Sun 2025-05-07 10:48:53 +08:00 committed by GitHub
parent 18625d7d20
commit 0dee3ccfd7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 63 additions and 15 deletions

View File

@ -215,7 +215,8 @@ RustResult tantivy_create_index(const char *field_name,
const char *path, const char *path,
uint32_t tantivy_index_version, uint32_t tantivy_index_version,
uintptr_t num_threads, uintptr_t num_threads,
uintptr_t overall_memory_budget_in_bytes); uintptr_t overall_memory_budget_in_bytes,
bool enable_user_specified_doc_id);
RustResult tantivy_create_index_with_single_segment(const char *field_name, RustResult tantivy_create_index_with_single_segment(const char *field_name,
TantivyDataType data_type, TantivyDataType data_type,

View File

@ -30,6 +30,7 @@ impl IndexWriterWrapper {
num_threads: usize, num_threads: usize,
overall_memory_budget_in_bytes: usize, overall_memory_budget_in_bytes: usize,
tanviy_index_version: TantivyIndexVersion, tanviy_index_version: TantivyIndexVersion,
enable_user_specified_doc_id: bool,
) -> Result<IndexWriterWrapper> { ) -> Result<IndexWriterWrapper> {
init_log(); init_log();
match tanviy_index_version { match tanviy_index_version {
@ -50,6 +51,7 @@ impl IndexWriterWrapper {
path, path,
num_threads, num_threads,
overall_memory_budget_in_bytes, overall_memory_budget_in_bytes,
enable_user_specified_doc_id,
)?; )?;
Ok(IndexWriterWrapper::V7(writer)) Ok(IndexWriterWrapper::V7(writer))
} }
@ -182,6 +184,7 @@ mod tests {
1, 1,
50_000_000, 50_000_000,
TantivyIndexVersion::V5, TantivyIndexVersion::V5,
false,
) )
.unwrap(); .unwrap();
@ -339,4 +342,29 @@ mod tests {
.unwrap(); .unwrap();
assert_eq!(count, total_count); assert_eq!(count, total_count);
} }
/// Builds a V7 index once with user-specified doc ids enabled and once with
/// them disabled, and checks that a single added row is counted by the reader
/// in both configurations.
#[test]
fn test_control_user_specified_doc_id() {
    for user_doc_id_enabled in [true, false] {
        // Each configuration gets its own scratch directory.
        let tmp = TempDir::new().unwrap();
        let index_path = tmp.path().to_str().unwrap().to_string();

        let mut writer = IndexWriterWrapper::new(
            "test",
            TantivyDataType::I64,
            index_path,
            1,
            100_000_000,
            TantivyIndexVersion::V7,
            user_doc_id_enabled,
        )
        .unwrap();

        // One value at offset 0, then commit before opening a reader.
        writer.add(1_i64, Some(0)).unwrap();
        writer.commit().unwrap();

        let reader = writer.create_reader(set_bitset).unwrap();
        assert_eq!(reader.count().unwrap(), 1);
    }
}
} }

View File

@ -30,6 +30,7 @@ pub extern "C" fn tantivy_create_index(
tantivy_index_version: u32, tantivy_index_version: u32,
num_threads: usize, num_threads: usize,
overall_memory_budget_in_bytes: usize, overall_memory_budget_in_bytes: usize,
enable_user_specified_doc_id: bool,
) -> RustResult { ) -> RustResult {
let field_name_str = cstr_to_str!(field_name); let field_name_str = cstr_to_str!(field_name);
let path_str = cstr_to_str!(path); let path_str = cstr_to_str!(path);
@ -46,6 +47,7 @@ pub extern "C" fn tantivy_create_index(
num_threads, num_threads,
overall_memory_budget_in_bytes, overall_memory_budget_in_bytes,
tantivy_index_version, tantivy_index_version,
enable_user_specified_doc_id,
) { ) {
Ok(wrapper) => RustResult::from_ptr(create_binding(wrapper)), Ok(wrapper) => RustResult::from_ptr(create_binding(wrapper)),
Err(e) => RustResult::from_error(e.to_string()), Err(e) => RustResult::from_error(e.to_string()),

View File

@ -6,7 +6,8 @@ use libc::c_char;
use log::info; use log::info;
use tantivy::indexer::UserOperation; use tantivy::indexer::UserOperation;
use tantivy::schema::{ use tantivy::schema::{
Field, IndexRecordOption, NumericOptions, Schema, SchemaBuilder, TextFieldIndexing, TextOptions, Field, IndexRecordOption, NumericOptions, Schema, SchemaBuilder, TextFieldIndexing,
TextOptions, FAST,
}; };
use tantivy::{doc, Index, IndexWriter, TantivyDocument}; use tantivy::{doc, Index, IndexWriter, TantivyDocument};
@ -87,6 +88,8 @@ pub struct IndexWriterWrapperImpl {
pub(crate) field: Field, pub(crate) field: Field,
pub(crate) index_writer: IndexWriter, pub(crate) index_writer: IndexWriter,
pub(crate) index: Arc<Index>, pub(crate) index: Arc<Index>,
pub(crate) id_field: Option<Field>,
pub(crate) enable_user_specified_doc_id: bool,
} }
impl IndexWriterWrapperImpl { impl IndexWriterWrapperImpl {
@ -96,6 +99,7 @@ impl IndexWriterWrapperImpl {
path: String, path: String,
num_threads: usize, num_threads: usize,
overall_memory_budget_in_bytes: usize, overall_memory_budget_in_bytes: usize,
enable_user_specified_doc_id: bool,
) -> Result<IndexWriterWrapperImpl> { ) -> Result<IndexWriterWrapperImpl> {
info!( info!(
"create index writer, field_name: {}, data_type: {:?}, tantivy_index_version 7", "create index writer, field_name: {}, data_type: {:?}, tantivy_index_version 7",
@ -103,7 +107,12 @@ impl IndexWriterWrapperImpl {
); );
let mut schema_builder = Schema::builder(); let mut schema_builder = Schema::builder();
let field = schema_builder_add_field(&mut schema_builder, field_name, data_type); let field = schema_builder_add_field(&mut schema_builder, field_name, data_type);
let id_field = if enable_user_specified_doc_id {
schema_builder.enable_user_specified_doc_id(); schema_builder.enable_user_specified_doc_id();
None
} else {
Some(schema_builder.add_i64_field("doc_id", FAST))
};
let schema = schema_builder.build(); let schema = schema_builder.build();
let index = Index::create_in_dir(path.clone(), schema)?; let index = Index::create_in_dir(path.clone(), schema)?;
let index_writer = let index_writer =
@ -112,6 +121,8 @@ impl IndexWriterWrapperImpl {
field, field,
index_writer, index_writer,
index: Arc::new(index), index: Arc::new(index),
id_field,
enable_user_specified_doc_id,
}) })
} }
@ -120,9 +131,14 @@ impl IndexWriterWrapperImpl {
} }
#[inline] #[inline]
fn add_document(&mut self, document: TantivyDocument, offset: u32) -> Result<()> { fn add_document(&mut self, mut document: TantivyDocument, offset: u32) -> Result<()> {
if self.enable_user_specified_doc_id {
self.index_writer self.index_writer
.add_document_with_doc_id(offset, document)?; .add_document_with_doc_id(offset as u32, document)?;
} else {
document.add_i64(self.id_field.unwrap(), offset as i64);
self.index_writer.add_document(document)?;
}
Ok(()) Ok(())
} }
@ -165,12 +181,7 @@ impl IndexWriterWrapperImpl {
json_offsets_len: &[usize], json_offsets_len: &[usize],
) -> Result<()> { ) -> Result<()> {
let mut batch = Vec::with_capacity(BATCH_SIZE); let mut batch = Vec::with_capacity(BATCH_SIZE);
let id_field = self let id_field = self.id_field.unwrap();
.index_writer
.index()
.schema()
.get_field("doc_id")
.unwrap();
for i in 0..keys.len() { for i in 0..keys.len() {
let key = unsafe { CStr::from_ptr(keys[i]) } let key = unsafe { CStr::from_ptr(keys[i]) }
.to_str() .to_str()

View File

@ -24,7 +24,7 @@ impl IndexWriterWrapperImpl {
let mut schema_builder = Schema::builder(); let mut schema_builder = Schema::builder();
let field = let field =
schema_builder_add_field(&mut schema_builder, field_name, TantivyDataType::Keyword); schema_builder_add_field(&mut schema_builder, field_name, TantivyDataType::Keyword);
let _ = schema_builder.add_i64_field("doc_id", FAST); let id_field = schema_builder.add_i64_field("doc_id", FAST);
let schema = schema_builder.build(); let schema = schema_builder.build();
let index = if in_ram { let index = if in_ram {
Index::create_in_ram(schema) Index::create_in_ram(schema)
@ -37,6 +37,8 @@ impl IndexWriterWrapperImpl {
field, field,
index_writer, index_writer,
index: Arc::new(index), index: Arc::new(index),
id_field: Some(id_field),
enable_user_specified_doc_id: false,
}) })
} }
} }

View File

@ -54,6 +54,8 @@ impl IndexWriterWrapperImpl {
field, field,
index_writer, index_writer,
index: Arc::new(index), index: Arc::new(index),
id_field: None,
enable_user_specified_doc_id: true,
}) })
} }
} }

View File

@ -87,7 +87,8 @@ struct TantivyIndexWrapper {
bool inverted_single_semgnent = false, bool inverted_single_semgnent = false,
uintptr_t num_threads = DEFAULT_NUM_THREADS, uintptr_t num_threads = DEFAULT_NUM_THREADS,
uintptr_t overall_memory_budget_in_bytes = uintptr_t overall_memory_budget_in_bytes =
DEFAULT_OVERALL_MEMORY_BUDGET_IN_BYTES) { DEFAULT_OVERALL_MEMORY_BUDGET_IN_BYTES,
bool enable_user_specified_doc_id = true) {
RustResultWrapper res; RustResultWrapper res;
if (inverted_single_semgnent) { if (inverted_single_semgnent) {
AssertInfo(tantivy_index_version == 5, AssertInfo(tantivy_index_version == 5,
@ -102,7 +103,8 @@ struct TantivyIndexWrapper {
path, path,
tantivy_index_version, tantivy_index_version,
num_threads, num_threads,
overall_memory_budget_in_bytes)); overall_memory_budget_in_bytes,
enable_user_specified_doc_id));
} }
AssertInfo(res.result_->success, AssertInfo(res.result_->success,
"failed to create index: {}", "failed to create index: {}",