diff --git a/internal/core/src/exec/expression/UnaryExpr.cpp b/internal/core/src/exec/expression/UnaryExpr.cpp index 522193277c..5434bef4c6 100644 --- a/internal/core/src/exec/expression/UnaryExpr.cpp +++ b/internal/core/src/exec/expression/UnaryExpr.cpp @@ -540,7 +540,7 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(EvalCtx& context, }; } else { auto size_per_chunk = segment_->size_per_chunk(); - retrieve = [ size_per_chunk, this ](int64_t offset) -> auto { + retrieve = [ size_per_chunk, this ](int64_t offset) -> auto{ auto chunk_idx = offset / size_per_chunk; auto chunk_offset = offset % size_per_chunk; const auto& chunk = diff --git a/internal/core/src/index/InvertedIndexTantivy.cpp b/internal/core/src/index/InvertedIndexTantivy.cpp index ebeaa14fdc..e02727c59c 100644 --- a/internal/core/src/index/InvertedIndexTantivy.cpp +++ b/internal/core/src/index/InvertedIndexTantivy.cpp @@ -219,7 +219,7 @@ InvertedIndexTantivy::Load(milvus::tracer::TraceContext ctx, std::vector null_offset_files; std::shared_ptr null_offset_data; - auto find_file = [&](const std::string& target) -> auto { + auto find_file = [&](const std::string& target) -> auto{ return std::find_if(inverted_index_files.begin(), inverted_index_files.end(), [&](const std::string& filename) { diff --git a/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp b/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp index f81780b72c..38060c8cb3 100644 --- a/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp +++ b/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp @@ -58,11 +58,18 @@ JsonKeyStatsInvertedIndex::AddJSONEncodeValue( void JsonKeyStatsInvertedIndex::AddInvertedRecord( std::map>& mp) { + std::vector json_offsets_lens; + std::vector keys; + std::vector json_offsets; for (auto& iter : mp) { - for (auto value : iter.second) { - wrapper_->add_array_data(&iter.first, 1, value); - } + keys.push_back(iter.first.c_str()); + json_offsets.push_back(iter.second.data()); + json_offsets_lens.push_back(iter.second.size()); } + wrapper_->add_json_key_stats_data_by_batch(keys.data(), + json_offsets.data(), + json_offsets_lens.data(), + keys.size()); } void @@ -307,6 +314,20 @@ JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex( IndexStatsPtr JsonKeyStatsInvertedIndex::Upload(const Config& config) { finish(); + + index_build_timestamps_.index_build_done_ = + std::chrono::system_clock::now(); + LOG_INFO( + "build json key index done for field id:{}, json parse duration: {}s, " + "tantivy document add schedule duration : {}s, " + "tantivy total duration : {}s, " + "total duration : {}s", + field_id_, + index_build_timestamps_.getJsonParsingDuration(), + index_build_timestamps_.getTantivyAddSchedulingDuration(), + index_build_timestamps_.getTantivyTotalDuration(), + index_build_timestamps_.getIndexBuildTotalDuration()); + boost::filesystem::path p(path_); boost::filesystem::directory_iterator end_iter; @@ -386,6 +407,8 @@ JsonKeyStatsInvertedIndex::BuildWithFieldData( const std::vector& field_datas, bool nullable) { int64_t offset = 0; std::map> mp; + index_build_timestamps_.index_build_begin_ = + std::chrono::system_clock::now(); if (nullable) { for (const auto& data : field_datas) { auto n = data->get_num_rows(); @@ -412,8 +435,13 @@ JsonKeyStatsInvertedIndex::BuildWithFieldData( } } } + index_build_timestamps_.tantivy_build_begin_ = + std::chrono::system_clock::now(); + + // Schedule all document add operations to tantivy. AddInvertedRecord(mp); - LOG_INFO("build json key index done for field id:{}", field_id_); + index_build_timestamps_.tantivy_add_schedule_end_ = + std::chrono::system_clock::now(); } void diff --git a/internal/core/src/index/JsonKeyStatsInvertedIndex.h b/internal/core/src/index/JsonKeyStatsInvertedIndex.h index c8b1428132..fe358b34a1 100644 --- a/internal/core/src/index/JsonKeyStatsInvertedIndex.h +++ b/internal/core/src/index/JsonKeyStatsInvertedIndex.h @@ -294,5 +294,44 @@ class JsonKeyStatsInvertedIndex : public InvertedIndexTantivy { std::atomic last_commit_time_; int64_t commit_interval_in_ms_; std::atomic is_data_uncommitted_ = false; + + struct IndexBuildTimestamps { + std::chrono::time_point index_build_begin_; + std::chrono::time_point tantivy_build_begin_; + // The time that we have finished push add operations to tantivy, which will be + // executed asynchronously + std::chrono::time_point + tantivy_add_schedule_end_; + std::chrono::time_point index_build_done_; + + auto + getJsonParsingDuration() const { + return std::chrono::duration(tantivy_build_begin_ - + index_build_begin_) + .count(); + } + + auto + getTantivyAddSchedulingDuration() const { + return std::chrono::duration(tantivy_add_schedule_end_ - + tantivy_build_begin_) + .count(); + } + + auto + getTantivyTotalDuration() const { + return std::chrono::duration(index_build_done_ - + tantivy_build_begin_) + .count(); + } + + auto + getIndexBuildTotalDuration() const { + return std::chrono::duration(index_build_done_ - + index_build_begin_) + .count(); + } + }; + IndexBuildTimestamps index_build_timestamps_; }; } // namespace milvus::index diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/Cargo.lock b/internal/core/thirdparty/tantivy/tantivy-binding/Cargo.lock index d45e87f255..ddb8ae3608 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/Cargo.lock +++ b/internal/core/thirdparty/tantivy/tantivy-binding/Cargo.lock @@ -768,6 +768,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + [[package]] name = "futures" version = "0.3.31" @@ -1819,7 +1825,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" dependencies = [ "phf_shared", - "rand", + "rand 0.8.5", ] [[package]] @@ -1892,6 +1898,29 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ac302d8f83c0c1974bf758f6b041c6c8ada916fbb44a609158ca8b064cc76c" +dependencies = [ + "libc", + "rand 0.4.6", +] + +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi", +] + [[package]] name = "rand" version = "0.8.5" @@ -1900,7 +1929,7 @@ checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", "rand_chacha", - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -1910,9 +1939,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", ] +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", +] + +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + [[package]] name = "rand_core" version = "0.6.4" @@ -1929,7 +1973,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31" dependencies = [ "num-traits", - "rand", + "rand 0.8.5", ] [[package]] @@ -1952,6 +1996,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + [[package]] name = "redox_syscall" version = "0.5.8" @@ -2535,6 +2588,7 @@ dependencies = [ "libc", "lindera", "log", + "rand 0.3.23", "regex", "scopeguard", "serde_json", diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/Cargo.toml b/internal/core/thirdparty/tantivy/tantivy-binding/Cargo.toml index 6d641de4f3..745fe65102 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/Cargo.toml +++ b/internal/core/thirdparty/tantivy/tantivy-binding/Cargo.toml @@ -30,6 +30,7 @@ regex = "1.11.1" either = "1.13.0" [dev-dependencies] +rand = "0.3" tempfile = "3.0" [build-dependencies] diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h b/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h index 885b5906ce..5b6fb4f378 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h +++ b/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h @@ -269,6 +269,12 @@ RustResult tantivy_index_add_strings_by_single_segment_writer(void *ptr, const char *const *array, uintptr_t len); +RustResult tantivy_index_add_json_key_stats_data_by_batch(void *ptr, + const char *const *keys, + const int64_t *const *json_offsets, + const uintptr_t *json_offsets_len, + uintptr_t len); + RustResult tantivy_index_add_array_int8s(void *ptr, const int8_t *array, uintptr_t len, diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs index 4acca228de..7eb1e91c54 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs @@ -125,6 +125,24 @@ impl IndexWriterWrapper { } } + pub fn add_json_key_stats( + &mut self, + keys: &[*const i8], + json_offsets: &[*const i64], + json_offsets_len: &[usize], + ) -> Result<()> { + assert!(keys.len() == json_offsets.len()); + assert!(keys.len() == json_offsets_len.len()); + match self { + IndexWriterWrapper::V5(writer) => { + writer.add_json_key_stats(keys, json_offsets, json_offsets_len) + } + IndexWriterWrapper::V7(writer) => { + writer.add_json_key_stats(keys, json_offsets, json_offsets_len) + } + } + } + #[allow(dead_code)] pub fn manual_merge(&mut self) -> Result<()> { match self { @@ -152,9 +170,10 @@ impl IndexWriterWrapper { #[cfg(test)] mod tests { - use std::ops::Bound; + use rand::Rng; + use std::{ffi::CString, ops::Bound}; use tantivy_5::{query, Index, ReloadPolicy}; - use tempfile::TempDir; + use tempfile::{tempdir, TempDir}; use crate::{data_type::TantivyDataType, TantivyIndexVersion}; @@ -174,6 +193,7 @@ mod tests { 1, 50_000_000, TantivyIndexVersion::V5, + false, ) .unwrap(); @@ -286,4 +306,50 @@ mod tests { .unwrap(); assert_eq!(res.len(), 10); } + + #[test] + pub fn test_add_json_key_stats() { + use crate::data_type::TantivyDataType; + use crate::index_writer::IndexWriterWrapper; + + let temp_dir = tempdir().unwrap(); + let mut index_writer = IndexWriterWrapper::new( + "test", + TantivyDataType::Keyword, + temp_dir.path().to_str().unwrap().to_string(), + 1, + 15 * 1024 * 1024, + TantivyIndexVersion::V7, + false, + ) + .unwrap(); + + let keys = (0..100).map(|i| format!("key{:05}", i)).collect::>(); + let mut total_count = 0; + let mut rng = rand::thread_rng(); + let json_offsets = (0..100) + .map(|_| { + let count = rng.gen_range(0, 1000); + total_count += count; + (0..count) + .map(|_| rng.gen_range(0, i64::MAX)) + .collect::>() + }) + .collect::>(); + let json_offsets_len = json_offsets + .iter() + .map(|offsets| offsets.len()) + .collect::>(); + let json_offsets = json_offsets.iter().map(|x| x.as_ptr()).collect::>(); + let c_keys: Vec = keys.into_iter().map(|k| CString::new(k).unwrap()).collect(); + let key_ptrs: Vec<*const libc::c_char> = c_keys.iter().map(|cs| cs.as_ptr()).collect(); + + index_writer + .add_json_key_stats(&key_ptrs, &json_offsets, &json_offsets_len) + .unwrap(); + + index_writer.commit().unwrap(); + let count = index_writer.create_reader().unwrap().count().unwrap(); + assert_eq!(count, total_count); + } } diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs index 144dcd61ed..969b73333c 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs @@ -317,6 +317,25 @@ pub extern "C" fn tantivy_index_add_strings_by_single_segment_writer( .into() } +#[no_mangle] +pub extern "C" fn tantivy_index_add_json_key_stats_data_by_batch( + ptr: *mut c_void, + keys: *const *const c_char, + json_offsets: *const *const i64, + json_offsets_len: *const usize, + len: usize, +) -> RustResult { + let real = ptr as *mut IndexWriterWrapper; + let json_offsets_len = unsafe { slice::from_raw_parts(json_offsets_len, len) }; + let json_offsets = unsafe { slice::from_raw_parts(json_offsets, len) }; + let keys = unsafe { slice::from_raw_parts(keys, len) }; + unsafe { + (*real) + .add_json_key_stats(keys, json_offsets, json_offsets_len) + .into() + } +} + // --------------------------------------------- array ------------------------------------------ #[no_mangle] diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/index_writer.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/index_writer.rs index 66534a40cb..d4a524aee2 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/index_writer.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v5/index_writer.rs @@ -302,6 +302,45 @@ impl IndexWriterWrapperImpl { Ok(()) } + pub fn add_json_key_stats( + &mut self, + keys: &[*const i8], + json_offsets: &[*const i64], + json_offsets_len: &[usize], + ) -> Result<()> { + let writer = self.index_writer.as_ref().left().unwrap(); + let id_field = self.id_field.unwrap(); + let mut batch = Vec::with_capacity(BATCH_SIZE); + for i in 0..keys.len() { + let key = unsafe { CStr::from_ptr(keys[i]) } + .to_str() + .map_err(|e| TantivyBindingError::InternalError(e.to_string()))?; + + let offsets = + unsafe { std::slice::from_raw_parts(json_offsets[i], json_offsets_len[i]) }; + + for offset in offsets { + batch.push(UserOperation::Add(doc!( + id_field => *offset, + self.field => key, + ))); + + if batch.len() >= BATCH_SIZE { + writer.run(std::mem::replace( + &mut batch, + Vec::with_capacity(BATCH_SIZE), + ))?; + } + } + } + + if !batch.is_empty() { + writer.run(batch)?; + } + + Ok(()) + } + pub fn manual_merge(&mut self) -> Result<()> { let index_writer = self.index_writer.as_mut().left().unwrap(); let metas = index_writer.index().searchable_segment_metas()?; diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/index_writer.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/index_writer.rs index 53672747fa..6ffd0b07d6 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/index_writer.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_v7/index_writer.rs @@ -220,6 +220,43 @@ impl IndexWriterWrapperImpl { Ok(()) } + pub fn add_json_key_stats( + &mut self, + keys: &[*const i8], + json_offsets: &[*const i64], + json_offsets_len: &[usize], + ) -> Result<()> { + let mut batch = Vec::with_capacity(BATCH_SIZE); + for i in 0..keys.len() { + let key = unsafe { CStr::from_ptr(keys[i]) } + .to_str() + .map_err(|e| TantivyBindingError::InternalError(e.to_string()))?; + + let offsets = + unsafe { std::slice::from_raw_parts(json_offsets[i], json_offsets_len[i]) }; + + for offset in offsets { + batch.push(UserOperation::Add(doc!( + self.id_field => *offset, + self.field => key, + ))); + + if batch.len() >= BATCH_SIZE { + self.index_writer.run(std::mem::replace( + &mut batch, + Vec::with_capacity(BATCH_SIZE), + ))?; + } + } + } + + if !batch.is_empty() { + self.index_writer.run(batch)?; + } + + Ok(()) + } + pub fn manual_merge(&mut self) -> Result<()> { let metas = self.index_writer.index().searchable_segment_metas()?; let policy = self.index_writer.get_merge_policy(); @@ -265,6 +302,7 @@ mod tests { 1, 15 * 1024 * 1024, TantivyIndexVersion::V7, + false, ) .unwrap(); diff --git a/internal/core/thirdparty/tantivy/tantivy-wrapper.h b/internal/core/thirdparty/tantivy/tantivy-wrapper.h index 989812bb66..d6b0fd0875 100644 --- a/internal/core/thirdparty/tantivy/tantivy-wrapper.h +++ b/internal/core/thirdparty/tantivy/tantivy-wrapper.h @@ -271,6 +271,20 @@ struct TantivyIndexWrapper { typeid(T).name()); } + void + add_json_key_stats_data_by_batch(const char* const* keys, + const int64_t* const* json_offsets, + const uintptr_t* json_offsets_lens, + uintptr_t len_of_lens) { + assert(!finished_); + auto res = + RustResultWrapper(tantivy_index_add_json_key_stats_data_by_batch( + writer_, keys, json_offsets, json_offsets_lens, len_of_lens)); + AssertInfo(res.result_->success, + "failed to add json key stats: {}", + res.result_->error); + } + template void add_array_data(const T* array, uintptr_t len, int64_t offset) {