From 5b85f0e4dc4949a3b61abd85f56d641a7642f757 Mon Sep 17 00:00:00 2001 From: Buqian Zheng Date: Thu, 20 Nov 2025 01:51:07 +0800 Subject: [PATCH] enhance: updated multiple places where the expr copies the input values in every loop (#45680) issue: https://github.com/milvus-io/milvus/issues/45679 Signed-off-by: Buqian Zheng --- .../core/src/exec/expression/CompareExpr.h | 8 +-- internal/core/src/exec/expression/Element.h | 5 +- internal/core/src/exec/expression/Expr.h | 26 +++---- .../src/exec/expression/JsonContainsExpr.cpp | 68 +++++++++++++------ .../src/exec/expression/JsonContainsExpr.h | 2 + .../core/src/exec/expression/TermExpr.cpp | 13 ++-- 6 files changed, 79 insertions(+), 43 deletions(-) diff --git a/internal/core/src/exec/expression/CompareExpr.h b/internal/core/src/exec/expression/CompareExpr.h index a07b4dc4ba..f2ed751c87 100644 --- a/internal/core/src/exec/expression/CompareExpr.h +++ b/internal/core/src/exec/expression/CompareExpr.h @@ -269,7 +269,7 @@ class PhyCompareFilterExpr : public Expr { OffsetVector* input, TargetBitmapView res, TargetBitmapView valid_res, - ValTypes... values) { + const ValTypes&... values) { if (segment_chunk_reader_.segment_->is_chunked()) { return ProcessBothDataChunksForMultipleChunksize(); int64_t processed_size = 0; const auto size_per_chunk = segment_chunk_reader_.SizePerChunk(); @@ -380,7 +380,7 @@ class PhyCompareFilterExpr : public Expr { ProcessBothDataChunksForSingleChunk(FUNC func, TargetBitmapView res, TargetBitmapView valid_res, - ValTypes... values) { + const ValTypes&... values) { int64_t processed_size = 0; const auto active_count = segment_chunk_reader_.active_count_; @@ -450,7 +450,7 @@ class PhyCompareFilterExpr : public Expr { ProcessBothDataChunksForMultipleChunk(FUNC func, TargetBitmapView res, TargetBitmapView valid_res, - ValTypes... values) { + const ValTypes&... values) { int64_t processed_size = 0; // only call this function when left and right are not indexed, so they have the same number of chunks diff --git a/internal/core/src/exec/expression/Element.h b/internal/core/src/exec/expression/Element.h index ecd23b10d0..af302d44d3 100644 --- a/internal/core/src/exec/expression/Element.h +++ b/internal/core/src/exec/expression/Element.h @@ -41,6 +41,7 @@ class SingleElement : public BaseElement { using ValueType = std::variant || std::is_same_v || + std::is_same_v || std::is_same_v || std::is_same_v || std::is_same_v || std::is_same_v || @@ -95,6 +97,7 @@ class MultiElement : public BaseElement { using ValueType = std::variant(value)) { for (const auto& v : values_) { - if (v == value) + if (v == std::get(value)) return true; } } diff --git a/internal/core/src/exec/expression/Expr.h b/internal/core/src/exec/expression/Expr.h index e7a335ac6e..46c73ee016 100644 --- a/internal/core/src/exec/expression/Expr.h +++ b/internal/core/src/exec/expression/Expr.h @@ -362,7 +362,7 @@ class SegmentExpr : public Expr { std::function skip_func, TargetBitmapView res, TargetBitmapView valid_res, - ValTypes... values) { + const ValTypes&... values) { // For sealed segment, only single chunk Assert(num_data_chunk_ == 1); auto need_size = @@ -423,7 +423,7 @@ class SegmentExpr : public Expr { OffsetVector* input, TargetBitmapView res, TargetBitmapView valid_res, - ValTypes... values) { + const ValTypes&... values) { // For non_chunked sealed segment, only single chunk Assert(num_data_chunk_ == 1); @@ -451,7 +451,7 @@ class SegmentExpr : public Expr { VectorPtr ProcessIndexChunksByOffsets(FUNC func, OffsetVector* input, - ValTypes... values) { + const ValTypes&... values) { AssertInfo(num_index_chunk_ == 1, "scalar index chunk num must be 1"); using IndexInnerType = std:: conditional_t, std::string, T>; @@ -480,7 +480,7 @@ class SegmentExpr : public Expr { OffsetVector* input, TargetBitmapView res, TargetBitmapView valid_res, - ValTypes... values) { + const ValTypes&... values) { AssertInfo(num_index_chunk_ == 1, "scalar index chunk num must be 1"); auto& skip_index = segment_->GetSkipIndex(); @@ -532,7 +532,7 @@ class SegmentExpr : public Expr { OffsetVector* input, TargetBitmapView res, TargetBitmapView valid_res, - ValTypes... values) { + const ValTypes&... values) { int64_t processed_size = 0; // index reverse lookup @@ -690,7 +690,7 @@ class SegmentExpr : public Expr { std::function skip_func, TargetBitmapView res, TargetBitmapView valid_res, - ValTypes... values) { + const ValTypes&... values) { int64_t processed_size = 0; if constexpr (std::is_same_v || std::is_same_v) { @@ -782,7 +782,7 @@ class SegmentExpr : public Expr { TargetBitmapView res, TargetBitmapView valid_res, bool process_all_chunks, - ValTypes... values) { + const ValTypes&... values) { int64_t processed_size = 0; size_t start_chunk = process_all_chunks ? 0 : current_data_chunk_; @@ -934,7 +934,7 @@ class SegmentExpr : public Expr { std::function skip_func, TargetBitmapView res, TargetBitmapView valid_res, - ValTypes... values) { + const ValTypes&... values) { return ProcessMultipleChunksCommon( func, skip_func, res, valid_res, false, values...); } @@ -946,7 +946,7 @@ class SegmentExpr : public Expr { std::function skip_func, TargetBitmapView res, TargetBitmapView valid_res, - ValTypes... values) { + const ValTypes&... values) { return ProcessMultipleChunksCommon( func, skip_func, res, valid_res, true, values...); } @@ -961,7 +961,7 @@ class SegmentExpr : public Expr { std::function skip_func, TargetBitmapView res, TargetBitmapView valid_res, - ValTypes... values) { + const ValTypes&... values) { if (segment_->is_chunked()) { return ProcessDataChunksForMultipleChunk( func, skip_func, res, valid_res, values...); @@ -978,7 +978,7 @@ class SegmentExpr : public Expr { std::function skip_func, TargetBitmapView res, TargetBitmapView valid_res, - ValTypes... values) { + const ValTypes&... values) { if (segment_->is_chunked()) { return ProcessAllChunksForMultipleChunk( func, skip_func, res, valid_res, values...); @@ -1010,7 +1010,7 @@ class SegmentExpr : public Expr { template VectorPtr - ProcessIndexChunks(FUNC func, ValTypes... values) { + ProcessIndexChunks(FUNC func, const ValTypes&... values) { typedef std:: conditional_t, std::string, T> IndexInnerType; @@ -1360,7 +1360,7 @@ class SegmentExpr : public Expr { template void - ProcessIndexChunksV2(FUNC func, ValTypes... values) { + ProcessIndexChunksV2(FUNC func, const ValTypes&... values) { typedef std:: conditional_t, std::string, T> IndexInnerType; diff --git a/internal/core/src/exec/expression/JsonContainsExpr.cpp b/internal/core/src/exec/expression/JsonContainsExpr.cpp index 52034cf69a..207a3e295c 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.cpp +++ b/internal/core/src/exec/expression/JsonContainsExpr.cpp @@ -544,11 +544,18 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(EvalCtx& context) { TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); - std::vector elements; - for (auto const& element : expr_->vals_) { - elements.emplace_back(GetValueFromProto(element)); + if (!arg_inited_) { + auto elements = std::make_shared>(); + for (auto const& element : expr_->vals_) { + elements->emplace_back( + GetValueFromProto(element)); + } + arg_cached_set_ = elements; + arg_inited_ = true; } + auto elements = std::static_pointer_cast>( + arg_cached_set_); size_t processed_cursor = 0; auto execute_sub_batch = [&processed_cursor, & @@ -613,14 +620,14 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(EvalCtx& context) { res, valid_res, pointer, - elements); + *elements); } else { processed_size = ProcessDataChunks(execute_sub_batch, std::nullptr_t{}, res, valid_res, pointer, - elements); + *elements); } AssertInfo(processed_size == real_batch_size, "internal error: expr processed rows {} not equal " @@ -739,11 +746,17 @@ PhyJsonContainsFilterExpr::ExecArrayContainsAll(EvalCtx& context) { TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - std::set elements; - for (auto const& element : expr_->vals_) { - elements.insert(GetValueWithCastNumber(element)); + if (!arg_inited_) { + auto elements = std::make_shared>(); + for (auto const& element : expr_->vals_) { + elements->insert(GetValueWithCastNumber(element)); + } + arg_cached_set_ = elements; + arg_inited_ = true; } + auto elements = + std::static_pointer_cast>(arg_cached_set_); int processed_cursor = 0; auto execute_sub_batch = [&processed_cursor, & @@ -791,10 +804,10 @@ PhyJsonContainsFilterExpr::ExecArrayContainsAll(EvalCtx& context) { input, res, valid_res, - elements); + *elements); } else { processed_size = ProcessDataChunks( - execute_sub_batch, std::nullptr_t{}, res, valid_res, elements); + execute_sub_batch, std::nullptr_t{}, res, valid_res, *elements); } AssertInfo(processed_size == real_batch_size, "internal error: expr processed rows {} not equal " @@ -832,11 +845,17 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAll(EvalCtx& context) { TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); - std::set elements; - for (auto const& element : expr_->vals_) { - elements.insert(GetValueFromProto(element)); + if (!arg_inited_) { + auto elements = std::make_shared>(); + for (auto const& element : expr_->vals_) { + elements->insert(GetValueFromProto(element)); + } + arg_cached_set_ = elements; + arg_inited_ = true; } + auto elements = + std::static_pointer_cast>(arg_cached_set_); int processed_cursor = 0; auto execute_sub_batch = [&processed_cursor, & @@ -907,14 +926,14 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAll(EvalCtx& context) { res, valid_res, pointer, - elements); + *elements); } else { processed_size = ProcessDataChunks(execute_sub_batch, std::nullptr_t{}, res, valid_res, pointer, - elements); + *elements); } AssertInfo(processed_size == real_batch_size, "internal error: expr processed rows {} not equal " @@ -935,12 +954,19 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByStats() { if (real_batch_size == 0) { return nullptr; } - std::set elements; auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); - for (auto const& element : expr_->vals_) { - elements.insert(GetValueFromProto(element)); + if (!arg_inited_) { + auto elements = std::make_shared>(); + for (auto const& element : expr_->vals_) { + elements->insert(GetValueFromProto(element)); + } + arg_cached_set_ = elements; + arg_inited_ = true; } - if (elements.empty()) { + + auto elements = + std::static_pointer_cast>(arg_cached_set_); + if (elements->empty()) { MoveCursor(); return std::make_shared( TargetBitmap(real_batch_size, false), @@ -966,7 +992,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByStats() { pointer, milvus::index::JSONType::ARRAY); if (!target_field.empty()) { ShreddingArrayBsonContainsAllExecutor executor( - elements); + *elements); index->ExecutorForShreddingData( op_ctx_, @@ -989,7 +1015,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByStats() { return; } - std::set tmp_elements(elements); + std::set tmp_elements(*elements); for (const auto& element : val.value()) { auto value = milvus::BsonView::GetValueFromBsonView( element.get_value()); diff --git a/internal/core/src/exec/expression/JsonContainsExpr.h b/internal/core/src/exec/expression/JsonContainsExpr.h index f87db03624..09b844880b 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.h +++ b/internal/core/src/exec/expression/JsonContainsExpr.h @@ -554,6 +554,8 @@ class PhyJsonContainsFilterExpr : public SegmentExpr { bool arg_inited_{false}; std::shared_ptr arg_set_; std::shared_ptr arg_set_double_; + std::shared_ptr + arg_cached_set_; // For caching std::set or std::vector PinWrapper pinned_json_stats_{nullptr}; }; } //namespace exec diff --git a/internal/core/src/exec/expression/TermExpr.cpp b/internal/core/src/exec/expression/TermExpr.cpp index 7a083ff595..a51244b891 100644 --- a/internal/core/src/exec/expression/TermExpr.cpp +++ b/internal/core/src/exec/expression/TermExpr.cpp @@ -878,16 +878,21 @@ PhyTermFilterExpr::ExecVisitorImplForIndex() { return nullptr; } - std::vector vals; - for (auto& val : expr_->vals_) { - vals.emplace_back(GetValueFromProto(val) ? 1 : 0); + if (!arg_inited_) { + std::vector vals; + for (auto& val : expr_->vals_) { + vals.emplace_back(GetValueFromProto(val) ? 1 : 0); + } + arg_set_ = std::make_shared>(vals); + arg_inited_ = true; } auto execute_sub_batch = [](Index* index_ptr, const std::vector& vals) { TermIndexFunc func; return std::move(func(index_ptr, vals.size(), (bool*)vals.data())); }; - auto res = ProcessIndexChunks(execute_sub_batch, vals); + auto args = std::dynamic_pointer_cast>(arg_set_); + auto res = ProcessIndexChunks(execute_sub_batch, args->values_); return res; }