From 26ec841feb33fd57c11e294a4485f507360b41eb Mon Sep 17 00:00:00 2001 From: Spade A <71589810+SpadeA-Tang@users.noreply.github.com> Date: Tue, 1 Jul 2025 10:08:44 +0800 Subject: [PATCH] feat: optimize `Like` query with n-gram (#41803) Ref #42053 This is the first PR for optimizing `LIKE` with ngram inverted index. Now, only VARCHAR data type is supported and only InnerMatch LIKE (%xxx%) query is supported. How to use it: ``` milvus_client = MilvusClient("http://localhost:19530") schema = milvus_client.create_schema() ... schema.add_field("content_ngram", DataType.VARCHAR, max_length=10000) ... index_params = milvus_client.prepare_index_params() index_params.add_index(field_name="content_ngram", index_type="NGRAM", index_name="ngram_index", min_gram=2, max_gram=3) milvus_client.create_collection(COLLECTION_NAME, ...) ``` min_gram and max_gram control how we tokenize the documents. For example, for min_gram=2 and max_gram=4, we will tokenize each document with 2-gram, 3-gram and 4-gram. 
--------- Signed-off-by: SpadeA Signed-off-by: SpadeA-Tang --- internal/core/src/common/Consts.h | 1 + .../src/exec/expression/BinaryRangeExpr.cpp | 87 +- internal/core/src/exec/expression/Expr.h | 76 +- .../src/exec/expression/JsonContainsExpr.cpp | 790 +++++++++--------- .../core/src/exec/expression/UnaryExpr.cpp | 456 +++++----- internal/core/src/exec/expression/UnaryExpr.h | 4 + internal/core/src/exec/expression/Utils.h | 117 +++ internal/core/src/index/IndexFactory.cpp | 36 +- internal/core/src/index/IndexFactory.h | 7 + internal/core/src/index/IndexInfo.h | 7 + .../src/index/JsonKeyStatsInvertedIndex.cpp | 3 +- internal/core/src/index/Meta.h | 3 + .../core/src/index/NgramInvertedIndex.cpp | 153 ++++ internal/core/src/index/NgramInvertedIndex.h | 47 ++ .../src/indexbuilder/ScalarIndexCreator.cpp | 25 +- .../src/segcore/ChunkedSegmentSealedImpl.cpp | 32 +- .../src/segcore/ChunkedSegmentSealedImpl.h | 12 + .../core/src/segcore/SegmentInterface.cpp | 7 + internal/core/src/segcore/SegmentInterface.h | 7 + internal/core/src/segcore/SegmentSealed.h | 7 + internal/core/src/segcore/load_index_c.cpp | 22 + .../core/src/storage/DiskFileManagerImpl.cpp | 42 + .../core/src/storage/DiskFileManagerImpl.h | 15 + internal/core/src/storage/Util.cpp | 20 + internal/core/src/storage/Util.h | 8 + .../tantivy-binding/include/tantivy-binding.h | 13 + .../tantivy-binding/src/index_ngram_writer.rs | 157 ++++ .../src/index_ngram_writer_c.rs | 37 + .../tantivy-binding/src/index_reader.rs | 39 +- .../tantivy-binding/src/index_reader_c.rs | 19 + .../tantivy/tantivy-binding/src/lib.rs | 2 + .../core/thirdparty/tantivy/tantivy-wrapper.h | 39 + internal/core/unittest/CMakeLists.txt | 1 + internal/core/unittest/test_ngram_query.cpp | 377 +++++++++ internal/datacoord/stats_inspector.go | 11 +- internal/proxy/task_index_test.go | 88 ++ .../querynodev2/segments/segment_loader.go | 6 +- .../util/indexparamcheck/conf_adapter_mgr.go | 1 + internal/util/indexparamcheck/index_type.go | 1 + 
.../indexparamcheck/ngram_index_checker.go | 60 ++ 40 files changed, 2179 insertions(+), 656 deletions(-) create mode 100644 internal/core/src/index/NgramInvertedIndex.cpp create mode 100644 internal/core/src/index/NgramInvertedIndex.h create mode 100644 internal/core/thirdparty/tantivy/tantivy-binding/src/index_ngram_writer.rs create mode 100644 internal/core/thirdparty/tantivy/tantivy-binding/src/index_ngram_writer_c.rs create mode 100644 internal/core/unittest/test_ngram_query.cpp create mode 100644 internal/util/indexparamcheck/ngram_index_checker.go diff --git a/internal/core/src/common/Consts.h b/internal/core/src/common/Consts.h index 30824cc2ba..07b0075d52 100644 --- a/internal/core/src/common/Consts.h +++ b/internal/core/src/common/Consts.h @@ -50,6 +50,7 @@ const char TEXT_LOG_ROOT_PATH[] = "text_log"; const char ITERATIVE_FILTER[] = "iterative_filter"; const char HINTS[] = "hints"; const char JSON_KEY_INDEX_LOG_ROOT_PATH[] = "json_key_index_log"; +const char NGRAM_LOG_ROOT_PATH[] = "ngram_log"; const char DEFAULT_PLANNODE_ID[] = "0"; const char DEAFULT_QUERY_ID[] = "0"; diff --git a/internal/core/src/exec/expression/BinaryRangeExpr.cpp b/internal/core/src/exec/expression/BinaryRangeExpr.cpp index 3fa7ad707b..a4a24ee55b 100644 --- a/internal/core/src/exec/expression/BinaryRangeExpr.cpp +++ b/internal/core/src/exec/expression/BinaryRangeExpr.cpp @@ -305,17 +305,19 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForData(EvalCtx& context) { TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); size_t processed_cursor = 0; - auto execute_sub_batch = - [ lower_inclusive, upper_inclusive, &processed_cursor, & - bitmap_input ]( - const T* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - HighPrecisionType val1, - HighPrecisionType val2) { + auto execute_sub_batch = [lower_inclusive, + upper_inclusive, + &processed_cursor, + &bitmap_input]( + const T* data, + 
const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + HighPrecisionType val1, + HighPrecisionType val2) { if (lower_inclusive && upper_inclusive) { BinaryRangeElementFunc func; func(val1, @@ -447,22 +449,20 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(EvalCtx& context) { auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); size_t processed_cursor = 0; - auto execute_sub_batch = - [ - lower_inclusive, - upper_inclusive, - pointer, - &bitmap_input, - &processed_cursor - ]( - const milvus::Json* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - ValueType val1, - ValueType val2) { + auto execute_sub_batch = [lower_inclusive, + upper_inclusive, + pointer, + &bitmap_input, + &processed_cursor]( + const milvus::Json* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + ValueType val1, + ValueType val2) { if (lower_inclusive && upper_inclusive) { BinaryRangeElementFuncForJson func; @@ -550,9 +550,10 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonForIndex() { using GetType = std::conditional_t, std::string_view, ValueType>; - auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ - ? active_count_ - current_data_chunk_pos_ - : batch_size_; + auto real_batch_size = + (current_data_chunk_pos_ + batch_size_ > active_count_) + ? 
active_count_ - current_data_chunk_pos_ + : batch_size_; auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); #define BinaryRangeJSONIndexCompare(cmp) \ do { \ @@ -852,18 +853,20 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(EvalCtx& context) { } size_t processed_cursor = 0; - auto execute_sub_batch = - [ lower_inclusive, upper_inclusive, &processed_cursor, & - bitmap_input ]( - const milvus::ArrayView* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - ValueType val1, - ValueType val2, - int index) { + auto execute_sub_batch = [lower_inclusive, + upper_inclusive, + &processed_cursor, + &bitmap_input]( + const milvus::ArrayView* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + ValueType val1, + ValueType val2, + int index) { if (lower_inclusive && upper_inclusive) { BinaryRangeElementFuncForArray func; diff --git a/internal/core/src/exec/expression/Expr.h b/internal/core/src/exec/expression/Expr.h index cacd3c9621..36c76f07ff 100644 --- a/internal/core/src/exec/expression/Expr.h +++ b/internal/core/src/exec/expression/Expr.h @@ -676,23 +676,34 @@ class SegmentExpr : public Expr { return processed_size; } + + // If process_all_chunks is true, all chunks will be processed and no inner state will be changed. template int64_t - ProcessDataChunksForMultipleChunk( + ProcessMultipleChunksCommon( FUNC func, std::function skip_func, TargetBitmapView res, TargetBitmapView valid_res, + bool process_all_chunks, ValTypes... values) { int64_t processed_size = 0; - for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) { + size_t start_chunk = process_all_chunks ? 0 : current_data_chunk_; + + for (size_t i = start_chunk; i < num_data_chunk_; i++) { auto data_pos = - (i == current_data_chunk_) ? current_data_chunk_pos_ : 0; + process_all_chunks + ? 0 + : (i == current_data_chunk_ ? 
current_data_chunk_pos_ : 0); // if segment is chunked, type won't be growing int64_t size = segment_->chunk_size(field_id_, i) - data_pos; - size = std::min(size, batch_size_ - processed_size); + // process a whole chunk if process_all_chunks is true + if (!process_all_chunks) { + size = std::min(size, batch_size_ - processed_size); + } + if (size == 0) continue; //do not go empty-loop at the bound of the chunk @@ -761,7 +772,8 @@ class SegmentExpr : public Expr { } processed_size += size; - if (processed_size >= batch_size_) { + + if (!process_all_chunks && processed_size >= batch_size_) { current_data_chunk_ = i; current_data_chunk_pos_ = data_pos + size; break; @@ -771,6 +783,30 @@ class SegmentExpr : public Expr { return processed_size; } + template + int64_t + ProcessDataChunksForMultipleChunk( + FUNC func, + std::function skip_func, + TargetBitmapView res, + TargetBitmapView valid_res, + ValTypes... values) { + return ProcessMultipleChunksCommon( + func, skip_func, res, valid_res, false, values...); + } + + template + int64_t + ProcessAllChunksForMultipleChunk( + FUNC func, + std::function skip_func, + TargetBitmapView res, + TargetBitmapView valid_res, + ValTypes... values) { + return ProcessMultipleChunksCommon( + func, skip_func, res, valid_res, true, values...); + } + template int64_t ProcessDataChunks( @@ -788,6 +824,22 @@ class SegmentExpr : public Expr { } } + template + int64_t + ProcessAllDataChunk( + FUNC func, + std::function skip_func, + TargetBitmapView res, + TargetBitmapView valid_res, + ValTypes... values) { + if (segment_->is_chunked()) { + return ProcessAllChunksForMultipleChunk( + func, skip_func, res, valid_res, values...); + } else { + PanicInfo(ErrorCode::Unsupported, "unreachable"); + } + } + int ProcessIndexOneChunk(TargetBitmap& result, TargetBitmap& valid_result, @@ -1169,7 +1221,7 @@ class SegmentExpr : public Expr { // return batch size, not sure if we should use the data position. 
auto real_batch_size = - current_data_chunk_pos_ + batch_size_ > active_count_ + (current_data_chunk_pos_ + batch_size_ > active_count_) ? active_count_ - current_data_chunk_pos_ : batch_size_; result.append( @@ -1266,6 +1318,15 @@ class SegmentExpr : public Expr { return false; } + bool + CanUseNgramIndex(FieldId field_id) const { + if (segment_->type() != SegmentType::Sealed) { + return false; + } + auto cast_ptr = dynamic_cast(segment_); + return (cast_ptr != nullptr && cast_ptr->HasNgramIndex(field_id)); + } + protected: const segcore::SegmentInternalInterface* segment_; const FieldId field_id_; @@ -1305,6 +1366,9 @@ class SegmentExpr : public Expr { // Cache for text match. std::shared_ptr cached_match_res_{nullptr}; int32_t consistency_level_{0}; + + // Cache for ngram match. + std::shared_ptr cached_ngram_match_res_{nullptr}; }; bool diff --git a/internal/core/src/exec/expression/JsonContainsExpr.cpp b/internal/core/src/exec/expression/JsonContainsExpr.cpp index aa8babade9..209c94a322 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.cpp +++ b/internal/core/src/exec/expression/JsonContainsExpr.cpp @@ -219,8 +219,8 @@ PhyJsonContainsFilterExpr::ExecArrayContains(EvalCtx& context) { int processed_cursor = 0; auto execute_sub_batch = - [&processed_cursor, & - bitmap_input ]( + [&processed_cursor, + &bitmap_input]( const milvus::ArrayView* data, const bool* valid_data, const int32_t* offsets, @@ -228,32 +228,32 @@ PhyJsonContainsFilterExpr::ExecArrayContains(EvalCtx& context) { TargetBitmapView res, TargetBitmapView valid_res, const std::shared_ptr& elements) { - auto executor = [&](size_t i) { - const auto& array = data[i]; - for (int j = 0; j < array.length(); ++j) { - if (elements->In(array.template get_data(j))) { - return true; + auto executor = [&](size_t i) { + const auto& array = data[i]; + for (int j = 0; j < array.length(); ++j) { + if (elements->In(array.template get_data(j))) { + return true; + } } + return false; + }; + bool 
has_bitmap_input = !bitmap_input.empty(); + for (int i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } + res[i] = executor(offset); } - return false; + processed_cursor += size; }; - bool has_bitmap_input = !bitmap_input.empty(); - for (int i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { - continue; - } - res[i] = executor(offset); - } - processed_cursor += size; - }; int64_t processed_size; if (has_offset_input_) { @@ -311,8 +311,8 @@ PhyJsonContainsFilterExpr::ExecJsonContains(EvalCtx& context) { size_t processed_cursor = 0; auto execute_sub_batch = - [&processed_cursor, & - bitmap_input ]( + [&processed_cursor, + &bitmap_input]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -321,40 +321,40 @@ PhyJsonContainsFilterExpr::ExecJsonContains(EvalCtx& context) { TargetBitmapView valid_res, const std::string& pointer, const std::shared_ptr& elements) { - auto executor = [&](size_t i) { - auto doc = data[i].doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { + auto executor = [&](size_t i) { + auto doc = data[i].doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; + } + for (auto&& it : array) { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (elements->In(val.value()) > 0) { + return true; + } + } return false; - } - for (auto&& it : array) { - auto val = it.template get(); - if (val.error()) { + }; + bool 
has_bitmap_input = !bitmap_input.empty(); + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; continue; } - if (elements->In(val.value()) > 0) { - return true; + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; } + res[i] = executor(offset); } - return false; + processed_cursor += size; }; - bool has_bitmap_input = !bitmap_input.empty(); - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { - continue; - } - res[i] = executor(offset); - } - processed_cursor += size; - }; int64_t processed_size; if (has_offset_input_) { @@ -388,9 +388,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsByKeyIndex() { std::conditional_t, std::string_view, ExprValueType>; - auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ - ? active_count_ - current_data_chunk_pos_ - : batch_size_; + auto real_batch_size = + (current_data_chunk_pos_ + batch_size_ > active_count_) + ? 
active_count_ - current_data_chunk_pos_ + : batch_size_; std::unordered_set elements; auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); if (!arg_inited_) { @@ -519,8 +520,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(EvalCtx& context) { size_t processed_cursor = 0; auto execute_sub_batch = - [&processed_cursor, & - bitmap_input ]( + [&processed_cursor, + &bitmap_input]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -529,49 +530,49 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(EvalCtx& context) { TargetBitmapView valid_res, const std::string& pointer, const std::vector& elements) { - auto executor = [&](size_t i) -> bool { - auto doc = data[i].doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { - return false; - } - for (auto&& it : array) { - auto val = it.get_array(); - if (val.error()) { - continue; + auto executor = [&](size_t i) -> bool { + auto doc = data[i].doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; } - std::vector< - simdjson::simdjson_result> - json_array; - json_array.reserve(val.count_elements()); - for (auto&& e : val) { - json_array.emplace_back(e); - } - for (auto const& element : elements) { - if (CompareTwoJsonArray(json_array, element)) { - return true; + for (auto&& it : array) { + auto val = it.get_array(); + if (val.error()) { + continue; + } + std::vector< + simdjson::simdjson_result> + json_array; + json_array.reserve(val.count_elements()); + for (auto&& e : val) { + json_array.emplace_back(e); + } + for (auto const& element : elements) { + if (CompareTwoJsonArray(json_array, element)) { + return true; + } } } + return false; + }; + bool has_bitmap_input = !bitmap_input.empty(); + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? 
offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } + res[i] = executor(offset); } - return false; + processed_cursor += size; }; - bool has_bitmap_input = !bitmap_input.empty(); - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { - continue; - } - res[i] = executor(offset); - } - processed_cursor += size; - }; int64_t processed_size; if (has_offset_input_) { @@ -600,9 +601,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(EvalCtx& context) { VectorPtr PhyJsonContainsFilterExpr::ExecJsonContainsArrayByKeyIndex() { - auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ - ? active_count_ - current_data_chunk_pos_ - : batch_size_; + auto real_batch_size = + (current_data_chunk_pos_ + batch_size_ > active_count_) + ? 
active_count_ - current_data_chunk_pos_ + : batch_size_; std::vector elements; auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); for (auto const& element : expr_->vals_) { @@ -733,8 +735,8 @@ PhyJsonContainsFilterExpr::ExecArrayContainsAll(EvalCtx& context) { int processed_cursor = 0; auto execute_sub_batch = - [&processed_cursor, & - bitmap_input ]( + [&processed_cursor, + &bitmap_input]( const milvus::ArrayView* data, const bool* valid_data, const int32_t* offsets, @@ -742,34 +744,34 @@ PhyJsonContainsFilterExpr::ExecArrayContainsAll(EvalCtx& context) { TargetBitmapView res, TargetBitmapView valid_res, const std::set& elements) { - auto executor = [&](size_t i) { - std::set tmp_elements(elements); - // Note: array can only be iterated once - for (int j = 0; j < data[i].length(); ++j) { - tmp_elements.erase(data[i].template get_data(j)); - if (tmp_elements.size() == 0) { - return true; + auto executor = [&](size_t i) { + std::set tmp_elements(elements); + // Note: array can only be iterated once + for (int j = 0; j < data[i].length(); ++j) { + tmp_elements.erase(data[i].template get_data(j)); + if (tmp_elements.size() == 0) { + return true; + } } + return tmp_elements.size() == 0; + }; + bool has_bitmap_input = !bitmap_input.empty(); + for (int i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } + res[i] = executor(offset); } - return tmp_elements.size() == 0; + processed_cursor += size; }; - bool has_bitmap_input = !bitmap_input.empty(); - for (int i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? 
offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { - continue; - } - res[i] = executor(offset); - } - processed_cursor += size; - }; int64_t processed_size; if (has_offset_input_) { processed_size = @@ -825,8 +827,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAll(EvalCtx& context) { int processed_cursor = 0; auto execute_sub_batch = - [&processed_cursor, & - bitmap_input ]( + [&processed_cursor, + &bitmap_input]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -835,43 +837,43 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAll(EvalCtx& context) { TargetBitmapView valid_res, const std::string& pointer, const std::set& elements) { - auto executor = [&](const size_t i) -> bool { - auto doc = data[i].doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { - return false; - } - std::set tmp_elements(elements); - // Note: array can only be iterated once - for (auto&& it : array) { - auto val = it.template get(); - if (val.error()) { + auto executor = [&](const size_t i) -> bool { + auto doc = data[i].doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; + } + std::set tmp_elements(elements); + // Note: array can only be iterated once + for (auto&& it : array) { + auto val = it.template get(); + if (val.error()) { + continue; + } + tmp_elements.erase(val.value()); + if (tmp_elements.size() == 0) { + return true; + } + } + return tmp_elements.size() == 0; + }; + bool has_bitmap_input = !bitmap_input.empty(); + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? 
offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; continue; } - tmp_elements.erase(val.value()); - if (tmp_elements.size() == 0) { - return true; + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; } + res[i] = executor(offset); } - return tmp_elements.size() == 0; + processed_cursor += size; }; - bool has_bitmap_input = !bitmap_input.empty(); - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { - continue; - } - res[i] = executor(offset); - } - processed_cursor += size; - }; int64_t processed_size; if (has_offset_input_) { @@ -905,9 +907,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByKeyIndex() { std::conditional_t, std::string_view, ExprValueType>; - auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ - ? active_count_ - current_data_chunk_pos_ - : batch_size_; + auto real_batch_size = + (current_data_chunk_pos_ + batch_size_ > active_count_) + ? 
active_count_ - current_data_chunk_pos_ + : batch_size_; std::set elements; auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); for (auto const& element : expr_->vals_) { @@ -1039,8 +1042,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType(EvalCtx& context) { int processed_cursor = 0; auto execute_sub_batch = - [&processed_cursor, & - bitmap_input ]( + [&processed_cursor, + &bitmap_input]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -1050,102 +1053,104 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType(EvalCtx& context) { const std::string& pointer, const std::vector& elements, const std::unordered_set elements_index) { - auto executor = [&](size_t i) -> bool { - const auto& json = data[i]; - auto doc = json.doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { - return false; - } - std::unordered_set tmp_elements_index(elements_index); - for (auto&& it : array) { - int i = -1; - for (auto& element : elements) { - i++; - switch (element.val_case()) { - case proto::plan::GenericValue::kBoolVal: { - auto val = it.template get(); - if (val.error()) { - continue; + auto executor = [&](size_t i) -> bool { + const auto& json = data[i]; + auto doc = json.doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; + } + std::unordered_set tmp_elements_index(elements_index); + for (auto&& it : array) { + int i = -1; + for (auto& element : elements) { + i++; + switch (element.val_case()) { + case proto::plan::GenericValue::kBoolVal: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.bool_val()) { + tmp_elements_index.erase(i); + } + break; } - if (val.value() == element.bool_val()) { - tmp_elements_index.erase(i); + case proto::plan::GenericValue::kInt64Val: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.int64_val()) { + 
tmp_elements_index.erase(i); + } + break; } - break; + case proto::plan::GenericValue::kFloatVal: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.float_val()) { + tmp_elements_index.erase(i); + } + break; + } + case proto::plan::GenericValue::kStringVal: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.string_val()) { + tmp_elements_index.erase(i); + } + break; + } + case proto::plan::GenericValue::kArrayVal: { + auto val = it.get_array(); + if (val.error()) { + continue; + } + if (CompareTwoJsonArray(val, + element.array_val())) { + tmp_elements_index.erase(i); + } + break; + } + default: + PanicInfo( + DataTypeInvalid, + fmt::format("unsupported data type {}", + element.val_case())); } - case proto::plan::GenericValue::kInt64Val: { - auto val = it.template get(); - if (val.error()) { - continue; - } - if (val.value() == element.int64_val()) { - tmp_elements_index.erase(i); - } - break; + if (tmp_elements_index.size() == 0) { + return true; } - case proto::plan::GenericValue::kFloatVal: { - auto val = it.template get(); - if (val.error()) { - continue; - } - if (val.value() == element.float_val()) { - tmp_elements_index.erase(i); - } - break; - } - case proto::plan::GenericValue::kStringVal: { - auto val = it.template get(); - if (val.error()) { - continue; - } - if (val.value() == element.string_val()) { - tmp_elements_index.erase(i); - } - break; - } - case proto::plan::GenericValue::kArrayVal: { - auto val = it.get_array(); - if (val.error()) { - continue; - } - if (CompareTwoJsonArray(val, element.array_val())) { - tmp_elements_index.erase(i); - } - break; - } - default: - PanicInfo(DataTypeInvalid, - fmt::format("unsupported data type {}", - element.val_case())); } if (tmp_elements_index.size() == 0) { return true; } } - if (tmp_elements_index.size() == 0) { - return true; + return tmp_elements_index.size() == 0; + }; + bool has_bitmap_input = 
!bitmap_input.empty(); + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; } - } - return tmp_elements_index.size() == 0; - }; - bool has_bitmap_input = !bitmap_input.empty(); - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { - continue; - } - res[i] = executor(offset); - } - processed_cursor += size; - }; + res[i] = executor(offset); + } + processed_cursor += size; + }; int64_t processed_size; if (has_offset_input_) { @@ -1176,9 +1181,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType(EvalCtx& context) { VectorPtr PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByKeyIndex() { - auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ - ? active_count_ - current_data_chunk_pos_ - : batch_size_; + auto real_batch_size = + (current_data_chunk_pos_ + batch_size_ > active_count_) + ? 
active_count_ - current_data_chunk_pos_ + : batch_size_; auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); auto elements = expr_->vals_; std::set elements_index; @@ -1371,8 +1377,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(EvalCtx& context) { size_t processed_cursor = 0; auto execute_sub_batch = - [&processed_cursor, & - bitmap_input ]( + [&processed_cursor, + &bitmap_input]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -1381,54 +1387,54 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(EvalCtx& context) { TargetBitmapView valid_res, const std::string& pointer, const std::vector& elements) { - auto executor = [&](const size_t i) { - auto doc = data[i].doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { - return false; - } - std::unordered_set exist_elements_index; - for (auto&& it : array) { - auto val = it.get_array(); - if (val.error()) { - continue; + auto executor = [&](const size_t i) { + auto doc = data[i].doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; } - std::vector< - simdjson::simdjson_result> - json_array; - json_array.reserve(val.count_elements()); - for (auto&& e : val) { - json_array.emplace_back(e); - } - for (int index = 0; index < elements.size(); ++index) { - if (CompareTwoJsonArray(json_array, elements[index])) { - exist_elements_index.insert(index); + std::unordered_set exist_elements_index; + for (auto&& it : array) { + auto val = it.get_array(); + if (val.error()) { + continue; + } + std::vector< + simdjson::simdjson_result> + json_array; + json_array.reserve(val.count_elements()); + for (auto&& e : val) { + json_array.emplace_back(e); + } + for (int index = 0; index < elements.size(); ++index) { + if (CompareTwoJsonArray(json_array, elements[index])) { + exist_elements_index.insert(index); + } + } + if (exist_elements_index.size() == elements.size()) { + return true; } } - if 
(exist_elements_index.size() == elements.size()) { - return true; + return exist_elements_index.size() == elements.size(); + }; + bool has_bitmap_input = !bitmap_input.empty(); + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; } - } - return exist_elements_index.size() == elements.size(); - }; - bool has_bitmap_input = !bitmap_input.empty(); - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { - continue; - } - res[i] = executor(offset); - } - processed_cursor += size; - }; + res[i] = executor(offset); + } + processed_cursor += size; + }; int64_t processed_size; if (has_offset_input_) { @@ -1457,9 +1463,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(EvalCtx& context) { VectorPtr PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByKeyIndex() { - auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ - ? active_count_ - current_data_chunk_pos_ - : batch_size_; + auto real_batch_size = + (current_data_chunk_pos_ + batch_size_ > active_count_) + ? 
active_count_ - current_data_chunk_pos_ + : batch_size_; auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); std::vector elements; for (auto const& element : expr_->vals_) { @@ -1596,8 +1603,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(EvalCtx& context) { size_t processed_cursor = 0; auto execute_sub_batch = - [&processed_cursor, & - bitmap_input ]( + [&processed_cursor, + &bitmap_input]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -1606,94 +1613,96 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(EvalCtx& context) { TargetBitmapView valid_res, const std::string& pointer, const std::vector& elements) { - auto executor = [&](const size_t i) { - auto& json = data[i]; - auto doc = json.doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { - return false; - } - // Note: array can only be iterated once - for (auto&& it : array) { - for (auto const& element : elements) { - switch (element.val_case()) { - case proto::plan::GenericValue::kBoolVal: { - auto val = it.template get(); - if (val.error()) { - continue; + auto executor = [&](const size_t i) { + auto& json = data[i]; + auto doc = json.doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; + } + // Note: array can only be iterated once + for (auto&& it : array) { + for (auto const& element : elements) { + switch (element.val_case()) { + case proto::plan::GenericValue::kBoolVal: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.bool_val()) { + return true; + } + break; } - if (val.value() == element.bool_val()) { - return true; + case proto::plan::GenericValue::kInt64Val: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.int64_val()) { + return true; + } + break; } - break; + case proto::plan::GenericValue::kFloatVal: { + auto val = it.template get(); + if (val.error()) { + 
continue; + } + if (val.value() == element.float_val()) { + return true; + } + break; + } + case proto::plan::GenericValue::kStringVal: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.string_val()) { + return true; + } + break; + } + case proto::plan::GenericValue::kArrayVal: { + auto val = it.get_array(); + if (val.error()) { + continue; + } + if (CompareTwoJsonArray(val, + element.array_val())) { + return true; + } + break; + } + default: + PanicInfo( + DataTypeInvalid, + fmt::format("unsupported data type {}", + element.val_case())); } - case proto::plan::GenericValue::kInt64Val: { - auto val = it.template get(); - if (val.error()) { - continue; - } - if (val.value() == element.int64_val()) { - return true; - } - break; - } - case proto::plan::GenericValue::kFloatVal: { - auto val = it.template get(); - if (val.error()) { - continue; - } - if (val.value() == element.float_val()) { - return true; - } - break; - } - case proto::plan::GenericValue::kStringVal: { - auto val = it.template get(); - if (val.error()) { - continue; - } - if (val.value() == element.string_val()) { - return true; - } - break; - } - case proto::plan::GenericValue::kArrayVal: { - auto val = it.get_array(); - if (val.error()) { - continue; - } - if (CompareTwoJsonArray(val, element.array_val())) { - return true; - } - break; - } - default: - PanicInfo(DataTypeInvalid, - fmt::format("unsupported data type {}", - element.val_case())); } } - } - return false; - }; - bool has_bitmap_input = !bitmap_input.empty(); - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? 
offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { - continue; - } + return false; + }; + bool has_bitmap_input = !bitmap_input.empty(); + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } - res[i] = executor(offset); - } - processed_cursor += size; - }; + res[i] = executor(offset); + } + processed_cursor += size; + }; int64_t processed_size; if (has_offset_input_) { @@ -1722,9 +1731,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(EvalCtx& context) { VectorPtr PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByKeyIndex() { - auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ - ? active_count_ - current_data_chunk_pos_ - : batch_size_; + auto real_batch_size = + (current_data_chunk_pos_ + batch_size_ > active_count_) + ? 
active_count_ - current_data_chunk_pos_ + : batch_size_; auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); auto elements = expr_->vals_; if (elements.empty()) { diff --git a/internal/core/src/exec/expression/UnaryExpr.cpp b/internal/core/src/exec/expression/UnaryExpr.cpp index 8e62a54220..9e711cdfc6 100644 --- a/internal/core/src/exec/expression/UnaryExpr.cpp +++ b/internal/core/src/exec/expression/UnaryExpr.cpp @@ -318,8 +318,9 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(EvalCtx& context) { } int processed_cursor = 0; auto execute_sub_batch = - [ op_type, &processed_cursor, & - bitmap_input ]( + [op_type, + &processed_cursor, + &bitmap_input]( const milvus::ArrayView* data, const bool* valid_data, const int32_t* offsets, @@ -328,185 +329,186 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(EvalCtx& context) { TargetBitmapView valid_res, ValueType val, int index) { - switch (op_type) { - case proto::plan::GreaterThan: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; + switch (op_type) { + case proto::plan::GreaterThan: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::GreaterEqual: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::LessThan: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::LessEqual: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::Equal: { + 
UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::NotEqual: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::PrefixMatch: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::Match: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::PostfixMatch: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::InnerMatch: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + default: + PanicInfo( + OpTypeInvalid, + fmt::format( + "unsupported operator type for unary expr: {}", + op_type)); } - case proto::plan::GreaterEqual: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::LessThan: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::LessEqual: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::Equal: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - 
index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::NotEqual: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::PrefixMatch: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::Match: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::PostfixMatch: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::InnerMatch: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - default: - PanicInfo( - OpTypeInvalid, - fmt::format("unsupported operator type for unary expr: {}", - op_type)); - } - processed_cursor += size; - }; + processed_cursor += size; + }; int64_t processed_size; if (has_offset_input_) { processed_size = @@ -706,16 +708,18 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(EvalCtx& context) { } while (false) int processed_cursor = 0; - auto execute_sub_batch = - [ op_type, pointer, &processed_cursor, & - bitmap_input ]( - const milvus::Json* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - ExprValueType val) { + auto execute_sub_batch = [op_type, + pointer, + &processed_cursor, + &bitmap_input]( + const milvus::Json* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + ExprValueType val) { 
bool has_bitmap_input = !bitmap_input.empty(); switch (op_type) { case proto::plan::GreaterThan: { @@ -1480,6 +1484,14 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImpl(EvalCtx& context) { fmt::format("match query does not support iterative filter")); } return ExecTextMatch(); + } else if (expr_->op_type_ == proto::plan::OpType::InnerMatch && + !has_offset_input_ && CanUseNgramIndex(field_id_)) { + auto res = ExecNgramMatch(); + // If nullopt is returned, it means the query cannot be + // optimized by ngram index. Forward it to the normal path. + if (res.has_value()) { + return res.value(); + } } if (CanUseIndex() && !has_offset_input_) { @@ -1675,16 +1687,17 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(EvalCtx& context) { auto expr_type = expr_->op_type_; size_t processed_cursor = 0; - auto execute_sub_batch = - [ expr_type, &processed_cursor, & - bitmap_input ]( - const T* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - IndexInnerType val) { + auto execute_sub_batch = [expr_type, + &processed_cursor, + &bitmap_input]( + const T* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + IndexInnerType val) { switch (expr_type) { case proto::plan::GreaterThan: { UnaryElementFunc func; @@ -1920,5 +1933,48 @@ PhyUnaryRangeFilterExpr::ExecTextMatch() { return res; }; +std::optional +PhyUnaryRangeFilterExpr::ExecNgramMatch() { + if (!arg_inited_) { + value_arg_.SetValue(expr_->val_); + arg_inited_ = true; + } + + auto literal = value_arg_.GetValue(); + + TargetBitmap result; + TargetBitmap valid_result; + + if (cached_ngram_match_res_ == nullptr) { + auto pinned_index = segment_->GetNgramIndex(field_id_); + auto index = pinned_index.get(); + AssertInfo(index != nullptr, + "ngram index should not be null, field_id: {}", + field_id_.get()); + auto res_opt = index->InnerMatchQuery(literal, this); + if 
(!res_opt.has_value()) { + return std::nullopt; + } + auto valid_res = index->IsNotNull(); + cached_ngram_match_res_ = + std::make_shared(std::move(res_opt.value())); + cached_index_chunk_valid_res_ = std::move(valid_res); + } + + auto real_batch_size = + (current_data_chunk_pos_ + batch_size_ > active_count_) + ? active_count_ - current_data_chunk_pos_ + : batch_size_; + result.append( + *cached_ngram_match_res_, current_data_chunk_pos_, real_batch_size); + valid_result.append(cached_index_chunk_valid_res_, + current_data_chunk_pos_, + real_batch_size); + current_data_chunk_pos_ += real_batch_size; + + return std::optional(std::make_shared( + std::move(result), std::move(valid_result))); +} + } // namespace exec } // namespace milvus diff --git a/internal/core/src/exec/expression/UnaryExpr.h b/internal/core/src/exec/expression/UnaryExpr.h index 07e398ba93..b2b81c5eed 100644 --- a/internal/core/src/exec/expression/UnaryExpr.h +++ b/internal/core/src/exec/expression/UnaryExpr.h @@ -18,6 +18,7 @@ #include +#include #include #include "common/EasyAssert.h" @@ -505,6 +506,9 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr { VectorPtr ExecTextMatch(); + std::optional + ExecNgramMatch(); + std::pair SplitAtFirstSlashDigit(std::string input); diff --git a/internal/core/src/exec/expression/Utils.h b/internal/core/src/exec/expression/Utils.h index 241047ec17..3c2029c7d7 100644 --- a/internal/core/src/exec/expression/Utils.h +++ b/internal/core/src/exec/expression/Utils.h @@ -191,5 +191,122 @@ GetValueWithCastNumber(const milvus::proto::plan::GenericValue& value_proto) { } } +enum class MatchType { + ExactMatch, + PrefixMatch, + PostfixMatch, + // The different between InnerMatch and Match is that InnerMatch is used for + // %xxx% while Match could be %xxx%xxx% + InnerMatch, + Match +}; +struct ParsedResult { + std::string literal; + MatchType type; +}; + +// Not used now, but may be used in the future for other type of match for ngram index +inline std::optional 
+parse_ngram_pattern(const std::string& pattern) { + if (pattern.empty()) { + return std::nullopt; + } + + std::vector percent_indices; + bool was_escaped = false; + for (size_t i = 0; i < pattern.length(); ++i) { + char c = pattern[i]; + if (c == '%' && !was_escaped) { + percent_indices.push_back(i); + } else if (c == '_' && !was_escaped) { + // todo(SpadeA): now not support '_' + return std::nullopt; + } + was_escaped = (c == '\\' && !was_escaped); + } + + MatchType match_type; + size_t core_start = 0; + size_t core_length = 0; + size_t percent_count = percent_indices.size(); + + if (percent_count == 0) { + match_type = MatchType::ExactMatch; + core_start = 0; + core_length = pattern.length(); + } else if (percent_count == 1) { + if (pattern.length() == 1) { + return std::nullopt; + } + + size_t idx = percent_indices[0]; + // case: %xxx + if (idx == 0 && pattern.length() > 1) { + match_type = MatchType::PrefixMatch; + core_start = 1; + core_length = pattern.length() - 1; + } else if (idx == pattern.length() - 1 && pattern.length() > 1) { + // case: xxx% + match_type = MatchType::PostfixMatch; + core_start = 0; + core_length = pattern.length() - 1; + } else { + // case: xxx%xxx + match_type = MatchType::Match; + } + } else if (percent_count == 2) { + size_t idx1 = percent_indices[0]; + size_t idx2 = percent_indices[1]; + if (idx1 == 0 && idx2 == pattern.length() - 1 && pattern.length() > 2) { + // case: %xxx% + match_type = MatchType::InnerMatch; + core_start = 1; + core_length = pattern.length() - 2; + } else { + match_type = MatchType::Match; + } + } else { + match_type = MatchType::Match; + } + + if (match_type == MatchType::Match) { + // not supported now + return std::nullopt; + } + + // Extract the literal from the pattern + std::string_view core_pattern = + std::string_view(pattern).substr(core_start, core_length); + + std::string r; + r.reserve(2 * core_pattern.size()); + bool escape_mode = false; + for (char c : core_pattern) { + if (escape_mode) { + if 
(is_special(c)) { + // todo(SpadeA): may not be suitable for ngram? Not use ngram in this case for now. + return std::nullopt; + } + r += c; + escape_mode = false; + } else { + if (c == '\\') { + escape_mode = true; + } else if (c == '%') { + // should be unreachable + } else if (c == '_') { + // should be unreachable + return std::nullopt; + } else { + if (is_special(c)) { + r += '\\'; + } + r += c; + } + } + } + return std::optional{ParsedResult{std::move(r), match_type}}; +} + } // namespace exec } // namespace milvus \ No newline at end of file diff --git a/internal/core/src/index/IndexFactory.cpp b/internal/core/src/index/IndexFactory.cpp index 1982d82674..ff3d4e58be 100644 --- a/internal/core/src/index/IndexFactory.cpp +++ b/internal/core/src/index/IndexFactory.cpp @@ -28,6 +28,7 @@ #include "index/Utils.h" #include "index/Meta.h" #include "index/JsonInvertedIndex.h" +#include "index/NgramInvertedIndex.h" #include "knowhere/utils.h" #include "index/VectorDiskIndex.h" @@ -66,12 +67,27 @@ IndexFactory::CreatePrimitiveScalarIndex( return CreateScalarIndexSort(file_manager_context); } -// template <> -// inline ScalarIndexPtr -// IndexFactory::CreateScalarIndex(const IndexType& index_type) { -// return CreateBoolIndex(); -//} -// +IndexBasePtr +IndexFactory::CreateNgramIndex( + DataType data_type, + const NgramParams& params, + const storage::FileManagerContext& file_manager_context) { + switch (data_type) { + case DataType::VARCHAR: + case DataType::STRING: + return std::make_unique(file_manager_context, + params); + + case DataType::JSON: + PanicInfo( + NotImplemented, + fmt::format("building ngram index in json is not implemented")); + default: + PanicInfo(DataTypeInvalid, + fmt::format("invalid data type to build ngram index: {}", + data_type)); + } +} template <> ScalarIndexPtr @@ -345,9 +361,15 @@ IndexFactory::CreatePrimitiveScalarIndex( // create string index case DataType::STRING: - case DataType::VARCHAR: + case DataType::VARCHAR: { + auto& ngram_params 
= create_index_info.ngram_params; + if (ngram_params.has_value()) { + return CreateNgramIndex( + data_type, ngram_params.value(), file_manager_context); + } return CreatePrimitiveScalarIndex( create_index_info, file_manager_context); + } default: PanicInfo( DataTypeInvalid, diff --git a/internal/core/src/index/IndexFactory.h b/internal/core/src/index/IndexFactory.h index 86851c5bc9..c24cbb6e1a 100644 --- a/internal/core/src/index/IndexFactory.h +++ b/internal/core/src/index/IndexFactory.h @@ -94,6 +94,13 @@ class IndexFactory { const storage::FileManagerContext& file_manager_context = storage::FileManagerContext()); + // Create ngram index + IndexBasePtr + CreateNgramIndex(DataType data_type, + const NgramParams& params, + const storage::FileManagerContext& file_manager_context = + storage::FileManagerContext()); + // For types like array, struct, union, etc IndexBasePtr CreateCompositeScalarIndex( diff --git a/internal/core/src/index/IndexInfo.h b/internal/core/src/index/IndexInfo.h index cf0ceae96d..cc9dd37916 100644 --- a/internal/core/src/index/IndexInfo.h +++ b/internal/core/src/index/IndexInfo.h @@ -20,6 +20,12 @@ namespace milvus::index { +struct NgramParams { + bool loading_index; + uintptr_t min_gram; + uintptr_t max_gram; +}; + struct CreateIndexInfo { DataType field_type; IndexType index_type; @@ -32,6 +38,7 @@ struct CreateIndexInfo { JsonCastType json_cast_type{JsonCastType::UNKNOWN}; std::string json_path; std::string json_cast_function; + std::optional ngram_params; }; } // namespace milvus::index diff --git a/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp b/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp index 19a956cde9..4e2effef08 100644 --- a/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp +++ b/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp @@ -332,7 +332,8 @@ JsonKeyStatsInvertedIndex::Upload(const Config& config) { index_build_timestamps_.index_build_done_ = std::chrono::system_clock::now(); LOG_INFO( - "build json 
key index done for field id:{}, json parse duration: {}s, " + "index build done for json key index, field id:{}, json parse " + "duration: {}s, " "tantivy document add schedule duration : {}s, " "tantivy total duration : {}s, " "total duration : {}s", diff --git a/internal/core/src/index/Meta.h b/internal/core/src/index/Meta.h index 8010a1c024..c79405c3ae 100644 --- a/internal/core/src/index/Meta.h +++ b/internal/core/src/index/Meta.h @@ -52,6 +52,9 @@ constexpr const char* TANTIVY_INDEX_VERSION = "tantivy_index_version"; constexpr uint32_t TANTIVY_INDEX_LATEST_VERSION = 7; constexpr uint32_t TANTIVY_INDEX_MINIMUM_VERSION = 5; constexpr const char* INDEX_NON_ENCODING = "index.nonEncoding"; +constexpr const char* NGRAM_INDEX_TYPE = "NGRAM"; +constexpr const char* MIN_GRAM = "min_gram"; +constexpr const char* MAX_GRAM = "max_gram"; // index meta constexpr const char* COLLECTION_ID = "collection_id"; diff --git a/internal/core/src/index/NgramInvertedIndex.cpp b/internal/core/src/index/NgramInvertedIndex.cpp new file mode 100644 index 0000000000..c1d1d51f3b --- /dev/null +++ b/internal/core/src/index/NgramInvertedIndex.cpp @@ -0,0 +1,153 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#include "index/NgramInvertedIndex.h" +#include "exec/expression/Expr.h" + +namespace milvus::index { +NgramInvertedIndex::NgramInvertedIndex(const storage::FileManagerContext& ctx, + const NgramParams& params) + : min_gram_(params.min_gram), max_gram_(params.max_gram) { + schema_ = ctx.fieldDataMeta.field_schema; + field_id_ = ctx.fieldDataMeta.field_id; + mem_file_manager_ = std::make_shared(ctx); + disk_file_manager_ = std::make_shared(ctx); + + if (params.loading_index) { + path_ = disk_file_manager_->GetLocalNgramIndexPrefix(); + } else { + path_ = disk_file_manager_->GetLocalTempNgramIndexPrefix(); + boost::filesystem::create_directories(path_); + d_type_ = TantivyDataType::Keyword; + std::string field_name = + std::to_string(disk_file_manager_->GetFieldDataMeta().field_id); + wrapper_ = std::make_shared( + field_name.c_str(), path_.c_str(), min_gram_, max_gram_); + } +} + +void +NgramInvertedIndex::BuildWithFieldData(const std::vector& datas) { + AssertInfo(schema_.data_type() == proto::schema::DataType::String || + schema_.data_type() == proto::schema::DataType::VarChar, + "schema data type is {}", + schema_.data_type()); + index_build_begin_ = std::chrono::system_clock::now(); + InvertedIndexTantivy::BuildWithFieldData(datas); +} + +IndexStatsPtr +NgramInvertedIndex::Upload(const Config& config) { + finish(); + auto index_build_end = std::chrono::system_clock::now(); + auto index_build_duration = + std::chrono::duration(index_build_end - index_build_begin_) + .count(); + LOG_INFO("index build done for ngram index, field id: {}, duration: {}s", + field_id_, + index_build_duration); + return InvertedIndexTantivy::Upload(config); +} + +void +NgramInvertedIndex::Load(milvus::tracer::TraceContext ctx, + const Config& config) { + auto index_files = + GetValueFromConfig>(config, INDEX_FILES); + AssertInfo(index_files.has_value(), + "index file paths is empty when 
load ngram index"); + + auto files_value = index_files.value(); + auto it = std::find_if( + files_value.begin(), files_value.end(), [](const std::string& file) { + constexpr std::string_view suffix{"/index_null_offset"}; + return file.size() >= suffix.size() && + std::equal(suffix.rbegin(), suffix.rend(), file.rbegin()); + }); + if (it != files_value.end()) { + std::vector file; + file.push_back(*it); + files_value.erase(it); + auto index_datas = mem_file_manager_->LoadIndexToMemory( + file, config[milvus::LOAD_PRIORITY]); + BinarySet binary_set; + AssembleIndexDatas(index_datas, binary_set); + auto index_valid_data = binary_set.GetByName("index_null_offset"); + folly::SharedMutex::WriteHolder lock(mutex_); + null_offset_.resize((size_t)index_valid_data->size / sizeof(size_t)); + memcpy(null_offset_.data(), + index_valid_data->data.get(), + (size_t)index_valid_data->size); + } + + disk_file_manager_->CacheNgramIndexToDisk(files_value, + config[milvus::LOAD_PRIORITY]); + AssertInfo( + tantivy_index_exist(path_.c_str()), "index not exist: {}", path_); + auto load_in_mmap = + GetValueFromConfig(config, ENABLE_MMAP).value_or(true); + wrapper_ = std::make_shared( + path_.c_str(), load_in_mmap, milvus::index::SetBitsetSealed); + + if (!load_in_mmap) { + // the index is loaded in ram, so we can remove files in advance + disk_file_manager_->RemoveNgramIndexFiles(); + } + + LOG_INFO( + "load ngram index done for field id:{} with dir:{}", field_id_, path_); +} + +std::optional +NgramInvertedIndex::InnerMatchQuery(const std::string& literal, + exec::SegmentExpr* segment) { + if (literal.length() < min_gram_) { + return std::nullopt; + } + + TargetBitmap bitset{static_cast(Count())}; + wrapper_->inner_match_ngram(literal, min_gram_, max_gram_, &bitset); + + // Post filtering: if the literal length is larger than the max_gram + // we need to filter out the bitset + if (literal.length() > max_gram_) { + auto bitset_off = 0; + TargetBitmapView res(bitset); + TargetBitmap 
valid(res.size(), true); + TargetBitmapView valid_res(valid.data(), valid.size()); + + auto execute_sub_batch = [&literal](const std::string_view* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res) { + auto next_off_option = res.find_first(); + while (next_off_option.has_value()) { + auto next_off = next_off_option.value(); + if (next_off >= size) { + break; + } + if (data[next_off].find(literal) == std::string::npos) { + res[next_off] = false; + } + next_off_option = res.find_next(next_off); + } + }; + + segment->ProcessAllDataChunk( + execute_sub_batch, std::nullptr_t{}, res, valid_res); + } + + return std::optional(std::move(bitset)); +} + +} // namespace milvus::index diff --git a/internal/core/src/index/NgramInvertedIndex.h b/internal/core/src/index/NgramInvertedIndex.h new file mode 100644 index 0000000000..a569a6eb3b --- /dev/null +++ b/internal/core/src/index/NgramInvertedIndex.h @@ -0,0 +1,47 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#pragma once +#include +#include +#include + +#include "index/InvertedIndexTantivy.h" + +namespace milvus::exec { +class SegmentExpr; +} // namespace milvus::exec + +namespace milvus::index { +class NgramInvertedIndex : public InvertedIndexTantivy { + public: + explicit NgramInvertedIndex(const storage::FileManagerContext& ctx, + const NgramParams& params); + + IndexStatsPtr + Upload(const Config& config = {}) override; + + void + Load(milvus::tracer::TraceContext ctx, const Config& config) override; + + void + BuildWithFieldData(const std::vector& datas) override; + + std::optional + InnerMatchQuery(const std::string& literal, exec::SegmentExpr* segment); + + private: + uintptr_t min_gram_{0}; + uintptr_t max_gram_{0}; + int64_t field_id_{0}; + std::chrono::time_point index_build_begin_; +}; +} // namespace milvus::index \ No newline at end of file diff --git a/internal/core/src/indexbuilder/ScalarIndexCreator.cpp b/internal/core/src/indexbuilder/ScalarIndexCreator.cpp index 1c44b6aa0d..dfc57369a1 100644 --- a/internal/core/src/indexbuilder/ScalarIndexCreator.cpp +++ b/internal/core/src/indexbuilder/ScalarIndexCreator.cpp @@ -31,8 +31,29 @@ ScalarIndexCreator::ScalarIndexCreator( const storage::FileManagerContext& file_manager_context) : config_(config), dtype_(dtype) { milvus::index::CreateIndexInfo index_info; - if (config.contains("index_type")) { - index_type_ = config.at("index_type").get(); + if (config.contains(milvus::index::INDEX_TYPE)) { + index_type_ = config.at(milvus::index::INDEX_TYPE).get(); + + if (index_type_ == milvus::index::NGRAM_INDEX_TYPE) { + if (!config.contains(milvus::index::MIN_GRAM) || + !config.contains(milvus::index::MAX_GRAM)) { + PanicInfo( + milvus::ErrorCode::InvalidParameter, + "Ngram index must specify both min_gram and max_gram"); + } + + milvus::index::NgramParams ngram_params{}; + ngram_params.loading_index = false; + 
ngram_params.min_gram = + std::stoul(milvus::index::GetValueFromConfig( + config, milvus::index::MIN_GRAM) + .value()); + ngram_params.max_gram = + std::stoul(milvus::index::GetValueFromConfig( + config, milvus::index::MAX_GRAM) + .value()); + index_info.ngram_params = std::make_optional(ngram_params); + } } // Config should have value for milvus::index::SCALAR_INDEX_ENGINE_VERSION for production calling chain. // Use value_or(1) for unit test without setting this value diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 1b8f5242f7..3180aabb13 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -181,8 +181,15 @@ ChunkedSegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) { return; } - scalar_indexings_[field_id] = - std::move(const_cast(info).cache_index); + if (auto it = info.index_params.find(index::INDEX_TYPE); + it != info.index_params.end() && + it->second == index::NGRAM_INDEX_TYPE) { + ngram_indexings_[field_id] = + std::move(const_cast(info).cache_index); + } else { + scalar_indexings_[field_id] = + std::move(const_cast(info).cache_index); + } LoadResourceRequest request = milvus::index::IndexFactory::GetInstance().ScalarIndexLoadResource( @@ -598,15 +605,36 @@ ChunkedSegmentSealedImpl::chunk_view_by_offsets( PinWrapper ChunkedSegmentSealedImpl::chunk_index_impl(FieldId field_id, int64_t chunk_id) const { + std::shared_lock lck(mutex_); AssertInfo(scalar_indexings_.find(field_id) != scalar_indexings_.end(), "Cannot find scalar_indexing with field_id: " + std::to_string(field_id.get())); auto slot = scalar_indexings_.at(field_id); + lck.unlock(); + auto ca = SemiInlineGet(slot->PinCells({0})); auto index = ca->get_cell_of(0); return PinWrapper(ca, index); } +PinWrapper +ChunkedSegmentSealedImpl::GetNgramIndex(FieldId field_id) const { + std::shared_lock lck(mutex_); + auto iter = 
ngram_indexings_.find(field_id); + if (iter == ngram_indexings_.end()) { + return PinWrapper(nullptr); + } + auto slot = iter->second.get(); + lck.unlock(); + + auto ca = SemiInlineGet(slot->PinCells({0})); + auto index = dynamic_cast(ca->get_cell_of(0)); + AssertInfo(index != nullptr, + "ngram index cache is corrupted, field_id: {}", + field_id.get()); + return PinWrapper(ca, index); +} + int64_t ChunkedSegmentSealedImpl::get_row_count() const { std::shared_lock lck(mutex_); diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h index 05cefba8db..f8dc4f7329 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h @@ -118,6 +118,15 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { return iter->second.get(); } + bool + HasNgramIndex(FieldId field_id) const override { + std::shared_lock lck(mutex_); + return ngram_indexings_.find(field_id) != ngram_indexings_.end(); + } + + PinWrapper + GetNgramIndex(FieldId field_id) const override; + // TODO(tiered storage 1): should return a PinWrapper void BulkGetJsonData(FieldId field_id, @@ -424,6 +433,9 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { // TODO: generate index for scalar std::optional num_rows_; + // ngram field index + std::unordered_map ngram_indexings_; + // scalar field index std::unordered_map scalar_indexings_; // vector field index diff --git a/internal/core/src/segcore/SegmentInterface.cpp b/internal/core/src/segcore/SegmentInterface.cpp index ee6672ca20..2d6f85ef6f 100644 --- a/internal/core/src/segcore/SegmentInterface.cpp +++ b/internal/core/src/segcore/SegmentInterface.cpp @@ -535,4 +535,11 @@ SegmentInternalInterface::GetJsonKeyIndex(FieldId field_id) const { } return iter->second.get(); } + +// Only sealed segment has ngram index +PinWrapper +SegmentInternalInterface::GetNgramIndex(FieldId field_id) const { + return PinWrapper(nullptr); +} + } // 
namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 131edf4183..1bb0e5a8fa 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -40,6 +40,7 @@ #include "index/JsonKeyStatsInvertedIndex.h" #include "segcore/ConcurrentVector.h" #include "segcore/InsertRecord.h" +#include "index/NgramInvertedIndex.h" namespace milvus::segcore { @@ -150,6 +151,9 @@ class SegmentInterface { const int64_t* offsets, int64_t count) const = 0; + virtual PinWrapper + GetNgramIndex(FieldId field_id) const = 0; + virtual void LazyCheckSchema(SchemaPtr sch) = 0; @@ -361,6 +365,9 @@ class SegmentInternalInterface : public SegmentInterface { virtual index::JsonKeyStatsInvertedIndex* GetJsonKeyIndex(FieldId field_id) const override; + virtual PinWrapper + GetNgramIndex(FieldId field_id) const override; + public: virtual void vector_search(SearchInfo& search_info, diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index 09c84e26c9..fd43b2a041 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -26,6 +26,7 @@ #include "segcore/InsertRecord.h" #include "segcore/SegmentInterface.h" #include "segcore/Types.h" +#include "index/NgramInvertedIndex.h" namespace milvus::segcore { @@ -103,6 +104,12 @@ class SegmentSealed : public SegmentInternalInterface { FieldId field_id, std::unique_ptr index) = 0; + virtual bool + HasNgramIndex(FieldId field_id) const = 0; + + virtual PinWrapper + GetNgramIndex(FieldId field_id) const override = 0; + SegmentType type() const override { return SegmentType::Sealed; diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp index aa4cb265ce..ea563ae6a2 100644 --- a/internal/core/src/segcore/load_index_c.cpp +++ b/internal/core/src/segcore/load_index_c.cpp @@ -348,6 +348,28 @@ 
AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) { index_info.metric_type = index_params.at("metric_type"); } + if (index_info.index_type == milvus::index::NGRAM_INDEX_TYPE) { + AssertInfo(index_params.find(milvus::index::MIN_GRAM) != + index_params.end(), + "min_gram is empty for ngram index"); + AssertInfo(index_params.find(milvus::index::MAX_GRAM) != + index_params.end(), + "max_gram is empty for ngram index"); + + // get min_gram and max_gram and convert to uintptr_t + milvus::index::NgramParams ngram_params{}; + ngram_params.loading_index = true; + ngram_params.min_gram = + std::stoul(milvus::index::GetValueFromConfig( + config, milvus::index::MIN_GRAM) + .value()); + ngram_params.max_gram = + std::stoul(milvus::index::GetValueFromConfig( + config, milvus::index::MAX_GRAM) + .value()); + index_info.ngram_params = std::make_optional(ngram_params); + } + // init file manager milvus::storage::FieldDataMeta field_meta{ load_index_info->collection_id, diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index ae685e12e7..c020370769 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -58,6 +58,7 @@ DiskFileManagerImpl::~DiskFileManagerImpl() { RemoveIndexFiles(); RemoveTextLogFiles(); RemoveJsonKeyIndexFiles(); + RemoveNgramIndexFiles(); } bool @@ -317,6 +318,16 @@ DiskFileManagerImpl::CacheJsonKeyIndexToDisk( priority); } +void +DiskFileManagerImpl::CacheNgramIndexToDisk( + const std::vector& remote_files, + milvus::proto::common::LoadPriority priority) { + return CacheIndexToDiskInternal( + remote_files, + [this]() { return GetLocalNgramIndexPrefix(); }, + priority); +} + template std::string DiskFileManagerImpl::CacheRawDataToDisk(const Config& config) { @@ -527,6 +538,13 @@ DiskFileManagerImpl::RemoveJsonKeyIndexFiles() { local_chunk_manager->RemoveDir(GetLocalJsonKeyIndexPrefix()); } +void 
+DiskFileManagerImpl::RemoveNgramIndexFiles() { + auto local_chunk_manager = + LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + local_chunk_manager->RemoveDir(GetLocalNgramIndexPrefix()); +} + template bool WriteOptFieldIvfDataImpl( @@ -803,6 +821,30 @@ DiskFileManagerImpl::GetRemoteJsonKeyLogPrefix() { field_meta_.field_id); } +std::string +DiskFileManagerImpl::GetLocalNgramIndexPrefix() { + auto local_chunk_manager = + LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + return GenNgramIndexPrefix(local_chunk_manager, + index_meta_.build_id, + index_meta_.index_version, + field_meta_.segment_id, + field_meta_.field_id, + false); +} + +std::string +DiskFileManagerImpl::GetLocalTempNgramIndexPrefix() { + auto local_chunk_manager = + LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + return GenNgramIndexPrefix(local_chunk_manager, + index_meta_.build_id, + index_meta_.index_version, + field_meta_.segment_id, + field_meta_.field_id, + true); +} + std::string DiskFileManagerImpl::GetLocalRawDataObjectPrefix() { auto local_chunk_manager = diff --git a/internal/core/src/storage/DiskFileManagerImpl.h b/internal/core/src/storage/DiskFileManagerImpl.h index 754ab206b8..51b1244460 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.h +++ b/internal/core/src/storage/DiskFileManagerImpl.h @@ -88,6 +88,14 @@ class DiskFileManagerImpl : public FileManagerImpl { std::string GetRemoteJsonKeyLogPrefix(); + // Used for upload index to remote storage, using this index prefix dir as remote storage directory + std::string + GetLocalNgramIndexPrefix(); + + // Used for loading index, using this index prefix dir to store index. 
+ std::string + GetLocalTempNgramIndexPrefix(); + std::string GetLocalRawDataObjectPrefix(); @@ -113,6 +121,10 @@ class DiskFileManagerImpl : public FileManagerImpl { CacheJsonKeyIndexToDisk(const std::vector& remote_files, milvus::proto::common::LoadPriority priority); + void + CacheNgramIndexToDisk(const std::vector& remote_files, + milvus::proto::common::LoadPriority priority); + void RemoveIndexFiles(); @@ -122,6 +134,9 @@ class DiskFileManagerImpl : public FileManagerImpl { void RemoveJsonKeyIndexFiles(); + void + RemoveNgramIndexFiles(); + void AddBatchIndexFiles(const std::string& local_file_name, const std::vector& local_file_offsets, diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index 6acd678b1e..2704d8a6c5 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -653,6 +653,26 @@ GenRemoteJsonKeyIndexPathPrefix(ChunkManagerPtr cm, segment_id, field_id); } + +std::string +GenNgramIndexPrefix(ChunkManagerPtr cm, + int64_t build_id, + int64_t index_version, + int64_t segment_id, + int64_t field_id, + bool is_temp) { + boost::filesystem::path prefix = cm->GetRootPath(); + + if (is_temp) { + prefix = prefix / TEMP; + } + + boost::filesystem::path path = std::string(NGRAM_LOG_ROOT_PATH); + boost::filesystem::path path1 = + GenIndexPathIdentifier(build_id, index_version, segment_id, field_id); + return (prefix / path / path1).string(); +} + std::string GenFieldRawDataPathPrefix(ChunkManagerPtr cm, int64_t segment_id, diff --git a/internal/core/src/storage/Util.h b/internal/core/src/storage/Util.h index 5a2ebb773a..b569a73b03 100644 --- a/internal/core/src/storage/Util.h +++ b/internal/core/src/storage/Util.h @@ -133,6 +133,14 @@ GenRemoteJsonKeyIndexPathPrefix(ChunkManagerPtr cm, int64_t segment_id, int64_t field_id); std::string +GenNgramIndexPrefix(ChunkManagerPtr cm, + int64_t build_id, + int64_t index_version, + int64_t segment_id, + int64_t field_id, + bool is_temp); + +std::string 
GenFieldRawDataPathPrefix(ChunkManagerPtr cm, int64_t segment_id, int64_t field_id); diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h b/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h index 7732587af9..648cd61951 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h +++ b/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h @@ -113,6 +113,13 @@ RustResult tantivy_create_json_key_stats_writer(const char *field_name, uintptr_t overall_memory_budget_in_bytes, bool in_ram); +RustResult tantivy_create_ngram_writer(const char *field_name, + const char *path, + uintptr_t min_gram, + uintptr_t max_gram, + uintptr_t num_threads, + uintptr_t overall_memory_budget_in_bytes); + RustResult tantivy_load_index(const char *path, bool load_in_mmap, SetBitsetFn set_bitset); void tantivy_free_index_reader(void *ptr); @@ -182,6 +189,12 @@ RustResult tantivy_term_query_keyword(void *ptr, const char *term, void *bitset) RustResult tantivy_term_query_keyword_i64(void *ptr, const char *term); +RustResult tantivy_inner_match_ngram(void *ptr, + const char *literal, + uintptr_t min_gram, + uintptr_t max_gram, + void *bitset); + RustResult tantivy_lower_bound_range_query_keyword(void *ptr, const char *lower_bound, bool inclusive, diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_ngram_writer.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_ngram_writer.rs new file mode 100644 index 0000000000..2c93e474fb --- /dev/null +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_ngram_writer.rs @@ -0,0 +1,157 @@ +use std::sync::Arc; + +use tantivy::schema::{Field, IndexRecordOption, Schema, TextFieldIndexing, TextOptions}; +use tantivy::tokenizer::{NgramTokenizer, TextAnalyzer}; +use tantivy::Index; + +use crate::error::Result; +use crate::index_writer::IndexWriterWrapper; +use crate::index_writer_v7::IndexWriterWrapperImpl; + +const 
NGRAM_TOKENIZER: &str = "ngram"; + +fn build_ngram_schema(field_name: &str) -> (Schema, Field) { + let mut schema_builder = Schema::builder(); + + let text_field_indexing = TextFieldIndexing::default() + .set_tokenizer(NGRAM_TOKENIZER) + .set_fieldnorms(false) + .set_index_option(IndexRecordOption::Basic); + let text_options = TextOptions::default().set_indexing_options(text_field_indexing); + let field = schema_builder.add_text_field(field_name, text_options); + schema_builder.enable_user_specified_doc_id(); + (schema_builder.build(), field) +} + +impl IndexWriterWrapper { + // create a text writer according to `tanviy_index_version`. + // version 7 is the latest version and is what we should use in most cases. + // We may also build with version 5 for compatibility for reader nodes with older versions. + pub(crate) fn create_ngram_writer( + field_name: &str, + path: &str, + min_gram: usize, + max_gram: usize, + num_threads: usize, + overall_memory_budget_in_bytes: usize, + ) -> Result { + let tokenizer = TextAnalyzer::builder(NgramTokenizer::new( + min_gram as usize, + max_gram as usize, + false, + )?) 
+ .dynamic() + .build(); + + let (schema, field) = build_ngram_schema(field_name); + + let index = Index::create_in_dir(path, schema).unwrap(); + index.tokenizers().register(NGRAM_TOKENIZER, tokenizer); + let index_writer = index + .writer_with_num_threads(num_threads, overall_memory_budget_in_bytes) + .unwrap(); + + Ok(IndexWriterWrapper::V7(IndexWriterWrapperImpl { + field, + index_writer, + index: Arc::new(index), + enable_user_specified_doc_id: true, + id_field: None, + })) + } +} + +#[cfg(test)] +mod tests { + use std::ffi::c_void; + + use tempfile::TempDir; + + use crate::{index_writer::IndexWriterWrapper, util::set_bitset}; + + #[test] + fn test_create_ngram_writer() { + let dir = TempDir::new().unwrap(); + let _ = IndexWriterWrapper::create_ngram_writer( + "test", + dir.path().to_str().unwrap(), + 1, + 2, + 1, + 15000000, + ) + .unwrap(); + } + + #[test] + fn test_ngram_writer() { + let dir = TempDir::new().unwrap(); + let mut writer = IndexWriterWrapper::create_ngram_writer( + "test", + dir.path().to_str().unwrap(), + 2, + 3, + 1, + 15000000, + ) + .unwrap(); + + writer.add("university", Some(0)).unwrap(); + writer.add("anthropology", Some(1)).unwrap(); + writer.add("economics", Some(2)).unwrap(); + writer.add("history", Some(3)).unwrap(); + writer.add("victoria", Some(4)).unwrap(); + writer.add("basics", Some(5)).unwrap(); + writer.add("economiCs", Some(6)).unwrap(); + + writer.commit().unwrap(); + + let reader = writer.create_reader(set_bitset).unwrap(); + let mut res: Vec = vec![]; + reader + .inner_match_ngram("ic", 2, 3, &mut res as *mut _ as *mut c_void) + .unwrap(); + assert_eq!(res, vec![2, 4, 5]); + } + + #[test] + fn test_ngram_writer_chinese() { + let dir = TempDir::new().unwrap(); + let mut writer = IndexWriterWrapper::create_ngram_writer( + "test", + dir.path().to_str().unwrap(), + 2, + 3, + 1, + 15000000, + ) + .unwrap(); + + writer.add("ngram测试", Some(0)).unwrap(); + writer.add("测试ngram", Some(1)).unwrap(); + writer.add("测试ngram测试", 
Some(2)).unwrap(); + writer.add("你好世界", Some(3)).unwrap(); + writer.add("ngram需要被测试", Some(4)).unwrap(); + + writer.commit().unwrap(); + + let reader = writer.create_reader(set_bitset).unwrap(); + let mut res: Vec = vec![]; + reader + .inner_match_ngram("测试", 2, 3, &mut res as *mut _ as *mut c_void) + .unwrap(); + assert_eq!(res, vec![0, 1, 2, 4]); + + let mut res: Vec = vec![]; + reader + .inner_match_ngram("m测试", 2, 3, &mut res as *mut _ as *mut c_void) + .unwrap(); + assert_eq!(res, vec![0, 2]); + + let mut res: Vec = vec![]; + reader + .inner_match_ngram("需要被测试", 2, 3, &mut res as *mut _ as *mut c_void) + .unwrap(); + assert_eq!(res, vec![4]); + } +} diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_ngram_writer_c.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_ngram_writer_c.rs new file mode 100644 index 0000000000..960a35b8f1 --- /dev/null +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_ngram_writer_c.rs @@ -0,0 +1,37 @@ +use std::ffi::c_char; +use std::ffi::CStr; + +use crate::array::RustResult; +use crate::cstr_to_str; +use crate::index_writer::IndexWriterWrapper; +use crate::log::init_log; +use crate::util::create_binding; + +#[no_mangle] +pub extern "C" fn tantivy_create_ngram_writer( + field_name: *const c_char, + path: *const c_char, + min_gram: usize, + max_gram: usize, + num_threads: usize, + overall_memory_budget_in_bytes: usize, +) -> RustResult { + init_log(); + let field_name_str = cstr_to_str!(field_name); + let path_str = cstr_to_str!(path); + + match IndexWriterWrapper::create_ngram_writer( + field_name_str, + path_str, + min_gram, + max_gram, + num_threads, + overall_memory_budget_in_bytes, + ) { + Ok(index_writer_wrapper) => RustResult::from_ptr(create_binding(index_writer_wrapper)), + Err(err) => RustResult::from_error(format!( + "create ngram writer failed with error: {}", + err.to_string(), + )), + } +} diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs 
b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs index 3a8c16a1f2..b38a38423d 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs @@ -1,10 +1,12 @@ +use log::info; use std::ffi::c_void; use std::ops::Bound; use std::sync::Arc; use tantivy::fastfield::FastValue; -use tantivy::query::{ExistsQuery, Query, RangeQuery, RegexQuery, TermQuery}; +use tantivy::query::{BooleanQuery, ExistsQuery, Query, RangeQuery, RegexQuery, TermQuery}; use tantivy::schema::{Field, IndexRecordOption}; +use tantivy::tokenizer::{NgramTokenizer, TokenStream, Tokenizer}; use tantivy::{Index, IndexReader, ReloadPolicy, Term}; use crate::bitset_wrapper::BitsetWrapper; @@ -297,6 +299,41 @@ impl IndexReaderWrapper { self.search_i64(&q) } + // **Note**: literal length must be larger or equal to min_gram. + pub fn inner_match_ngram( + &self, + literal: &str, + min_gram: usize, + max_gram: usize, + bitset: *mut c_void, + ) -> Result<()> { + // literal length should be larger or equal to min_gram. + assert!( + literal.chars().count() >= min_gram, + "literal length should be larger or equal to min_gram. 
literal: {}, min_gram: {}", + literal, + min_gram + ); + + if literal.chars().count() <= max_gram { + return self.term_query_keyword(literal, bitset); + } + + let mut terms = vec![]; + // So, str length is larger than 'max_gram' parse 'str' by 'max_gram'-gram and search all of them with boolean intersection + // nivers + let mut term_queries: Vec> = vec![]; + let mut tokenizer = NgramTokenizer::new(max_gram, max_gram, false).unwrap(); + let mut token_stream = tokenizer.token_stream(literal); + token_stream.process(&mut |token| { + let term = Term::from_field_text(self.field, &token.text); + term_queries.push(Box::new(TermQuery::new(term, IndexRecordOption::Basic))); + terms.push(token.text.clone()); + }); + let query = BooleanQuery::intersection(term_queries); + self.search(&query, bitset) + } + pub fn lower_bound_range_query_keyword( &self, lower_bound: &str, diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_c.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_c.rs index 1119e42626..f8c15bd4ab 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_c.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_c.rs @@ -233,6 +233,25 @@ pub extern "C" fn tantivy_term_query_keyword_i64( unsafe { (*real).term_query_keyword_i64(term).into() } } +#[no_mangle] +pub extern "C" fn tantivy_inner_match_ngram( + ptr: *mut c_void, + literal: *const c_char, + min_gram: usize, + max_gram: usize, + bitset: *mut c_void, +) -> RustResult { + let real = ptr as *mut IndexReaderWrapper; + let literal = cstr_to_str!(literal); + + let now = std::time::Instant::now(); + unsafe { + (*real) + .inner_match_ngram(literal, min_gram, max_gram, bitset) + .into() + } +} + #[no_mangle] pub extern "C" fn tantivy_lower_bound_range_query_keyword( ptr: *mut c_void, diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs index 
670021bc80..13e9494225 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs @@ -9,6 +9,8 @@ mod error; mod hashmap_c; mod index_json_key_stats_writer; mod index_json_key_stats_writer_c; +mod index_ngram_writer; +mod index_ngram_writer_c; mod index_reader; mod index_reader_c; mod index_reader_text; diff --git a/internal/core/thirdparty/tantivy/tantivy-wrapper.h b/internal/core/thirdparty/tantivy/tantivy-wrapper.h index 78716946f1..17a2565268 100644 --- a/internal/core/thirdparty/tantivy/tantivy-wrapper.h +++ b/internal/core/thirdparty/tantivy/tantivy-wrapper.h @@ -177,6 +177,29 @@ struct TantivyIndexWrapper { path_ = std::string(path); } + // create index writer for ngram + TantivyIndexWrapper(const char* field_name, + const char* path, + uintptr_t min_gram, + uintptr_t max_gram, + uintptr_t num_threads = DEFAULT_NUM_THREADS, + uintptr_t overall_memory_budget_in_bytes = + DEFAULT_OVERALL_MEMORY_BUDGET_IN_BYTES) { + auto res = RustResultWrapper( + tantivy_create_ngram_writer(field_name, + path, + min_gram, + max_gram, + num_threads, + overall_memory_budget_in_bytes)); + + AssertInfo(res.result_->success, + "failed to create ngram writer: {}", + res.result_->error); + writer_ = res.result_->value.ptr._0; + path_ = std::string(path); + } + // create reader. 
void create_reader(SetBitsetFn set_bitset) { @@ -912,6 +935,22 @@ struct TantivyIndexWrapper { "TantivyIndexWrapper.phrase_match_query: invalid result type"); } + void + inner_match_ngram(const std::string& literal, + uintptr_t min_gram, + uintptr_t max_gram, + void* bitset) { + auto array = tantivy_inner_match_ngram( + reader_, literal.c_str(), min_gram, max_gram, bitset); + auto res = RustResultWrapper(array); + AssertInfo(res.result_->success, + "TantivyIndexWrapper.inner_match_ngram: {}", + res.result_->error); + AssertInfo( + res.result_->value.tag == Value::Tag::None, + "TantivyIndexWrapper.inner_match_ngram: invalid result type"); + } + // json query template void diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index 19b1111777..5955f1feb6 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -106,6 +106,7 @@ set(MILVUS_TEST_FILES test_thread_pool.cpp test_json_flat_index.cpp test_vector_array.cpp + test_ngram_query.cpp ) if ( INDEX_ENGINE STREQUAL "cardinal" ) diff --git a/internal/core/unittest/test_ngram_query.cpp b/internal/core/unittest/test_ngram_query.cpp new file mode 100644 index 0000000000..7d745da7c6 --- /dev/null +++ b/internal/core/unittest/test_ngram_query.cpp @@ -0,0 +1,377 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#include +#include +#include + +#include "common/Schema.h" +#include "test_utils/GenExprProto.h" +#include "query/PlanProto.h" +#include "query/ExecPlanNodeVisitor.h" +#include "expr/ITypeExpr.h" +#include "test_utils/storage_test_utils.h" +#include "index/IndexFactory.h" +#include "index/NgramInvertedIndex.h" +#include "segcore/load_index_c.h" + +using namespace milvus; +using namespace milvus::query; +using namespace milvus::segcore; +using namespace milvus::exec; + +TEST(ConvertToNgramLiteralTest, EmptyString) { + auto result = parse_ngram_pattern(""); + ASSERT_FALSE(result.has_value()); +} + +TEST(ConvertToNgramLiteralTest, ExactMatchSimple) { + auto result = parse_ngram_pattern("abc"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->literal, "abc"); + EXPECT_EQ(result->type, MatchType::ExactMatch); +} + +TEST(ConvertToNgramLiteralTest, ExactMatchWithEscapedPercent) { + auto result = parse_ngram_pattern("ab\\%cd"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->literal, "ab%cd"); + EXPECT_EQ(result->type, MatchType::ExactMatch); +} + +TEST(ConvertToNgramLiteralTest, ExactMatchWithEscapedSpecialChar) { + auto result = parse_ngram_pattern("a.b"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->literal, "a\\.b"); + EXPECT_EQ(result->type, MatchType::ExactMatch); +} + +TEST(ConvertToNgramLiteralTest, PrefixMatchSimple) { + auto result = parse_ngram_pattern("%abc"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->literal, "abc"); + EXPECT_EQ(result->type, MatchType::PrefixMatch); +} + +TEST(ConvertToNgramLiteralTest, PostfixMatchSimple) { + auto result = parse_ngram_pattern("abc%"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->literal, "abc"); + EXPECT_EQ(result->type, MatchType::PostfixMatch); +} + +TEST(ConvertToNgramLiteralTest, InnerMatchSimple) { + auto result = parse_ngram_pattern("%abc%"); + 
ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->literal, "abc"); + EXPECT_EQ(result->type, MatchType::InnerMatch); +} + +TEST(ConvertToNgramLiteralTest, MatchSinglePercentMiddle) { + auto result = parse_ngram_pattern("a%b"); + ASSERT_FALSE(result.has_value()); +} + +TEST(ConvertToNgramLiteralTest, MatchTypeReturnsNullopt) { + EXPECT_FALSE(parse_ngram_pattern("%").has_value()); + // %a%b (n=2, not %xxx%) -> Match -> nullopt + EXPECT_FALSE(parse_ngram_pattern("%a%b").has_value()); + // a%b%c (n=2, not %xxx%) -> Match -> nullopt + EXPECT_FALSE(parse_ngram_pattern("a%b%c").has_value()); + // %% (n=2, not %xxx% because length is not > 2) -> Match -> nullopt + EXPECT_FALSE(parse_ngram_pattern("%%").has_value()); + // %a%b%c% (n=3) -> Match -> nullopt + EXPECT_FALSE(parse_ngram_pattern("%a%b%c%").has_value()); +} + +TEST(ConvertToNgramLiteralTest, UnescapedUnderscoreReturnsNullopt) { + EXPECT_FALSE(parse_ngram_pattern("a_b").has_value()); + EXPECT_FALSE(parse_ngram_pattern("%a_b").has_value()); + EXPECT_FALSE(parse_ngram_pattern("a_b%").has_value()); + EXPECT_FALSE(parse_ngram_pattern("%a_b%").has_value()); +} + +TEST(ConvertToNgramLiteralTest, EscapedUnderscore) { + auto result = parse_ngram_pattern("a\\_b"); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result->literal, "a_b"); + EXPECT_EQ(result->type, MatchType::ExactMatch); +} + +auto +generate_field_meta(int64_t collection_id = 1, + int64_t partition_id = 2, + int64_t segment_id = 3, + int64_t field_id = 101, + DataType data_type = DataType::NONE, + DataType element_type = DataType::NONE, + bool nullable = false) -> storage::FieldDataMeta { + auto meta = storage::FieldDataMeta{ + .collection_id = collection_id, + .partition_id = partition_id, + .segment_id = segment_id, + .field_id = field_id, + }; + meta.field_schema.set_data_type( + static_cast(data_type)); + meta.field_schema.set_element_type( + static_cast(element_type)); + meta.field_schema.set_nullable(nullable); + return meta; +} + +auto 
+generate_index_meta(int64_t segment_id = 3, + int64_t field_id = 101, + int64_t index_build_id = 1000, + int64_t index_version = 10000) -> storage::IndexMeta { + return storage::IndexMeta{ + .segment_id = segment_id, + .field_id = field_id, + .build_id = index_build_id, + .index_version = index_version, + }; +} + +auto +generate_local_storage_config(const std::string& root_path) + -> storage::StorageConfig { + auto ret = storage::StorageConfig{}; + ret.storage_type = "local"; + ret.root_path = root_path; + return ret; +} + +void +test_ngram_with_data(const boost::container::vector& data, + const std::string& literal, + const std::vector& expected_result) { + int64_t collection_id = 1; + int64_t partition_id = 2; + int64_t segment_id = 3; + int64_t index_build_id = 4000; + int64_t index_version = 4000; + int64_t index_id = 5000; + + auto schema = std::make_shared(); + auto field_id = schema->AddDebugField("ngram", DataType::VARCHAR); + + auto field_meta = generate_field_meta(collection_id, + partition_id, + segment_id, + field_id.get(), + DataType::VARCHAR, + DataType::NONE, + false); + auto index_meta = generate_index_meta( + segment_id, field_id.get(), index_build_id, index_version); + + std::string root_path = "/tmp/test-inverted-index/"; + auto storage_config = generate_local_storage_config(root_path); + auto cm = CreateChunkManager(storage_config); + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> distrib(1, 100); + + size_t nb = data.size(); + + auto field_data = storage::CreateFieldData(DataType::VARCHAR, false); + field_data->FillFieldData(data.data(), data.size()); + + auto segment = CreateSealedSegment(schema); + auto field_data_info = PrepareSingleFieldInsertBinlog(collection_id, + partition_id, + segment_id, + field_id.get(), + {field_data}, + cm); + segment->LoadFieldData(field_data_info); + + auto payload_reader = + std::make_shared(field_data); + storage::InsertData insert_data(payload_reader); + 
insert_data.SetFieldDataMeta(field_meta); + insert_data.SetTimestamps(0, 100); + + auto serialized_bytes = insert_data.Serialize(storage::Remote); + + auto get_binlog_path = [=](int64_t log_id) { + return fmt::format("{}/{}/{}/{}/{}", + collection_id, + partition_id, + segment_id, + field_id.get(), + log_id); + }; + + auto log_path = get_binlog_path(0); + + auto cm_w = ChunkManagerWrapper(cm); + cm_w.Write(log_path, serialized_bytes.data(), serialized_bytes.size()); + + storage::FileManagerContext ctx(field_meta, index_meta, cm); + std::vector index_files; + + { + Config config; + config["index_type"] = milvus::index::INVERTED_INDEX_TYPE; + config["insert_files"] = std::vector{log_path}; + + auto ngram_params = index::NgramParams{ + .loading_index = false, + .min_gram = 2, + .max_gram = 4, + }; + auto index = + std::make_shared(ctx, ngram_params); + index->Build(config); + + auto create_index_result = index->Upload(); + auto memSize = create_index_result->GetMemSize(); + auto serializedSize = create_index_result->GetSerializedSize(); + ASSERT_GT(memSize, 0); + ASSERT_GT(serializedSize, 0); + index_files = create_index_result->GetIndexFiles(); + } + + { + index::CreateIndexInfo index_info{}; + index_info.index_type = milvus::index::INVERTED_INDEX_TYPE; + index_info.field_type = DataType::VARCHAR; + + Config config; + config[milvus::index::INDEX_FILES] = index_files; + config[milvus::LOAD_PRIORITY] = + milvus::proto::common::LoadPriority::HIGH; + + auto ngram_params = index::NgramParams{ + .loading_index = true, + .min_gram = 2, + .max_gram = 4, + }; + auto index = + std::make_unique(ctx, ngram_params); + index->Load(milvus::tracer::TraceContext{}, config); + + auto cnt = index->Count(); + ASSERT_EQ(cnt, nb); + + exec::SegmentExpr segment_expr(std::move(std::vector{}), + "SegmentExpr", + segment.get(), + field_id, + {}, + DataType::VARCHAR, + nb, + 8192, + 0); + + auto bitset = index->InnerMatchQuery(literal, &segment_expr).value(); + for (size_t i = 0; i < nb; i++) 
{ + ASSERT_EQ(bitset[i], expected_result[i]); + } + } + + { + std::map index_params{ + {milvus::index::INDEX_TYPE, milvus::index::NGRAM_INDEX_TYPE}, + {milvus::index::MIN_GRAM, "2"}, + {milvus::index::MAX_GRAM, "4"}, + {milvus::LOAD_PRIORITY, "HIGH"}, + }; + milvus::segcore::LoadIndexInfo load_index_info{ + .collection_id = collection_id, + .partition_id = partition_id, + .segment_id = segment_id, + .field_id = field_id.get(), + .field_type = DataType::VARCHAR, + .enable_mmap = true, + .mmap_dir_path = "/tmp/test-ngram-index-mmap-dir", + .index_id = index_id, + .index_build_id = index_build_id, + .index_version = index_version, + .index_params = index_params, + .index_files = index_files, + .schema = field_meta.field_schema, + .index_size = 1024 * 1024 * 1024, + }; + + uint8_t trace_id[16] = {0}; + uint8_t span_id[8] = {0}; + trace_id[0] = 1; + span_id[0] = 2; + CTraceContext trace{ + .traceID = trace_id, + .spanID = span_id, + .traceFlags = 0, + }; + auto cload_index_info = static_cast(&load_index_info); + AppendIndexV2(trace, cload_index_info); + UpdateSealedSegmentIndex(segment.get(), cload_index_info); + + auto unary_range_expr = + test::GenUnaryRangeExpr(OpType::InnerMatch, literal); + auto column_info = test::GenColumnInfo( + field_id.get(), proto::schema::DataType::VarChar, false, false); + unary_range_expr->set_allocated_column_info(column_info); + auto expr = test::GenExpr(); + expr->set_allocated_unary_range_expr(unary_range_expr); + auto parser = ProtoParser(schema); + auto typed_expr = parser.ParseExprs(*expr); + auto parsed = std::make_shared( + DEFAULT_PLANNODE_ID, typed_expr); + BitsetType final; + final = ExecuteQueryExpr(parsed, segment.get(), nb, MAX_TIMESTAMP); + for (size_t i = 0; i < nb; i++) { + ASSERT_EQ(final[i], expected_result[i]); + } + } +} + +TEST(NgramIndex, TestNgramWikiEpisode) { + boost::container::vector data; + // not hit + data.push_back( + "'Indira Davelba Murillo Alvarado (Tegucigalpa, " + "the youngest of eight siblings. 
She attended primary school at the " + "Escuela 14 de Julio, and her secondary studies at the Instituto " + "school called \"Indi del Bosque\", where she taught the children of " + "Honduran women'"); + // hit + data.push_back( + "Richmond Green Secondary School is a public secondary school in " + "Richmond Hill, Ontario, Canada."); + // hit + data.push_back( + "The Gymnasium in 2002 Gymnasium Philippinum or Philippinum High " + "School is an almost 500-year-old secondary school in Marburg, Hesse, " + "Germany."); + // hit + data.push_back( + "Sir Winston Churchill Secondary School is a Canadian secondary school " + "located in St. Catharines, Ontario."); + // not hit + data.push_back("Sir Winston Churchill Secondary School"); + + std::vector expected_result{false, true, true, true, false}; + + test_ngram_with_data(data, "secondary school", expected_result); +} + +TEST(NgramIndex, TestNgramAllFalse) { + boost::container::vector data(10000, + "elementary school secondary"); + + // all can be hit by ngram tantivy but will be filterred out by the second phase + test_ngram_with_data( + data, "secondary school", std::vector(10000, false)); +} diff --git a/internal/datacoord/stats_inspector.go b/internal/datacoord/stats_inspector.go index ab191cda2b..48666e3f1b 100644 --- a/internal/datacoord/stats_inspector.go +++ b/internal/datacoord/stats_inspector.go @@ -195,8 +195,8 @@ func (si *statsInspector) enableBM25() bool { } func needDoTextIndex(segment *SegmentInfo, fieldIDs []UniqueID) bool { - if !(isFlush(segment) && segment.GetLevel() != datapb.SegmentLevel_L0 && - segment.GetIsSorted()) { + if !isFlush(segment) || segment.GetLevel() == datapb.SegmentLevel_L0 || + !segment.GetIsSorted() { return false } @@ -212,12 +212,15 @@ func needDoTextIndex(segment *SegmentInfo, fieldIDs []UniqueID) bool { } func needDoJsonKeyIndex(segment *SegmentInfo, fieldIDs []UniqueID) bool { - if !(isFlush(segment) && segment.GetLevel() != datapb.SegmentLevel_L0 && - segment.GetIsSorted()) { 
+ if !isFlush(segment) || segment.GetLevel() == datapb.SegmentLevel_L0 || + !segment.GetIsSorted() { return false } for _, fieldID := range fieldIDs { + if segment.GetJsonKeyStats() == nil { + return true + } if segment.GetJsonKeyStats()[fieldID] == nil { return true } diff --git a/internal/proxy/task_index_test.go b/internal/proxy/task_index_test.go index 1c4700a94e..9f16fdd597 100644 --- a/internal/proxy/task_index_test.go +++ b/internal/proxy/task_index_test.go @@ -1162,6 +1162,94 @@ func Test_parseIndexParams(t *testing.T) { }) } +func Test_ngram_parseIndexParams(t *testing.T) { + t.Run("valid ngram index params", func(t *testing.T) { + cit := &createIndexTask{ + req: &milvuspb.CreateIndexRequest{ + ExtraParams: []*commonpb.KeyValuePair{ + {Key: common.IndexTypeKey, Value: "NGRAM"}, + {Key: common.IndexParamsKey, Value: "{\"min_gram\": \"2\", \"max_gram\": \"3\"}"}, + }, + }, + fieldSchema: &schemapb.FieldSchema{ + FieldID: 101, Name: "FieldID", DataType: schemapb.DataType_VarChar, + }, + } + err := cit.parseIndexParams(context.TODO()) + assert.NoError(t, err) + assert.ElementsMatch(t, []*commonpb.KeyValuePair{ + {Key: common.IndexTypeKey, Value: "NGRAM"}, + {Key: indexparamcheck.MinGramKey, Value: "2"}, + {Key: indexparamcheck.MaxGramKey, Value: "3"}, + }, cit.newIndexParams) + assert.Empty(t, cit.newTypeParams) + }) + + t.Run("ngram on non varchar field", func(t *testing.T) { + cit := &createIndexTask{ + req: &milvuspb.CreateIndexRequest{ + ExtraParams: []*commonpb.KeyValuePair{ + {Key: common.IndexTypeKey, Value: "NGRAM"}, + {Key: common.IndexParamsKey, Value: "{\"min_gram\": \"2\", \"max_gram\": \"3\"}"}, + }, + }, + fieldSchema: &schemapb.FieldSchema{ + FieldID: 101, Name: "FieldInt", DataType: schemapb.DataType_Int64, + }, + } + err := cit.parseIndexParams(context.TODO()) + assert.Error(t, err) + }) + + t.Run("ngram missing params", func(t *testing.T) { + cit := &createIndexTask{ + req: &milvuspb.CreateIndexRequest{ + ExtraParams: 
[]*commonpb.KeyValuePair{ + {Key: common.IndexTypeKey, Value: "NGRAM"}, + {Key: common.IndexParamsKey, Value: "{\"min_gram\": \"2\"}"}, + }, + }, + fieldSchema: &schemapb.FieldSchema{ + FieldID: 101, Name: "FieldID", DataType: schemapb.DataType_VarChar, + }, + } + err := cit.parseIndexParams(context.TODO()) + assert.Error(t, err) + }) + + t.Run("ngram non-integer params", func(t *testing.T) { + cit := &createIndexTask{ + req: &milvuspb.CreateIndexRequest{ + ExtraParams: []*commonpb.KeyValuePair{ + {Key: common.IndexTypeKey, Value: "NGRAM"}, + {Key: common.IndexParamsKey, Value: "{\"min_gram\": \"a\", \"max_gram\": \"3\"}"}, + }, + }, + fieldSchema: &schemapb.FieldSchema{ + FieldID: 101, Name: "FieldID", DataType: schemapb.DataType_VarChar, + }, + } + err := cit.parseIndexParams(context.TODO()) + assert.Error(t, err) + }) + + t.Run("ngram invalid range", func(t *testing.T) { + cit := &createIndexTask{ + req: &milvuspb.CreateIndexRequest{ + ExtraParams: []*commonpb.KeyValuePair{ + {Key: common.IndexTypeKey, Value: "NGRAM"}, + {Key: common.IndexParamsKey, Value: "{\"min_gram\": \"5\", \"max_gram\": \"3\"}"}, + }, + }, + fieldSchema: &schemapb.FieldSchema{ + FieldID: 101, Name: "FieldID", DataType: schemapb.DataType_VarChar, + }, + } + err := cit.parseIndexParams(context.TODO()) + assert.Error(t, err) + }) +} + func Test_wrapUserIndexParams(t *testing.T) { params := wrapUserIndexParams("L2") assert.Equal(t, 2, len(params)) diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 24191bf9bd..6f079e5bdd 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -796,7 +796,7 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu log := log.Ctx(ctx).With(zap.Int64("segmentID", segment.ID())) tr := timerecord.NewTimeRecorder("segmentLoader.loadSealedSegment") log.Info("Start loading fields...", - // zap.Int64s("indexedFields", 
lo.Keys(indexedFieldInfos)), + zap.Int("indexedFields count", len(indexedFieldInfos)), zap.Int64s("indexed text fields", lo.Keys(textIndexes)), zap.Int64s("unindexed text fields", lo.Keys(unindexedTextFields)), zap.Int64s("indexed json key fields", lo.Keys(jsonKeyStats)), @@ -1744,6 +1744,10 @@ func (loader *segmentLoader) LoadJSONIndex(ctx context.Context, return merr.WrapErrParameterInvalid("LocalSegment", fmt.Sprintf("%T", seg)) } + if len(loadInfo.GetJsonKeyStatsLogs()) == 0 { + return nil + } + collection := segment.GetCollection() schemaHelper, _ := typeutil.CreateSchemaHelper(collection.Schema()) diff --git a/internal/util/indexparamcheck/conf_adapter_mgr.go b/internal/util/indexparamcheck/conf_adapter_mgr.go index a746f423ce..184cdbfa8b 100644 --- a/internal/util/indexparamcheck/conf_adapter_mgr.go +++ b/internal/util/indexparamcheck/conf_adapter_mgr.go @@ -58,6 +58,7 @@ func (mgr *indexCheckerMgrImpl) registerIndexChecker() { mgr.checkers[IndexHybrid] = newHYBRIDChecker() mgr.checkers["marisa-trie"] = newTRIEChecker() mgr.checkers[AutoIndex] = newAUTOINDEXChecker() + mgr.checkers[IndexNGRAM] = newNgramIndexChecker() } func newIndexCheckerMgr() *indexCheckerMgrImpl { diff --git a/internal/util/indexparamcheck/index_type.go b/internal/util/indexparamcheck/index_type.go index 45bdbdc747..3f0a5fa640 100644 --- a/internal/util/indexparamcheck/index_type.go +++ b/internal/util/indexparamcheck/index_type.go @@ -33,6 +33,7 @@ const ( IndexBitmap IndexType = "BITMAP" IndexHybrid IndexType = "HYBRID" // BITMAP + INVERTED IndexINVERTED IndexType = "INVERTED" + IndexNGRAM IndexType = "NGRAM" AutoIndex IndexType = "AUTOINDEX" ) diff --git a/internal/util/indexparamcheck/ngram_index_checker.go b/internal/util/indexparamcheck/ngram_index_checker.go new file mode 100644 index 0000000000..86a3ad6e12 --- /dev/null +++ b/internal/util/indexparamcheck/ngram_index_checker.go @@ -0,0 +1,60 @@ +package indexparamcheck + +import ( + "fmt" + "strconv" + + 
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" +) + +const ( + MinGramKey = "min_gram" + MaxGramKey = "max_gram" +) + +type NgramIndexChecker struct { + scalarIndexChecker +} + +func newNgramIndexChecker() *NgramIndexChecker { + return &NgramIndexChecker{} +} + +func (c *NgramIndexChecker) CheckTrain(dataType schemapb.DataType, params map[string]string) error { + if dataType != schemapb.DataType_VarChar { + // todo(SpadeA): we may support it for json in the future + return merr.WrapErrParameterInvalidMsg("Ngram index can only be created on VARCHAR field") + } + + minGramStr, minGramExist := params[MinGramKey] + maxGramStr, maxGramExist := params[MaxGramKey] + if !minGramExist || !maxGramExist { + return merr.WrapErrParameterInvalidMsg("Ngram index must specify both min_gram and max_gram") + } + + minGram, err := strconv.Atoi(minGramStr) + if err != nil { + return merr.WrapErrParameterInvalidMsg("min_gram for Ngram index must be an integer, got: %s", minGramStr) + } + + maxGram, err := strconv.Atoi(maxGramStr) + if err != nil { + return merr.WrapErrParameterInvalidMsg("max_gram for Ngram index must be an integer, got: %s", maxGramStr) + } + + if minGram <= 0 || maxGram <= 0 || minGram > maxGram { + return merr.WrapErrParameterInvalidMsg("invalid min_gram or max_gram value for Ngram index, min_gram: %d, max_gram: %d", minGram, maxGram) + } + + return c.scalarIndexChecker.CheckTrain(dataType, params) +} + +func (c *NgramIndexChecker) CheckValidDataType(indexType IndexType, field *schemapb.FieldSchema) error { + dType := field.GetDataType() + if !typeutil.IsStringType(dType) { + return fmt.Errorf("ngram index can only be created on VARCHAR field") + } + return nil +}