diff --git a/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp b/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp index 38060c8cb3..6bd4ad2bc0 100644 --- a/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp +++ b/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp @@ -274,14 +274,11 @@ JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex( boost::filesystem::create_directories(path_); std::string field_name = std::to_string(disk_file_manager_->GetFieldDataMeta().field_id); - d_type_ = TantivyDataType::Keyword; wrapper_ = std::make_shared( field_name.c_str(), - d_type_, path_.c_str(), tantivy_index_version, - false, - false, + false, /* in_ram */ 1, json_stats_tantivy_memory_budget); } @@ -291,9 +288,8 @@ JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex( int64_t commit_interval_in_ms, const char* unique_id) : commit_interval_in_ms_(commit_interval_in_ms), last_commit_time_(stdclock::now()) { - d_type_ = TantivyDataType::Keyword; wrapper_ = std::make_shared( - unique_id, d_type_, "", TANTIVY_INDEX_LATEST_VERSION, false, true); + unique_id, "", TANTIVY_INDEX_LATEST_VERSION, true /* in_ram */); } JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex( @@ -306,9 +302,8 @@ JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex( boost::filesystem::path sub_path = unique_id; path_ = (prefix / sub_path).string(); boost::filesystem::create_directories(path_); - d_type_ = TantivyDataType::Keyword; wrapper_ = std::make_shared( - unique_id, d_type_, path_.c_str(), TANTIVY_INDEX_LATEST_VERSION); + unique_id, path_.c_str(), TANTIVY_INDEX_LATEST_VERSION); } IndexStatsPtr diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/Cargo.lock b/internal/core/thirdparty/tantivy/tantivy-binding/Cargo.lock index ddb8ae3608..696949d6a2 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/Cargo.lock +++ b/internal/core/thirdparty/tantivy/tantivy-binding/Cargo.lock @@ -1782,7 +1782,7 @@ dependencies = [ [[package]] name = "ownedbytes" version = "0.7.0" -source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88" +source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402" dependencies = [ "stable_deref_trait", ] @@ -2523,7 +2523,7 @@ dependencies = [ [[package]] name = "tantivy" version = "0.23.0" -source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88" +source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402" dependencies = [ "aho-corasick", "arc-swap", @@ -2609,7 +2609,7 @@ dependencies = [ [[package]] name = "tantivy-bitpacker" version = "0.6.0" -source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88" +source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402" dependencies = [ "bitpacking 0.9.2", ] @@ -2632,7 +2632,7 @@ dependencies = [ [[package]] name = "tantivy-columnar" version = "0.3.0" -source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88" +source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402" dependencies = [ "downcast-rs", "fastdivide", @@ -2659,7 +2659,7 @@ dependencies = [ [[package]] name = "tantivy-common" version = "0.7.0" -source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88" +source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402" dependencies = [ "async-trait", "byteorder", @@ -2702,7 +2702,7 @@ dependencies = [ [[package]] name = "tantivy-query-grammar" version = "0.22.0" -source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88" +source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402" dependencies = [ "nom", ] @@ -2720,7 +2720,7 @@ dependencies = [ [[package]] name = "tantivy-sstable" version = "0.3.0" -source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88" +source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402" dependencies = [ "futures-util", "itertools 0.14.0", @@ -2742,7 +2742,7 @@ dependencies = [ [[package]] name = "tantivy-stacker" version = "0.3.0" -source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88" +source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402" dependencies = [ "murmurhash32", "rand_distr", @@ -2760,7 +2760,7 @@ dependencies = [ [[package]] name = "tantivy-tokenizer-api" version = "0.3.0" -source = "git+https://github.com/zilliztech/tantivy.git#8abf9cee3eb6fe959c83e69a638c33fc3b830e88" +source = "git+https://github.com/zilliztech/tantivy.git#a85e23f79d1e7c96d2c3c8b5f5f0f0a3da50f402" dependencies = [ "serde", ] @@ -3245,7 +3245,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h b/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h index 5b6fb4f378..64cd5e04ce 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h +++ b/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h @@ -103,6 +103,13 @@ void hashmap_set_value(void *map, const char *key, const char *value); void free_hashmap(void *map); +RustResult tantivy_create_json_key_stats_writer(const char *field_name, + const char *path, + uint32_t tantivy_index_version, + uintptr_t num_threads, + uintptr_t overall_memory_budget_in_bytes, + bool in_ram); + RustResult tantivy_load_index(const char *path); void tantivy_free_index_reader(void *ptr); @@ -182,8 +189,7 @@ RustResult tantivy_create_index(const char *field_name, const char *path, uint32_t tantivy_index_version, uintptr_t num_threads, - uintptr_t overall_memory_budget_in_bytes, - bool in_ram); + uintptr_t overall_memory_budget_in_bytes); RustResult tantivy_create_index_with_single_segment(const char *field_name, TantivyDataType data_type, diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_json_key_stats_writer.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_json_key_stats_writer.rs new file mode 100644 index 0000000000..a67eef10b5 --- /dev/null +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_json_key_stats_writer.rs @@ -0,0 +1,35 @@ +use crate::error::Result; +use crate::index_writer::IndexWriterWrapper; +use crate::{index_writer_v5, index_writer_v7, TantivyIndexVersion}; + +impl IndexWriterWrapper { + pub fn create_json_key_stats_writer( + field_name: &str, + path: &str, + num_threads: usize, + overall_memory_budget_in_bytes: usize, + tanviy_index_version: TantivyIndexVersion, + in_ram: bool, + ) -> Result { + match tanviy_index_version { + TantivyIndexVersion::V5 => Ok(IndexWriterWrapper::V5( + index_writer_v5::IndexWriterWrapperImpl::create_json_key_stats_writer( + field_name, + path, + num_threads, + overall_memory_budget_in_bytes, + in_ram, + )?, + )), + TantivyIndexVersion::V7 => Ok(IndexWriterWrapper::V7( + index_writer_v7::IndexWriterWrapperImpl::create_json_key_stats_writer( + field_name, + path, + num_threads, + overall_memory_budget_in_bytes, + in_ram, + )?, + )), + } + } +} diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_json_key_stats_writer_c.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_json_key_stats_writer_c.rs new file mode 100644 index 0000000000..350fd843a9 --- /dev/null +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_json_key_stats_writer_c.rs @@ -0,0 +1,43 @@ +use std::ffi::c_char; +use std::ffi::CStr; + +use crate::array::RustResult; +use crate::cstr_to_str; +use crate::index_writer::IndexWriterWrapper; +use crate::log::init_log; +use crate::util::create_binding; +use crate::TantivyIndexVersion; + +#[no_mangle] +pub extern "C" fn tantivy_create_json_key_stats_writer( + field_name: *const c_char, + path: *const c_char, + tantivy_index_version: u32, + num_threads: usize, + overall_memory_budget_in_bytes: usize, + in_ram: bool, +) -> RustResult { + init_log(); + let field_name_str = cstr_to_str!(field_name); + let path_str = cstr_to_str!(path); + + let tantivy_index_version = match TantivyIndexVersion::from_u32(tantivy_index_version) { + Ok(v) => v, + Err(e) => return RustResult::from_error(e.to_string()), + }; + + match IndexWriterWrapper::create_json_key_stats_writer( + field_name_str, + path_str, + num_threads, + overall_memory_budget_in_bytes, + tantivy_index_version, + in_ram, + ) { + Ok(wrapper) => RustResult::from_ptr(create_binding(wrapper)), + Err(err) => RustResult::from_error(format!( + "create json key stats writer failed with error: {}", + err.to_string(), + )), + } +} diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs index 3a5f07aa59..49301b257b 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs @@ -7,6 +7,7 @@ use tantivy::{Index, IndexReader, ReloadPolicy, Term}; use crate::docid_collector::DocIdCollector; use crate::log::init_log; +use crate::milvus_id_collector::MilvusIdCollector; use crate::util::make_bounds; use crate::vec_collector::VecCollector; @@ -19,6 +20,7 @@ pub(crate) struct IndexReaderWrapper { pub(crate) reader: IndexReader, pub(crate) index: Arc, pub(crate) id_field: Option, + pub(crate) user_specified_doc_id: bool, } impl IndexReaderWrapper { @@ -39,6 +41,8 @@ impl IndexReaderWrapper { Err(_) => None, }; + assert!(!schema.user_specified_doc_id() || id_field.is_none()); + let reader = index .reader_builder() .reload_policy(ReloadPolicy::OnCommitWithDelay) // OnCommitWithDelay serve for growing segment. @@ -51,6 +55,7 @@ impl IndexReaderWrapper { reader, index, id_field, + user_specified_doc_id: schema.user_specified_doc_id(), }) } @@ -63,7 +68,11 @@ impl IndexReaderWrapper { let metas = self.index.searchable_segment_metas()?; let mut sum: u32 = 0; for meta in metas { - sum += meta.max_doc(); + if self.user_specified_doc_id { + sum = std::cmp::max(sum, meta.max_doc()); + } else { + sum += meta.max_doc(); + } } Ok(sum) } @@ -78,16 +87,23 @@ impl IndexReaderWrapper { .map_err(TantivyBindingError::TantivyError) } None => { - // older version without doc_id, only one segment. - searcher - .search(q, &VecCollector {}) - .map_err(TantivyBindingError::TantivyError) + if self.user_specified_doc_id { + // newer version with user specified doc id. + searcher + .search(q, &MilvusIdCollector::default()) + .map_err(TantivyBindingError::TantivyError) + } else { + // older version without doc_id, only one segment. + searcher + .search(q, &VecCollector {}) + .map_err(TantivyBindingError::TantivyError) + } } } } // Generally, we should use [`crate::search`], except for some special senarios where the doc_id could beyound - // the score of u32. + // the scope of u32 such as json key stats offset. #[allow(dead_code)] pub(crate) fn search_i64(&self, q: &dyn Query) -> Result> { assert!(self.id_field.is_some()); @@ -298,7 +314,7 @@ mod test { use tantivy::{ doc, - schema::{Schema, STORED, STRING}, + schema::{Schema, STORED, STRING, TEXT_WITH_DOC_ID}, Index, }; @@ -326,4 +342,40 @@ mod test { res = index_reader_wrapper.prefix_query_keyword("$").unwrap(); assert_eq!(res.len(), 1); } + + #[test] + fn test_count() { + let mut schema_builder = Schema::builder(); + schema_builder.add_text_field("title", TEXT_WITH_DOC_ID); + schema_builder.enable_user_specified_doc_id(); + let schema = schema_builder.build(); + let title = schema.get_field("title").unwrap(); + + let index = Index::create_in_ram(schema.clone()); + let mut index_writer = index.writer(50000000).unwrap(); + + for i in 0..10_000 { + index_writer + .add_document_with_doc_id(i, doc!(title => format!("abc{}", i))) + .unwrap(); + } + index_writer.commit().unwrap(); + + let index_shared = Arc::new(index); + let index_reader_wrapper = IndexReaderWrapper::from_index(index_shared).unwrap(); + let count = index_reader_wrapper.count().unwrap(); + assert_eq!(count, 10000); + + let batch: Vec<_> = (0..10_000) + .into_iter() + .map(|i| doc!(title => format!("hello{}", i))) + .collect(); + index_writer + .add_documents_with_doc_id(10_000, batch) + .unwrap(); + index_writer.commit().unwrap(); + index_reader_wrapper.reload().unwrap(); + let count = index_reader_wrapper.count().unwrap(); + assert_eq!(count, 20000); + } } diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_text.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_text.rs index 5e321b3d79..1f6f7657f9 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_text.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_text.rs @@ -85,7 +85,9 @@ mod tests { .unwrap(); writer.add_data_by_batch(&["网球和滑雪"], Some(0)).unwrap(); - writer.add_data_by_batch(&["网球以及滑雪"], Some(1)).unwrap(); + writer + .add_data_by_batch(&["网球以及滑雪"], Some(1)) + .unwrap(); writer.commit().unwrap(); @@ -129,7 +131,5 @@ mod tests { let res = reader.search(&query).unwrap(); assert_eq!(res, (0..10000).collect::>()); - let res = reader.search_i64(&query).unwrap(); - assert_eq!(res, (0..10000).collect::>()); } } diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs index 7eb1e91c54..4156b72e7e 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs @@ -29,7 +29,6 @@ impl IndexWriterWrapper { num_threads: usize, overall_memory_budget_in_bytes: usize, tanviy_index_version: TantivyIndexVersion, - in_ram: bool, ) -> Result { init_log(); match tanviy_index_version { @@ -40,7 +39,6 @@ impl IndexWriterWrapper { path, num_threads, overall_memory_budget_in_bytes, - in_ram, )?; Ok(IndexWriterWrapper::V5(writer)) } @@ -51,7 +49,6 @@ impl IndexWriterWrapper { path, num_threads, overall_memory_budget_in_bytes, - in_ram, )?; Ok(IndexWriterWrapper::V7(writer)) } @@ -88,7 +85,9 @@ impl IndexWriterWrapper { { match self { IndexWriterWrapper::V5(writer) => writer.add_data_by_batch(data, offset), - IndexWriterWrapper::V7(writer) => writer.add_data_by_batch(data, offset.unwrap()), + IndexWriterWrapper::V7(writer) => { + writer.add_data_by_batch(data, offset.unwrap() as u32) + } } } @@ -99,7 +98,7 @@ impl IndexWriterWrapper { { match self { IndexWriterWrapper::V5(writer) => writer.add_array(data, offset), - IndexWriterWrapper::V7(writer) => writer.add_array(data, offset.unwrap()), + IndexWriterWrapper::V7(writer) => writer.add_array(data, offset.unwrap() as u32), } } @@ -110,7 +109,9 @@ impl IndexWriterWrapper { ) -> Result<()> { match self { IndexWriterWrapper::V5(writer) => writer.add_string_by_batch(data, offset), - IndexWriterWrapper::V7(writer) => writer.add_string_by_batch(data, offset.unwrap()), + IndexWriterWrapper::V7(writer) => { + writer.add_string_by_batch(data, offset.unwrap() as u32) + } } } @@ -121,7 +122,9 @@ impl IndexWriterWrapper { ) -> Result<()> { match self { IndexWriterWrapper::V5(writer) => writer.add_array_keywords(datas, offset), - IndexWriterWrapper::V7(writer) => writer.add_array_keywords(datas, offset.unwrap()), + IndexWriterWrapper::V7(writer) => { + writer.add_array_keywords(datas, offset.unwrap() as u32) + } } } @@ -193,7 +196,6 @@ mod tests { 1, 50_000_000, TantivyIndexVersion::V5, - false, ) .unwrap(); @@ -309,14 +311,12 @@ mod tests { #[test] pub fn test_add_json_key_stats() { - use crate::data_type::TantivyDataType; use crate::index_writer::IndexWriterWrapper; let temp_dir = tempdir().unwrap(); - let mut index_writer = IndexWriterWrapper::new( + let mut index_writer = IndexWriterWrapper::create_json_key_stats_writer( "test", - TantivyDataType::Keyword, - temp_dir.path().to_str().unwrap().to_string(), + temp_dir.path().to_str().unwrap(), 1, 15 * 1024 * 1024, TantivyIndexVersion::V7, @@ -352,4 +352,73 @@ mod tests { let count = index_writer.create_reader().unwrap().count().unwrap(); assert_eq!(count, total_count); } + + #[test] + pub fn test_add_strings_by_batch() { + use crate::data_type::TantivyDataType; + use crate::index_writer::IndexWriterWrapper; + + let temp_dir = tempdir().unwrap(); + let mut index_writer = IndexWriterWrapper::new( + "test", + TantivyDataType::Keyword, + temp_dir.path().to_str().unwrap().to_string(), + 1, + 15 * 1024 * 1024, + TantivyIndexVersion::V7, + ) + .unwrap(); + + let keys = (0..10000) + .map(|i| format!("key{:05}", i)) + .collect::>(); + + let c_keys: Vec = keys.into_iter().map(|k| CString::new(k).unwrap()).collect(); + let key_ptrs: Vec<*const libc::c_char> = c_keys.iter().map(|cs| cs.as_ptr()).collect(); + + index_writer + .add_string_by_batch(&key_ptrs, Some(0)) + .unwrap(); + index_writer.commit().unwrap(); + let reader = index_writer.create_reader().unwrap(); + let count: u32 = reader.count().unwrap(); + assert_eq!(count, 10000); + } + + #[test] + pub fn test_add_data_by_batch() { + use crate::data_type::TantivyDataType; + use crate::index_writer::IndexWriterWrapper; + + let temp_dir = tempdir().unwrap(); + let mut index_writer = IndexWriterWrapper::new( + "test", + TantivyDataType::I64, + temp_dir.path().to_str().unwrap().to_string(), + 1, + 15 * 1024 * 1024, + TantivyIndexVersion::V7, + ) + .unwrap(); + + let keys = (0..10000).collect::>(); + + let mut count = 0; + for i in keys { + index_writer + .add_data_by_batch::(&[i], Some(i as i64)) + .unwrap(); + + count += 1; + + if count % 1000 == 0 { + index_writer.commit().unwrap(); + } + } + + index_writer.commit().unwrap(); + let reader = index_writer.create_reader().unwrap(); + let count: u32 = reader.count().unwrap(); + assert_eq!(count, 10000); + } } diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs index 969b73333c..dd739bc1d3 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs @@ -28,7 +28,6 @@ pub extern "C" fn tantivy_create_index( tantivy_index_version: u32, num_threads: usize, overall_memory_budget_in_bytes: usize, - in_ram : bool, ) -> RustResult { let field_name_str = cstr_to_str!(field_name); let path_str = cstr_to_str!(path); @@ -45,7 +44,6 @@ pub extern "C" fn tantivy_create_index( num_threads, overall_memory_budget_in_bytes, tantivy_index_version, - in_ram, ) { Ok(wrapper) => RustResult::from_ptr(create_binding(wrapper)), Err(e) => RustResult::from_error(e.to_string()), diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/index_writer.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/index_writer.rs index d4a524aee2..a10e4a4c2a 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/index_writer.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/index_writer.rs @@ -27,7 +27,7 @@ pub(crate) struct IndexWriterWrapperImpl { } #[inline] -fn schema_builder_add_field( +pub(crate) fn schema_builder_add_field( schema_builder: &mut SchemaBuilder, field_name: &str, data_type: TantivyDataType, @@ -50,48 +50,56 @@ fn schema_builder_add_field( } impl TantivyValue for i8 { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_i64(Field::from_field_id(field), *self as i64); } } impl TantivyValue for i16 { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_i64(Field::from_field_id(field), *self as i64); } } impl TantivyValue for i32 { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_i64(Field::from_field_id(field), *self as i64); } } impl TantivyValue for i64 { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_i64(Field::from_field_id(field), *self); } } impl TantivyValue for f32 { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_f64(Field::from_field_id(field), *self as f64); } } impl TantivyValue for f64 { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_f64(Field::from_field_id(field), *self); } } impl TantivyValue for &str { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_text(Field::from_field_id(field), *self); } } impl TantivyValue for bool { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_bool(Field::from_field_id(field), *self); } @@ -104,7 +112,6 @@ impl IndexWriterWrapperImpl { path: String, num_threads: usize, overall_memory_budget_in_bytes: usize, - in_ram: bool, ) -> Result { info!( "create index writer, field_name: {}, data_type: {:?}, tantivy_index_version 5", @@ -115,11 +122,7 @@ impl IndexWriterWrapperImpl { // We cannot build direct connection from rows in multi-segments to milvus row data. So we have this doc_id field. let id_field = schema_builder.add_i64_field("doc_id", FAST); let schema = schema_builder.build(); - let index = if in_ram { - Index::create_in_ram(schema) - } else { - Index::create_in_dir(path.clone(), schema)? - }; + let index = Index::create_in_dir(path.clone(), schema)?; let index_writer = index.writer_with_num_threads(num_threads, overall_memory_budget_in_bytes)?; Ok(IndexWriterWrapperImpl { diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/index_writer_json_key_stats.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/index_writer_json_key_stats.rs new file mode 100644 index 0000000000..fe8632d41e --- /dev/null +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/index_writer_json_key_stats.rs @@ -0,0 +1,43 @@ +use either::Either; +use log::info; +use std::sync::Arc; +use tantivy_5::{ + schema::{Schema, FAST}, + Index, +}; + +use crate::data_type::TantivyDataType; +use crate::error::Result; +use crate::index_writer_v5::index_writer::schema_builder_add_field; + +use super::IndexWriterWrapperImpl; + +impl IndexWriterWrapperImpl { + pub(crate) fn create_json_key_stats_writer( + field_name: &str, + path: &str, + num_threads: usize, + overall_memory_budget_in_bytes: usize, + in_ram: bool, + ) -> Result { + info!("create json key stats writer, field_name: {}", field_name); + let mut schema_builder = Schema::builder(); + let field = + schema_builder_add_field(&mut schema_builder, field_name, TantivyDataType::Keyword); + let id_field = schema_builder.add_i64_field("doc_id", FAST); + let schema = schema_builder.build(); + let index = if in_ram { + Index::create_in_ram(schema) + } else { + Index::create_in_dir(path, schema)? + }; + let index_writer = + index.writer_with_num_threads(num_threads, overall_memory_budget_in_bytes)?; + Ok(IndexWriterWrapperImpl { + field, + index_writer: Either::Left(index_writer), + id_field: Some(id_field), + _index: Arc::new(index), + }) + } +} diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/index_writer_text.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/index_writer_text.rs index 38a34ceef5..5417351c07 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/index_writer_text.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/index_writer_text.rs @@ -1,3 +1,4 @@ +use log::info; use std::sync::Arc; use either::Either; @@ -5,7 +6,6 @@ use tantivy_5::schema::{Field, IndexRecordOption, Schema, TextFieldIndexing, Tex use tantivy_5::Index; use crate::error::Result; -use crate::log::init_log; use super::analyzer::create_analyzer; use super::IndexWriterWrapperImpl; @@ -32,7 +32,10 @@ impl IndexWriterWrapperImpl { overall_memory_budget_in_bytes: usize, in_ram: bool, ) -> Result { - init_log(); + info!( + "create text index writer, field_name: {}, tantivy_index_version 5", + field_name + ); let tokenizer = create_analyzer(tokenizer_params)?; diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/mod.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/mod.rs index f146ee496b..fe299d803c 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/mod.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/mod.rs @@ -5,6 +5,7 @@ mod analyzer; pub(crate) mod index_writer; +pub(crate) mod index_writer_json_key_stats; pub(crate) mod index_writer_text; pub(crate) use index_writer::IndexWriterWrapperImpl; diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/index_writer.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/index_writer.rs index 6ffd0b07d6..c7f4be2258 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/index_writer.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/index_writer.rs @@ -6,7 +6,7 @@ use libc::c_char; use log::info; use tantivy::indexer::UserOperation; use tantivy::schema::{ - Field, IndexRecordOption, Schema, SchemaBuilder, TextFieldIndexing, TextOptions, FAST, INDEXED, + Field, IndexRecordOption, NumericOptions, Schema, SchemaBuilder, TextFieldIndexing, TextOptions, }; use tantivy::{doc, Index, IndexWriter, TantivyDocument}; @@ -19,18 +19,25 @@ use crate::index_writer::TantivyValue; const BATCH_SIZE: usize = 4096; #[inline] -fn schema_builder_add_field( +pub(crate) fn schema_builder_add_field( schema_builder: &mut SchemaBuilder, field_name: &str, data_type: TantivyDataType, ) -> Field { match data_type { - TantivyDataType::I64 => schema_builder.add_i64_field(field_name, INDEXED), - TantivyDataType::F64 => schema_builder.add_f64_field(field_name, INDEXED), - TantivyDataType::Bool => schema_builder.add_bool_field(field_name, INDEXED), + TantivyDataType::I64 => { + schema_builder.add_i64_field(field_name, NumericOptions::default().set_indexed()) + } + TantivyDataType::F64 => { + schema_builder.add_f64_field(field_name, NumericOptions::default().set_indexed()) + } + TantivyDataType::Bool => { + schema_builder.add_bool_field(field_name, NumericOptions::default().set_indexed()) + } TantivyDataType::Keyword => { let text_field_indexing = TextFieldIndexing::default() .set_tokenizer("raw") + .set_fieldnorms(false) .set_index_option(IndexRecordOption::Basic); let text_options = TextOptions::default().set_indexing_options(text_field_indexing); schema_builder.add_text_field(field_name, text_options) @@ -42,48 +49,56 @@ fn schema_builder_add_field( } impl TantivyValue for i8 { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_i64(Field::from_field_id(field), *self as i64); } } impl TantivyValue for i16 { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_i64(Field::from_field_id(field), *self as i64); } } impl TantivyValue for i32 { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_i64(Field::from_field_id(field), *self as i64); } } impl TantivyValue for i64 { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_i64(Field::from_field_id(field), *self); } } impl TantivyValue for f32 { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_f64(Field::from_field_id(field), *self as f64); } } impl TantivyValue for f64 { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_f64(Field::from_field_id(field), *self); } } impl TantivyValue for &str { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_text(Field::from_field_id(field), *self); } } impl TantivyValue for bool { + #[inline] fn add_to_document(&self, field: u32, document: &mut TantivyDocument) { document.add_bool(Field::from_field_id(field), *self); } @@ -92,7 +107,6 @@ impl TantivyValue for bool { pub struct IndexWriterWrapperImpl { pub(crate) field: Field, pub(crate) index_writer: IndexWriter, - pub(crate) id_field: Field, pub(crate) index: Arc, } @@ -103,7 +117,6 @@ impl IndexWriterWrapperImpl { path: String, num_threads: usize, overall_memory_budget_in_bytes: usize, - in_ram: bool, ) -> Result { info!( "create index writer, field_name: {}, data_type: {:?}, tantivy_index_version 7", @@ -111,20 +124,14 @@ impl IndexWriterWrapperImpl { ); let mut schema_builder = Schema::builder(); let field = schema_builder_add_field(&mut schema_builder, field_name, data_type); - // We cannot build direct connection from rows in multi-segments to milvus row data. So we have this doc_id field. - let id_field = schema_builder.add_i64_field("doc_id", FAST); + schema_builder.enable_user_specified_doc_id(); let schema = schema_builder.build(); - let index = if in_ram { - Index::create_in_ram(schema) - } else { - Index::create_in_dir(path.clone(), schema)? - }; + let index = Index::create_in_dir(path.clone(), schema)?; let index_writer = index.writer_with_num_threads(num_threads, overall_memory_budget_in_bytes)?; Ok(IndexWriterWrapperImpl { field, index_writer, - id_field, index: Arc::new(index), }) } @@ -134,36 +141,34 @@ impl IndexWriterWrapperImpl { } #[inline] - fn add_document(&mut self, mut document: TantivyDocument, offset: i64) -> Result<()> { - document.add_i64(self.id_field, offset); - self.index_writer.add_document(document)?; + fn add_document(&mut self, document: TantivyDocument, offset: u32) -> Result<()> { + self.index_writer + .add_document_with_doc_id(offset, document)?; Ok(()) } pub fn add_data_by_batch>( &mut self, batch_data: &[T], - offset_begin: i64, + mut offset: u32, ) -> Result<()> { let mut batch = Vec::with_capacity(BATCH_SIZE); - for (idx, data) in batch_data.into_iter().enumerate() { - let offset = offset_begin + idx as i64; - + for data in batch_data.into_iter() { let mut doc = TantivyDocument::default(); data.add_to_document(self.field.field_id(), &mut doc); - doc.add_i64(self.id_field, offset); - batch.push(UserOperation::Add(doc)); + batch.push(doc); if batch.len() == BATCH_SIZE { - self.index_writer.run(std::mem::replace( - &mut batch, - Vec::with_capacity(BATCH_SIZE), - ))?; + self.index_writer.add_documents_with_doc_id( + offset, + std::mem::replace(&mut batch, Vec::with_capacity(BATCH_SIZE)), + )?; + offset += BATCH_SIZE as u32; } } if !batch.is_empty() { - self.index_writer.run(batch)?; + self.index_writer.add_documents_with_doc_id(offset, batch)?; } Ok(()) @@ -172,7 +177,7 @@ impl IndexWriterWrapperImpl { pub fn add_array, I>( &mut self, data: I, - offset: i64, + offset: u32, ) -> Result<()> where I: IntoIterator, @@ -184,7 +189,7 @@ impl IndexWriterWrapperImpl { self.add_document(document, offset) } - pub fn add_array_keywords(&mut self, datas: &[*const c_char], offset: i64) -> Result<()> { + pub fn add_array_keywords(&mut self, datas: &[*const c_char], offset: u32) -> Result<()> { let mut document = TantivyDocument::default(); for element in datas { let data = unsafe { CStr::from_ptr(*element) }; @@ -194,27 +199,26 @@ impl IndexWriterWrapperImpl { self.add_document(document, offset) } - pub fn add_string_by_batch(&mut self, data: &[*const c_char], offset: i64) -> Result<()> { + pub fn add_string_by_batch(&mut self, data: &[*const c_char], mut offset: u32) -> Result<()> { let mut batch = Vec::with_capacity(BATCH_SIZE); - for (idx, key) in data.into_iter().enumerate() { + for key in data.into_iter() { let key = unsafe { CStr::from_ptr(*key) } .to_str() .map_err(|e| TantivyBindingError::InternalError(e.to_string()))?; - let key_offset = offset + idx as i64; - batch.push(UserOperation::Add(doc!( - self.id_field => key_offset, + batch.push(doc!( self.field => key, - ))); - if batch.len() >= BATCH_SIZE { - self.index_writer.run(std::mem::replace( - &mut batch, - Vec::with_capacity(BATCH_SIZE), - ))?; + )); + if batch.len() == BATCH_SIZE { + self.index_writer.add_documents_with_doc_id( + offset, + std::mem::replace(&mut batch, Vec::with_capacity(BATCH_SIZE)), + )?; + offset += BATCH_SIZE as u32; } } if !batch.is_empty() { - self.index_writer.run(batch)?; + self.index_writer.add_documents_with_doc_id(offset, batch)?; } Ok(()) @@ -227,6 +231,12 @@ impl IndexWriterWrapperImpl { json_offsets_len: &[usize], ) -> Result<()> { let mut batch = Vec::with_capacity(BATCH_SIZE); + let id_field = self + .index_writer + .index() + .schema() + .get_field("doc_id") + .unwrap(); for i in 0..keys.len() { let key = unsafe { CStr::from_ptr(keys[i]) } .to_str() @@ -237,7 +247,7 @@ impl IndexWriterWrapperImpl { for offset in offsets { batch.push(UserOperation::Add(doc!( - self.id_field => *offset, + id_field => *offset, self.field => key, ))); @@ -280,45 +290,3 @@ impl IndexWriterWrapperImpl { Ok(()) } } - -#[cfg(test)] -mod tests { - use std::ffi::CString; - - use tempfile::tempdir; - - use crate::TantivyIndexVersion; - - #[test] - pub fn test_add_json_key_stats() { - use crate::data_type::TantivyDataType; - use crate::index_writer::IndexWriterWrapper; - - let temp_dir = tempdir().unwrap(); - let mut index_writer = IndexWriterWrapper::new( - "test", - TantivyDataType::Keyword, - temp_dir.path().to_str().unwrap().to_string(), - 1, - 15 * 1024 * 1024, - TantivyIndexVersion::V7, - false, - ) - .unwrap(); - - let keys = (0..10000) - .map(|i| format!("key{:05}", i)) - .collect::>(); - - let c_keys: Vec = keys.into_iter().map(|k| CString::new(k).unwrap()).collect(); - let key_ptrs: Vec<*const libc::c_char> = c_keys.iter().map(|cs| cs.as_ptr()).collect(); - - index_writer - .add_string_by_batch(&key_ptrs, Some(0)) - .unwrap(); - index_writer.commit().unwrap(); - let reader = index_writer.create_reader().unwrap(); - let count: u32 = reader.count().unwrap(); - assert_eq!(count, 10000); - } -} diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/index_writer_json_key_stats.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/index_writer_json_key_stats.rs new file mode 100644 index 0000000000..6f501a35c1 --- /dev/null +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/index_writer_json_key_stats.rs @@ -0,0 +1,42 @@ +use log::info; +use std::sync::Arc; +use tantivy::{ + schema::{Schema, FAST}, + Index, +}; + +use crate::{ + data_type::TantivyDataType, error::Result, + index_writer_v7::index_writer::schema_builder_add_field, +}; + +use super::IndexWriterWrapperImpl; + +impl IndexWriterWrapperImpl { + pub(crate) fn create_json_key_stats_writer( + field_name: &str, + path: &str, + num_threads: usize, + overall_memory_budget_in_bytes: usize, + in_ram: bool, + ) -> Result { + info!("create json key stats writer, field_name: {}", field_name); + let mut schema_builder = Schema::builder(); + let field = + schema_builder_add_field(&mut schema_builder, field_name, TantivyDataType::Keyword); + let _ = schema_builder.add_i64_field("doc_id", FAST); + let schema = schema_builder.build(); + let index = if in_ram { + Index::create_in_ram(schema) + } else { + Index::create_in_dir(path, schema)? + }; + let index_writer = + index.writer_with_num_threads(num_threads, overall_memory_budget_in_bytes)?; + Ok(IndexWriterWrapperImpl { + field, + index_writer, + index: Arc::new(index), + }) + } +} diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/index_writer_text.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/index_writer_text.rs index 569df94bf0..d109d19dfd 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/index_writer_text.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/index_writer_text.rs @@ -1,24 +1,25 @@ +use log::info; use std::sync::Arc; -use tantivy::schema::{Field, IndexRecordOption, Schema, TextFieldIndexing, TextOptions, FAST}; +use tantivy::schema::{Field, IndexRecordOption, Schema, TextFieldIndexing, TextOptions}; use tantivy::Index; use crate::analyzer::create_analyzer; use crate::error::Result; -use crate::log::init_log; use super::IndexWriterWrapperImpl; -fn build_text_schema(field_name: &str, tokenizer_name: &str) -> (Schema, Field, Field) { +fn build_text_schema(field_name: &str, tokenizer_name: &str) -> (Schema, Field) { let mut schema_builder = Schema::builder(); // positions is required for matching phase. let indexing = TextFieldIndexing::default() .set_tokenizer(tokenizer_name) + .set_fieldnorms(false) .set_index_option(IndexRecordOption::WithFreqsAndPositions); let option = TextOptions::default().set_indexing_options(indexing); let field = schema_builder.add_text_field(field_name, option); - let id_field = schema_builder.add_i64_field("doc_id", FAST); - (schema_builder.build(), field, id_field) + schema_builder.enable_user_specified_doc_id(); + (schema_builder.build(), field) } impl IndexWriterWrapperImpl { @@ -31,10 +32,14 @@ impl IndexWriterWrapperImpl { overall_memory_budget_in_bytes: usize, in_ram: bool, ) -> Result { - init_log(); + info!( + "create text index writer, field_name: {}, tantivy_index_version 7", + field_name + ); + let tokenizer = create_analyzer(tokenizer_params)?; - let (schema, field, id_field) = build_text_schema(field_name, tokenizer_name); + let (schema, field) = build_text_schema(field_name, tokenizer_name); let index = if in_ram { Index::create_in_ram(schema) } else { @@ -48,7 +53,6 @@ impl IndexWriterWrapperImpl { Ok(IndexWriterWrapperImpl { field, index_writer, - id_field, index: Arc::new(index), }) } diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/mod.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/mod.rs index 7122a00164..77e8361af4 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/mod.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/mod.rs @@ -3,6 +3,7 @@ //! in most cases. pub(crate) mod index_writer; +pub(crate) mod index_writer_json_key_stats; pub(crate) mod index_writer_text; pub(crate) use index_writer::IndexWriterWrapperImpl; diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs index dfb06b77a3..ee3e66cd6f 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs @@ -7,6 +7,8 @@ mod demo_c; mod docid_collector; mod error; mod hashmap_c; +mod index_json_key_stats_writer; +mod index_json_key_stats_writer_c; mod index_reader; mod index_reader_c; mod index_reader_text; @@ -18,6 +20,7 @@ mod index_writer_text_c; mod index_writer_v5; mod index_writer_v7; mod log; +mod milvus_id_collector; mod string_c; mod token_stream_c; mod tokenizer_c; diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/milvus_id_collector.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/milvus_id_collector.rs new file mode 100644 index 0000000000..2539c727b4 --- /dev/null +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/milvus_id_collector.rs @@ -0,0 +1,63 @@ +use tantivy::{ + collector::{Collector, SegmentCollector}, + DocId, Score, SegmentOrdinal, SegmentReader, +}; + +#[derive(Default)] +pub(crate) struct MilvusIdCollector {} + +pub(crate) struct MilvusIdChildCollector { + milvus_doc_ids: Vec, +} + +impl Collector for MilvusIdCollector { + type Fruit = Vec; + type Child = MilvusIdChildCollector; + + fn for_segment( + &self, + _segment_local_id: SegmentOrdinal, + _segment: &SegmentReader, + ) -> tantivy::Result { + Ok(MilvusIdChildCollector { + milvus_doc_ids: Vec::new(), + }) + } + + fn requires_scoring(&self) -> bool { + false + } + + fn merge_fruits( + &self, + segment_fruits: Vec<::Fruit>, + ) -> tantivy::Result { + let len: usize = segment_fruits.iter().map(|docset| docset.len()).sum(); + let mut result = Vec::with_capacity(len); + for docs in segment_fruits { + for doc in docs { + result.push(doc); + } + } + Ok(result) + } +} + +impl SegmentCollector for MilvusIdChildCollector { + type Fruit = Vec; + + #[inline] + fn collect_block(&mut self, docs: &[DocId]) { + self.milvus_doc_ids.extend(docs); + } + + fn collect(&mut self, doc: DocId, _score: Score) { + // Unreachable code actually + self.collect_block(&[doc]); + } + + #[inline] + fn harvest(self) -> Self::Fruit { + self.milvus_doc_ids + } +} diff --git a/internal/core/thirdparty/tantivy/tantivy-wrapper.h b/internal/core/thirdparty/tantivy/tantivy-wrapper.h index d6b0fd0875..14ea6a402f 100644 --- a/internal/core/thirdparty/tantivy/tantivy-wrapper.h +++ b/internal/core/thirdparty/tantivy/tantivy-wrapper.h @@ -84,7 +84,6 @@ struct TantivyIndexWrapper { const char* path, uint32_t tantivy_index_version, bool inverted_single_semgnent = false, - bool in_ram = false, uintptr_t num_threads = DEFAULT_NUM_THREADS, uintptr_t overall_memory_budget_in_bytes = DEFAULT_OVERALL_MEMORY_BUDGET_IN_BYTES) { @@ -102,8 +101,7 @@ struct TantivyIndexWrapper { path, tantivy_index_version, num_threads, - overall_memory_budget_in_bytes, - in_ram)); + overall_memory_budget_in_bytes)); } AssertInfo(res.result_->success, "failed to create index: {}", @@ -148,6 +146,29 @@ struct TantivyIndexWrapper { writer_ = res.result_->value.ptr._0; path_ = std::string(path); } + + // create index writer for json key stats + TantivyIndexWrapper(const char* field_name, + const char* path, + uint32_t tantivy_index_version, + bool in_ram = false, + uintptr_t num_threads = DEFAULT_NUM_THREADS, + uintptr_t overall_memory_budget_in_bytes = + DEFAULT_OVERALL_MEMORY_BUDGET_IN_BYTES) { + auto res = RustResultWrapper( + tantivy_create_json_key_stats_writer(field_name, + path, + tantivy_index_version, + num_threads, + overall_memory_budget_in_bytes, + in_ram)); + AssertInfo(res.result_->success, + "failed to create text writer: {}", + res.result_->error); + writer_ = res.result_->value.ptr._0; + path_ = std::string(path); + } + // create reader. void create_reader() {