enhance: add documents in batch for json key stats (#41228)

issue: https://github.com/milvus-io/milvus/issues/40897

With this change, the time spent scheduling document add operations drops from roughly 6s to 0.9s for the case in the issue.
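The speedup comes from two changes working together: the C++ side flattens the per-key offset map into parallel arrays and crosses the FFI boundary once, and the Rust side groups the resulting documents into `UserOperation` batches submitted through `IndexWriter::run` instead of adding one document at a time. Below is a minimal sketch of the batched-add pattern in plain tantivy (not the Milvus wrapper); the field names and the 1000-document batch are illustrative, and it assumes a recent tantivy where `UserOperation` lives in `tantivy::indexer`:

```rust
use tantivy::indexer::UserOperation;
use tantivy::schema::{Schema, INDEXED, STRING};
use tantivy::{doc, Index};

fn main() -> tantivy::Result<()> {
    // A schema shaped like the JSON key stats index: a row offset plus a key.
    let mut builder = Schema::builder();
    let id = builder.add_i64_field("id", INDEXED);
    let key = builder.add_text_field("key", STRING);
    let index = Index::create_in_ram(builder.build());
    let mut writer = index.writer_with_num_threads(1, 15 * 1024 * 1024)?;

    // Instead of one add_document() call (and one scheduling round-trip)
    // per document, accumulate UserOperation::Add entries and hand the
    // whole group to run(), paying the scheduling cost once per batch.
    let batch: Vec<UserOperation> = (0..1000i64)
        .map(|offset| UserOperation::Add(doc!(id => offset, key => "some_json_key")))
        .collect();
    writer.run(batch)?;
    writer.commit()?;
    Ok(())
}
```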

---------

Signed-off-by: SpadeA <tangchenjie1210@gmail.com>
12 changed files with 316 additions and 12 deletions


@@ -540,7 +540,7 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(EvalCtx& context,
        };
    } else {
        auto size_per_chunk = segment_->size_per_chunk();
-        retrieve = [ size_per_chunk, this ](int64_t offset) -> auto {
+        retrieve = [ size_per_chunk, this ](int64_t offset) -> auto{
            auto chunk_idx = offset / size_per_chunk;
            auto chunk_offset = offset % size_per_chunk;
            const auto& chunk =


@@ -219,7 +219,7 @@ InvertedIndexTantivy<T>::Load(milvus::tracer::TraceContext ctx,
    std::vector<std::string> null_offset_files;
    std::shared_ptr<FieldDataBase> null_offset_data;
-    auto find_file = [&](const std::string& target) -> auto {
+    auto find_file = [&](const std::string& target) -> auto{
        return std::find_if(inverted_index_files.begin(),
                            inverted_index_files.end(),
                            [&](const std::string& filename) {


@@ -58,11 +58,18 @@ JsonKeyStatsInvertedIndex::AddJSONEncodeValue(
void
JsonKeyStatsInvertedIndex::AddInvertedRecord(
    std::map<std::string, std::vector<int64_t>>& mp) {
+    std::vector<uintptr_t> json_offsets_lens;
+    std::vector<const char*> keys;
+    std::vector<const int64_t*> json_offsets;
    for (auto& iter : mp) {
-        for (auto value : iter.second) {
-            wrapper_->add_array_data<std::string>(&iter.first, 1, value);
-        }
+        keys.push_back(iter.first.c_str());
+        json_offsets.push_back(iter.second.data());
+        json_offsets_lens.push_back(iter.second.size());
    }
+    wrapper_->add_json_key_stats_data_by_batch(keys.data(),
+                                               json_offsets.data(),
+                                               json_offsets_lens.data(),
+                                               keys.size());
}

void
@@ -307,6 +314,20 @@ JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex(
IndexStatsPtr
JsonKeyStatsInvertedIndex::Upload(const Config& config) {
    finish();
+    index_build_timestamps_.index_build_done_ =
+        std::chrono::system_clock::now();
+    LOG_INFO(
+        "build json key index done for field id:{}, json parse duration: {}s, "
+        "tantivy document add schedule duration: {}s, "
+        "tantivy total duration: {}s, "
+        "total duration: {}s",
+        field_id_,
+        index_build_timestamps_.getJsonParsingDuration(),
+        index_build_timestamps_.getTantivyAddSchedulingDuration(),
+        index_build_timestamps_.getTantivyTotalDuration(),
+        index_build_timestamps_.getIndexBuildTotalDuration());
+
    boost::filesystem::path p(path_);
    boost::filesystem::directory_iterator end_iter;
@@ -386,6 +407,8 @@ JsonKeyStatsInvertedIndex::BuildWithFieldData(
    const std::vector<FieldDataPtr>& field_datas, bool nullable) {
    int64_t offset = 0;
    std::map<std::string, std::vector<int64_t>> mp;
+    index_build_timestamps_.index_build_begin_ =
+        std::chrono::system_clock::now();
    if (nullable) {
        for (const auto& data : field_datas) {
            auto n = data->get_num_rows();
@@ -412,8 +435,13 @@ JsonKeyStatsInvertedIndex::BuildWithFieldData(
            }
        }
    }
+    index_build_timestamps_.tantivy_build_begin_ =
+        std::chrono::system_clock::now();
+    // Schedule all document add operations to tantivy.
    AddInvertedRecord(mp);
-    LOG_INFO("build json key index done for field id:{}", field_id_);
+    index_build_timestamps_.tantivy_add_schedule_end_ =
+        std::chrono::system_clock::now();
}

void
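The timestamps added above bracket the two build phases: everything before `AddInvertedRecord` counts as JSON parsing, while the span from `tantivy_build_begin_` to `tantivy_add_schedule_end_` measures only how long it takes to hand the documents to tantivy; the adds themselves run asynchronously until `finish()` completes in `Upload`.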


@@ -294,5 +294,44 @@ class JsonKeyStatsInvertedIndex : public InvertedIndexTantivy<std::string> {
    std::atomic<stdclock::time_point> last_commit_time_;
    int64_t commit_interval_in_ms_;
    std::atomic<bool> is_data_uncommitted_ = false;
+
+    struct IndexBuildTimestamps {
+        std::chrono::time_point<std::chrono::system_clock> index_build_begin_;
+        std::chrono::time_point<std::chrono::system_clock> tantivy_build_begin_;
+        // The time at which all add operations have been pushed to tantivy;
+        // the operations themselves are executed asynchronously.
+        std::chrono::time_point<std::chrono::system_clock>
+            tantivy_add_schedule_end_;
+        std::chrono::time_point<std::chrono::system_clock> index_build_done_;
+
+        auto
+        getJsonParsingDuration() const {
+            return std::chrono::duration<double>(tantivy_build_begin_ -
+                                                 index_build_begin_)
+                .count();
+        }
+
+        auto
+        getTantivyAddSchedulingDuration() const {
+            return std::chrono::duration<double>(tantivy_add_schedule_end_ -
+                                                 tantivy_build_begin_)
+                .count();
+        }
+
+        auto
+        getTantivyTotalDuration() const {
+            return std::chrono::duration<double>(index_build_done_ -
+                                                 tantivy_build_begin_)
+                .count();
+        }
+
+        auto
+        getIndexBuildTotalDuration() const {
+            return std::chrono::duration<double>(index_build_done_ -
+                                                 index_build_begin_)
+                .count();
+        }
+    };
+
+    IndexBuildTimestamps index_build_timestamps_;
};

} // namespace milvus::index
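`std::chrono::duration<double>` uses a period of seconds by default, so each getter returns a fractional-second count, matching the `{}s` units in the `LOG_INFO` call in `Upload`.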


@@ -768,6 +768,12 @@ dependencies = [
 "windows-sys 0.52.0",
]

+[[package]]
+name = "fuchsia-cprng"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
+
[[package]]
name = "futures"
version = "0.3.31"

@@ -1819,7 +1825,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d"
dependencies = [
 "phf_shared",
- "rand",
+ "rand 0.8.5",
]

[[package]]

@@ -1892,6 +1898,29 @@ dependencies = [
 "proc-macro2",
]

+[[package]]
+name = "rand"
+version = "0.3.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "64ac302d8f83c0c1974bf758f6b041c6c8ada916fbb44a609158ca8b064cc76c"
+dependencies = [
+ "libc",
+ "rand 0.4.6",
+]
+
+[[package]]
+name = "rand"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293"
+dependencies = [
+ "fuchsia-cprng",
+ "libc",
+ "rand_core 0.3.1",
+ "rdrand",
+ "winapi",
+]
+
[[package]]
name = "rand"
version = "0.8.5"

@@ -1900,7 +1929,7 @@ checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
 "libc",
 "rand_chacha",
- "rand_core",
+ "rand_core 0.6.4",
]

[[package]]

@@ -1910,9 +1939,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
 "ppv-lite86",
- "rand_core",
+ "rand_core 0.6.4",
]

+[[package]]
+name = "rand_core"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
+dependencies = [
+ "rand_core 0.4.2",
+]
+
+[[package]]
+name = "rand_core"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
+
[[package]]
name = "rand_core"
version = "0.6.4"

@@ -1929,7 +1973,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31"
dependencies = [
 "num-traits",
- "rand",
+ "rand 0.8.5",
]

[[package]]

@@ -1952,6 +1996,15 @@ dependencies = [
 "crossbeam-utils",
]

+[[package]]
+name = "rdrand"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
+dependencies = [
+ "rand_core 0.3.1",
+]
+
[[package]]
name = "redox_syscall"
version = "0.5.8"

@@ -2535,6 +2588,7 @@ dependencies = [
 "libc",
 "lindera",
 "log",
+ "rand 0.3.23",
 "regex",
 "scopeguard",
 "serde_json",

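All of the lockfile churn above follows from one addition: `rand = "0.3"` becomes a dev-dependency of the tantivy-binding crate (next file). rand 0.3 depends on rand 0.4, which pulls in `fuchsia-cprng`, `rdrand`, and the old `rand_core` 0.3/0.4 chain, and the existing `rand`/`rand_core` references get disambiguated with explicit versions.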

@@ -30,6 +30,7 @@ regex = "1.11.1"
either = "1.13.0"

[dev-dependencies]
+rand = "0.3"
tempfile = "3.0"

[build-dependencies]


@@ -269,6 +269,12 @@ RustResult tantivy_index_add_strings_by_single_segment_writer(void *ptr,
                                                              const char *const *array,
                                                              uintptr_t len);

+RustResult tantivy_index_add_json_key_stats_data_by_batch(void *ptr,
+                                                          const char *const *keys,
+                                                          const int64_t *const *json_offsets,
+                                                          const uintptr_t *json_offsets_len,
+                                                          uintptr_t len);
+
RustResult tantivy_index_add_array_int8s(void *ptr,
                                         const int8_t *array,
                                         uintptr_t len,
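The three pointer arguments are parallel arrays of length `len`: `json_offsets[i]` points to `json_offsets_len[i]` row offsets belonging to `keys[i]`. The Rust wrapper asserts these lengths agree before dispatching to the writer.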


@@ -125,6 +125,24 @@ impl IndexWriterWrapper {
        }
    }

+    pub fn add_json_key_stats(
+        &mut self,
+        keys: &[*const i8],
+        json_offsets: &[*const i64],
+        json_offsets_len: &[usize],
+    ) -> Result<()> {
+        assert!(keys.len() == json_offsets.len());
+        assert!(keys.len() == json_offsets_len.len());
+        match self {
+            IndexWriterWrapper::V5(writer) => {
+                writer.add_json_key_stats(keys, json_offsets, json_offsets_len)
+            }
+            IndexWriterWrapper::V7(writer) => {
+                writer.add_json_key_stats(keys, json_offsets, json_offsets_len)
+            }
+        }
+    }
+
    #[allow(dead_code)]
    pub fn manual_merge(&mut self) -> Result<()> {
        match self {

@@ -152,9 +170,10 @@ impl IndexWriterWrapper {
#[cfg(test)]
mod tests {
-    use std::ops::Bound;
+    use rand::Rng;
+    use std::{ffi::CString, ops::Bound};

    use tantivy_5::{query, Index, ReloadPolicy};
-    use tempfile::TempDir;
+    use tempfile::{tempdir, TempDir};

    use crate::{data_type::TantivyDataType, TantivyIndexVersion};

@@ -174,6 +193,7 @@ mod tests {
            1,
            50_000_000,
            TantivyIndexVersion::V5,
+            false,
        )
        .unwrap();

@@ -286,4 +306,50 @@ mod tests {
            .unwrap();
        assert_eq!(res.len(), 10);
    }
+
+    #[test]
+    pub fn test_add_json_key_stats() {
+        use crate::data_type::TantivyDataType;
+        use crate::index_writer::IndexWriterWrapper;
+
+        let temp_dir = tempdir().unwrap();
+        let mut index_writer = IndexWriterWrapper::new(
+            "test",
+            TantivyDataType::Keyword,
+            temp_dir.path().to_str().unwrap().to_string(),
+            1,
+            15 * 1024 * 1024,
+            TantivyIndexVersion::V7,
+            false,
+        )
+        .unwrap();
+
+        let keys = (0..100).map(|i| format!("key{:05}", i)).collect::<Vec<_>>();
+        let mut total_count = 0;
+        let mut rng = rand::thread_rng();
+        let json_offsets = (0..100)
+            .map(|_| {
+                let count = rng.gen_range(0, 1000);
+                total_count += count;
+                (0..count)
+                    .map(|_| rng.gen_range(0, i64::MAX))
+                    .collect::<Vec<_>>()
+            })
+            .collect::<Vec<_>>();
+        let json_offsets_len = json_offsets
+            .iter()
+            .map(|offsets| offsets.len())
+            .collect::<Vec<_>>();
+        let json_offsets = json_offsets.iter().map(|x| x.as_ptr()).collect::<Vec<_>>();
+        let c_keys: Vec<CString> = keys.into_iter().map(|k| CString::new(k).unwrap()).collect();
+        let key_ptrs: Vec<*const libc::c_char> = c_keys.iter().map(|cs| cs.as_ptr()).collect();
+
+        index_writer
+            .add_json_key_stats(&key_ptrs, &json_offsets, &json_offsets_len)
+            .unwrap();
+
+        index_writer.commit().unwrap();
+        let count = index_writer.create_reader().unwrap().count().unwrap();
+        assert_eq!(count, total_count);
+    }
}
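The new test exercises the whole wrapper path: 100 keys with a random number of offsets each are pushed through `add_json_key_stats`, and the reader count is checked against the running total, i.e. one indexed document per (key, offset) pair.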


@@ -317,6 +317,25 @@ pub extern "C" fn tantivy_index_add_strings_by_single_segment_writer(
        .into()
}

+#[no_mangle]
+pub extern "C" fn tantivy_index_add_json_key_stats_data_by_batch(
+    ptr: *mut c_void,
+    keys: *const *const c_char,
+    json_offsets: *const *const i64,
+    json_offsets_len: *const usize,
+    len: usize,
+) -> RustResult {
+    let real = ptr as *mut IndexWriterWrapper;
+    let json_offsets_len = unsafe { slice::from_raw_parts(json_offsets_len, len) };
+    let json_offsets = unsafe { slice::from_raw_parts(json_offsets, len) };
+    let keys = unsafe { slice::from_raw_parts(keys, len) };
+    unsafe {
+        (*real)
+            .add_json_key_stats(keys, json_offsets, json_offsets_len)
+            .into()
+    }
+}
+
// --------------------------------------------- array ------------------------------------------

#[no_mangle]
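`slice::from_raw_parts` only borrows the caller's buffers here; the call is synchronous, and the documents are built (and the key strings copied) before it returns, so the `std::vector`s constructed in `AddInvertedRecord` comfortably outlive the borrow.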


@@ -302,6 +302,45 @@ impl IndexWriterWrapperImpl {
        Ok(())
    }

+    pub fn add_json_key_stats(
+        &mut self,
+        keys: &[*const i8],
+        json_offsets: &[*const i64],
+        json_offsets_len: &[usize],
+    ) -> Result<()> {
+        let writer = self.index_writer.as_ref().left().unwrap();
+        let id_field = self.id_field.unwrap();
+        let mut batch = Vec::with_capacity(BATCH_SIZE);
+        for i in 0..keys.len() {
+            let key = unsafe { CStr::from_ptr(keys[i]) }
+                .to_str()
+                .map_err(|e| TantivyBindingError::InternalError(e.to_string()))?;
+            let offsets =
+                unsafe { std::slice::from_raw_parts(json_offsets[i], json_offsets_len[i]) };
+
+            for offset in offsets {
+                batch.push(UserOperation::Add(doc!(
+                    id_field => *offset,
+                    self.field => key,
+                )));
+                if batch.len() >= BATCH_SIZE {
+                    writer.run(std::mem::replace(
+                        &mut batch,
+                        Vec::with_capacity(BATCH_SIZE),
+                    ))?;
+                }
+            }
+        }
+
+        if !batch.is_empty() {
+            writer.run(batch)?;
+        }
+
+        Ok(())
+    }
+
    pub fn manual_merge(&mut self) -> Result<()> {
        let index_writer = self.index_writer.as_mut().left().unwrap();
        let metas = index_writer.index().searchable_segment_metas()?;
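Both writer versions use the same batching loop: documents accumulate in a `Vec` pre-sized to `BATCH_SIZE`, and `std::mem::replace` swaps in a fresh vector so the filled batch can be moved into `writer.run(...)` without copying; a final partial batch is flushed after the loop.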


@@ -220,6 +220,43 @@ impl IndexWriterWrapperImpl {
        Ok(())
    }

+    pub fn add_json_key_stats(
+        &mut self,
+        keys: &[*const i8],
+        json_offsets: &[*const i64],
+        json_offsets_len: &[usize],
+    ) -> Result<()> {
+        let mut batch = Vec::with_capacity(BATCH_SIZE);
+        for i in 0..keys.len() {
+            let key = unsafe { CStr::from_ptr(keys[i]) }
+                .to_str()
+                .map_err(|e| TantivyBindingError::InternalError(e.to_string()))?;
+            let offsets =
+                unsafe { std::slice::from_raw_parts(json_offsets[i], json_offsets_len[i]) };
+
+            for offset in offsets {
+                batch.push(UserOperation::Add(doc!(
+                    self.id_field => *offset,
+                    self.field => key,
+                )));
+                if batch.len() >= BATCH_SIZE {
+                    self.index_writer.run(std::mem::replace(
+                        &mut batch,
+                        Vec::with_capacity(BATCH_SIZE),
+                    ))?;
+                }
+            }
+        }
+
+        if !batch.is_empty() {
+            self.index_writer.run(batch)?;
+        }
+
+        Ok(())
+    }
+
    pub fn manual_merge(&mut self) -> Result<()> {
        let metas = self.index_writer.index().searchable_segment_metas()?;
        let policy = self.index_writer.get_merge_policy();

@@ -265,6 +302,7 @@ mod tests {
            1,
            15 * 1024 * 1024,
            TantivyIndexVersion::V7,
+            false,
        )
        .unwrap();


@@ -271,6 +271,20 @@ struct TantivyIndexWrapper {
                   typeid(T).name());
    }

+    void
+    add_json_key_stats_data_by_batch(const char* const* keys,
+                                     const int64_t* const* json_offsets,
+                                     const uintptr_t* json_offsets_lens,
+                                     uintptr_t len_of_lens) {
+        assert(!finished_);
+        auto res =
+            RustResultWrapper(tantivy_index_add_json_key_stats_data_by_batch(
+                writer_, keys, json_offsets, json_offsets_lens, len_of_lens));
+        AssertInfo(res.result_->success,
+                   "failed to add json key stats: {}",
+                   res.result_->error);
+    }
+
    template <typename T>
    void
    add_array_data(const T* array, uintptr_t len, int64_t offset) {