fix: be able to handle tantivy indexes built from multiple segments (#39127)

fix: https://github.com/milvus-io/milvus/issues/39126

---------

Signed-off-by: SpadeA-Tang <tangchenjie1210@gmail.com>
This commit is contained in:
Spade A 2025-01-14 18:11:06 +08:00 committed by GitHub
parent 3474c5962f
commit e7e207360f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 200 additions and 31 deletions

View File

@ -993,6 +993,7 @@ dependencies = [
"regex",
"scopeguard",
"tantivy",
"tempfile",
"zstd-sys",
]

View File

@ -15,6 +15,9 @@ env_logger = "0.11.3"
log = "0.4.21"
regex = "1.11.1"
[dev-dependencies]
tempfile = "3.0"
[build-dependencies]
cbindgen = "0.26.0"

View File

@ -23,6 +23,8 @@ extern "C" {
void free_rust_array(RustArray array);
void print_vector_of_strings(const char *const *ptr, uintptr_t len);
void *tantivy_load_index(const char *path);
void tantivy_free_index_reader(void *ptr);
@ -115,6 +117,4 @@ void tantivy_index_add_multi_keywords(void *ptr, const char *const *array, uintp
bool tantivy_index_exist(const char *path);
void print_vector_of_strings(const char *const *ptr, uintptr_t len);
} // extern "C"

View File

@ -0,0 +1,53 @@
use tantivy::{
collector::{Collector, SegmentCollector},
fastfield::Column,
DocId, Score, SegmentOrdinal, SegmentReader,
};
/// Collector that returns, for every matching document, the values stored in
/// the `doc_id` fast field rather than tantivy's own segment-local doc ids.
///
/// The per-segment work is done by [`DocIdChildCollector`]; the per-segment
/// results are concatenated in `merge_fruits`.
pub(crate) struct DocIdCollector;

impl Collector for DocIdCollector {
    type Fruit = Vec<DocId>;
    type Child = DocIdChildCollector;

    /// Builds the child collector for one segment.
    ///
    /// # Errors
    ///
    /// Returns the underlying tantivy error when the segment has no `i64`
    /// fast field named `doc_id`, instead of panicking inside the collector.
    fn for_segment(
        &self,
        _segment_local_id: SegmentOrdinal,
        segment: &SegmentReader,
    ) -> tantivy::Result<Self::Child> {
        // Propagate the lookup failure: this function already returns a
        // `tantivy::Result`, so the caller decides how to react.
        let column = segment.fast_fields().i64("doc_id")?;
        Ok(DocIdChildCollector {
            docs: Vec::new(),
            column,
        })
    }

    /// Scores are never read by the child collector, so scoring is not requested.
    fn requires_scoring(&self) -> bool {
        false
    }

    /// Concatenates the per-segment id lists into a single list.
    fn merge_fruits(
        &self,
        segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
    ) -> tantivy::Result<Self::Fruit> {
        Ok(segment_fruits.into_iter().flatten().collect())
    }
}
/// Per-segment collector used by `DocIdCollector`.
///
/// Accumulates the `doc_id` column values of every document handed to
/// `collect` for a single segment.
pub(crate) struct DocIdChildCollector {
    /// Ids gathered so far for this segment.
    docs: Vec<u32>,
    /// Column the ids are read from.
    column: Column<i64>,
}

impl SegmentCollector for DocIdChildCollector {
    type Fruit = Vec<u32>;

    /// Appends every value the column holds for `doc`; the score is ignored.
    fn collect(&mut self, doc: DocId, _score: Score) {
        // The column stores `i64`; values are narrowed to `u32` with a plain cast.
        let ids = self.column.values_for_doc(doc).map(|id| id as u32);
        self.docs.extend(ids);
    }

    /// Hands back the ids collected for this segment.
    fn harvest(self) -> Self::Fruit {
        self.docs
    }
}

View File

@ -1,62 +1,82 @@
use std::ops::Bound;
use std::str::FromStr;
use std::sync::Arc;
use tantivy::directory::MmapDirectory;
use tantivy::query::{Query, RangeQuery, RegexQuery, TermQuery};
use tantivy::schema::{Field, IndexRecordOption};
use tantivy::{Index, IndexReader, ReloadPolicy, Term};
use crate::docid_collector::DocIdCollector;
use crate::log::init_log;
use crate::util::make_bounds;
use crate::vec_collector::VecCollector;
pub struct IndexReaderWrapper {
pub field_name: String,
pub field: Field,
pub reader: IndexReader,
pub cnt: u32,
pub(crate) struct IndexReaderWrapper {
pub(crate) field_name: String,
pub(crate) field: Field,
pub(crate) reader: IndexReader,
pub(crate) index: Arc<Index>,
pub(crate) id_field: Option<Field>,
}
impl IndexReaderWrapper {
pub fn new(index: &Index, field_name: &String, field: Field) -> IndexReaderWrapper {
/// Opens the tantivy index stored in the directory `path` and wraps it in an
/// `IndexReaderWrapper` via `from_index`.
///
/// Calls `init_log()` before opening the index.
///
/// # Panics
///
/// Panics if `Index::open_in_dir` fails for `path`.
pub fn load(path: &str) -> IndexReaderWrapper {
init_log();
let index = Index::open_in_dir(path).unwrap();
IndexReaderWrapper::from_index(Arc::new(index))
}
pub fn from_index(index: Arc<Index>) -> IndexReaderWrapper {
let schema = index.schema();
let field = schema.fields().next().unwrap().0;
let field_name = String::from(schema.get_field_name(field));
let id_field = schema.get_field("doc_id").ok();
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.reload_policy(ReloadPolicy::OnCommit)
.try_into()
.unwrap();
let metas = index.searchable_segment_metas().unwrap();
let mut sum: u32 = 0;
for meta in metas {
sum += meta.max_doc();
}
reader.reload().unwrap();
IndexReaderWrapper {
field_name: field_name.to_string(),
field_name,
field,
reader,
cnt: sum,
index,
id_field,
}
}
pub fn load(path: &str) -> IndexReaderWrapper {
let dir = MmapDirectory::open(path).unwrap();
let index = Index::open(dir).unwrap();
let field = index.schema().fields().next().unwrap().0;
let schema = index.schema();
let field_name = schema.get_field_name(field);
IndexReaderWrapper::new(&index, &String::from_str(field_name).unwrap(), field)
/// Asks the wrapped `IndexReader` to reload.
///
/// # Panics
///
/// Panics if the reader's `reload` call returns an error.
pub fn reload(&self) {
self.reader.reload().unwrap();
}
pub fn count(&self) -> u32 {
self.cnt
self.index
.searchable_segment_metas()
.unwrap()
.iter()
.map(|meta| meta.max_doc())
.sum()
}
fn search(&self, q: &dyn Query) -> Vec<u32> {
pub(crate) fn search(&self, q: &dyn Query) -> Vec<u32> {
let searcher = self.reader.searcher();
let hits = searcher.search(q, &VecCollector).unwrap();
hits
// `id_field` is set when the schema contains a "doc_id" field, which means
// the index may consist of multiple segments. If only one segment is allowed
// in the index, we trivially know which Milvus rows correspond to which
// tantivy docs, so `VecCollector` is enough in such cases.
// When more than one segment is allowed, we use the "doc_id" field to map
// tantivy docs back to Milvus rows. In that case, we use `DocIdCollector`.
match self.id_field {
Some(_) => searcher.search(q, &DocIdCollector {}).unwrap(),
None => searcher.search(q, &VecCollector {}).unwrap(),
}
}
pub fn term_query_i64(&self, term: i64) -> Vec<u32> {
@ -198,12 +218,17 @@ impl IndexReaderWrapper {
#[cfg(test)]
mod test {
use std::sync::Arc;
use std::{
ops::{Bound, Range},
sync::Arc,
};
use tempfile::Builder;
use tantivy::{
doc,
schema::{self, Schema, STORED, STRING, TEXT},
Index, IndexWriter,
query::RangeQuery,
schema::{self, Schema, FAST, INDEXED, STORED, STRING, TEXT},
Index, IndexWriter, SingleSegmentIndexWriter, Term,
};
use super::IndexReaderWrapper;
@ -230,4 +255,90 @@ mod test {
res = index_reader_wrapper.prefix_query_keyword("$");
assert_eq!(res.len(), 1);
}
#[test]
fn test_load_without_doc_id() {
    // Index whose schema has no "doc_id" field: after loading, `id_field`
    // is `None`, so `search` goes through the `VecCollector` branch.
    let temp_dir = Builder::new()
        .prefix("test_index_load_without_doc_id")
        .tempdir()
        .unwrap();

    {
        let mut builder = Schema::builder();
        let value_field = builder.add_f64_field("f64", INDEXED);
        let index = Index::create_in_dir(&temp_dir, builder.build()).unwrap();
        let mut writer = SingleSegmentIndexWriter::new(index, 100_000_000).unwrap();

        // Forty documents holding the values 0.0 through 39.0, in order.
        for n in 0..40 {
            writer.add_document(doc!(value_field => n as f64)).unwrap();
        }
        writer.finalize().unwrap();
    }

    let dir_path = temp_dir.path().to_str().unwrap();
    let index = IndexReaderWrapper::load(dir_path);

    // The range covers every value written above.
    let bounds = Range {
        start: 0.0,
        end: 101.0,
    };
    let query = RangeQuery::new_f64(index.field_name.clone(), bounds);

    let mut res = index.search(&query);
    res.sort();
    assert_eq!(res, (0..40).collect::<Vec<u32>>());
}
#[test]
fn test_load_with_doc_id() {
    // Index whose schema has a "doc_id" fast field: after loading, `id_field`
    // is `Some`, so `search` goes through the `DocIdCollector` branch.
    let temp_dir = Builder::new()
        .prefix("test_index_load_with_doc_id")
        .tempdir()
        .unwrap();

    {
        let mut builder = Schema::builder();
        let value_field = builder.add_f64_field("f64", INDEXED);
        let doc_id_field = builder.add_i64_field("doc_id", FAST);
        let index = Index::create_in_dir(&temp_dir, builder.build()).unwrap();
        let mut writer = index.writer_with_num_threads(4, 100_000_000).unwrap();

        // Four batches of ten documents, committing after each batch.
        for batch in 0..4 {
            for offset in 0..10 {
                let n = batch * 10 + offset;
                writer
                    .add_document(doc!(
                        value_field => n as f64,
                        doc_id_field => n as i64,
                    ))
                    .unwrap();
            }
            writer.commit().unwrap();
        }
    }

    let dir_path = temp_dir.path().to_str().unwrap();
    let index = IndexReaderWrapper::load(dir_path);

    // The range covers every value written above.
    let bounds = Range {
        start: 0.0,
        end: 101.0,
    };
    let query = RangeQuery::new_f64(index.field_name.clone(), bounds);

    // Results are the stored doc_id values; sort before comparing because
    // no ordering is assumed.
    let mut res = index.search(&query);
    res.sort();
    assert_eq!(res, (0..40).collect::<Vec<u32>>());
}
}

View File

@ -1,5 +1,7 @@
mod array;
mod data_type;
mod demo_c;
mod docid_collector;
mod hashset_collector;
mod index_reader;
mod index_reader_c;
@ -10,7 +12,6 @@ mod log;
mod util;
mod util_c;
mod vec_collector;
mod demo_c;
pub fn add(left: usize, right: usize) -> usize {
left + right