From 6c55db44f1689aa37208d9d88cf19b55aa9a8e5c Mon Sep 17 00:00:00 2001 From: zhagnlu <1542303831@qq.com> Date: Wed, 19 Mar 2025 14:50:14 +0800 Subject: [PATCH] enhance: reorder sub expr for conjunct expr (#39872) two point: (1) reoder conjucts expr's subexpr, postpone heavy operations sequence: int(column) -> index(column) -> string(column) -> light conjuct ...... -> json(column) -> heavy conjuct -> two_column_compare (2) support pre filter for expr execute, skip scan raw data that had been skipped because of preceding expr result. #39869 Signed-off-by: luzhang Co-authored-by: luzhang --- configs/milvus.yaml | 1 + internal/core/src/common/Common.cpp | 7 + internal/core/src/common/Common.h | 4 + internal/core/src/common/Consts.h | 3 +- internal/core/src/common/Types.h | 5 + internal/core/src/common/init_c.cpp | 10 +- internal/core/src/common/init_c.h | 3 + .../src/exec/expression/AlwaysTrueExpr.cpp | 5 +- .../core/src/exec/expression/AlwaysTrueExpr.h | 15 + .../expression/BinaryArithOpEvalRangeExpr.cpp | 18 +- .../expression/BinaryArithOpEvalRangeExpr.h | 15 + .../src/exec/expression/BinaryRangeExpr.cpp | 141 +++++--- .../src/exec/expression/BinaryRangeExpr.h | 45 ++- internal/core/src/exec/expression/CallExpr.h | 15 + .../core/src/exec/expression/ColumnExpr.h | 15 + .../core/src/exec/expression/CompareExpr.cpp | 137 +++++--- .../core/src/exec/expression/CompareExpr.h | 55 ++- .../core/src/exec/expression/ConjunctExpr.cpp | 17 +- .../core/src/exec/expression/ConjunctExpr.h | 61 +++- internal/core/src/exec/expression/EvalCtx.h | 18 + .../core/src/exec/expression/ExistsExpr.cpp | 22 +- .../core/src/exec/expression/ExistsExpr.h | 17 +- internal/core/src/exec/expression/Expr.cpp | 171 +++++++++- internal/core/src/exec/expression/Expr.h | 32 +- .../src/exec/expression/JsonContainsExpr.cpp | 215 ++++++++---- .../src/exec/expression/JsonContainsExpr.h | 33 +- .../src/exec/expression/LogicalBinaryExpr.h | 15 + .../src/exec/expression/LogicalUnaryExpr.h | 15 + internal/core/src/exec/expression/NullExpr.h | 15 + .../core/src/exec/expression/TermExpr.cpp | 168 ++++++---- internal/core/src/exec/expression/TermExpr.h | 33 +- .../core/src/exec/expression/UnaryExpr.cpp | 281 +++++++++++----- internal/core/src/exec/expression/UnaryExpr.h | 59 +++- internal/core/src/exec/expression/ValueExpr.h | 15 + .../core/src/monitor/prometheus_client.cpp | 6 +- internal/core/src/monitor/prometheus_client.h | 1 + internal/core/unittest/test_exec.cpp | 312 ++++++++++++++++-- internal/core/unittest/test_expr.cpp | 139 ++++++++ internal/querynodev2/server.go | 3 + pkg/util/paramtable/component_param.go | 11 + 40 files changed, 1765 insertions(+), 388 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 0f893b2fd6..76042ab3d7 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -889,6 +889,7 @@ common: localRPCEnabled: false # enable local rpc for internal communication when mix or standalone mode. sync: taskPoolReleaseTimeoutSeconds: 60 # The maximum time to wait for the task to finish and release resources in the pool + enabledOptimizeExpr: true # Indicates whether to enable optimize expr # QuotaConfig, configurations of Milvus quota and limits. # By default, we enable: diff --git a/internal/core/src/common/Common.cpp b/internal/core/src/common/Common.cpp index b51c86e374..cd27d469fd 100644 --- a/internal/core/src/common/Common.cpp +++ b/internal/core/src/common/Common.cpp @@ -28,6 +28,7 @@ int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT = DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT; int CPU_NUM = DEFAULT_CPU_NUM; int64_t EXEC_EVAL_EXPR_BATCH_SIZE = DEFAULT_EXEC_EVAL_EXPR_BATCH_SIZE; +bool OPTIMIZE_EXPR_ENABLED = DEFAULT_OPTIMIZE_EXPR_ENABLED; void SetIndexSliceSize(const int64_t size) { @@ -67,4 +68,10 @@ SetCpuNum(const int num) { CPU_NUM = num; } +void +SetDefaultOptimizeExprEnable(bool val) { + OPTIMIZE_EXPR_ENABLED = val; + LOG_INFO("set default optimize expr enabled: {}", OPTIMIZE_EXPR_ENABLED); +} + } // namespace milvus diff --git a/internal/core/src/common/Common.h b/internal/core/src/common/Common.h index a691cf2f03..a4245d4c5e 100644 --- a/internal/core/src/common/Common.h +++ b/internal/core/src/common/Common.h @@ -29,6 +29,7 @@ extern int64_t MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT; extern int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT; extern int CPU_NUM; extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE; +extern bool OPTIMIZE_EXPR_ENABLED; void SetIndexSliceSize(const int64_t size); @@ -48,6 +49,9 @@ SetCpuNum(const int core); void SetDefaultExecEvalExprBatchSize(int64_t val); +void +SetDefaultOptimizeExprEnable(bool val); + struct BufferView { struct Element { const char* data_; diff --git a/internal/core/src/common/Consts.h b/internal/core/src/common/Consts.h index 8c76f5cb23..f51bd01c93 100644 --- a/internal/core/src/common/Consts.h +++ b/internal/core/src/common/Consts.h @@ -79,4 +79,5 @@ const int64_t DEFAULT_HYBRID_INDEX_BITMAP_CARDINALITY_LIMIT = 100; const size_t MARISA_NULL_KEY_ID = -1; const std::string JSON_CAST_TYPE = "json_cast_type"; -const std::string JSON_PATH = "json_path"; \ No newline at end of file +const std::string JSON_PATH = "json_path"; +const bool DEFAULT_OPTIMIZE_EXPR_ENABLED = true; diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index b8358e8d8f..9ee8ddc953 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -254,6 +254,11 @@ IsFloatDataType(DataType data_type) { } } +inline bool +IsNumericDataType(DataType data_type) { + return IsIntegerDataType(data_type) || IsFloatDataType(data_type); +} + inline bool IsStringDataType(DataType data_type) { switch (data_type) { diff --git a/internal/core/src/common/init_c.cpp b/internal/core/src/common/init_c.cpp index 77764ffa55..25ec3dedd1 100644 --- a/internal/core/src/common/init_c.cpp +++ b/internal/core/src/common/init_c.cpp @@ -25,7 +25,7 @@ #include "common/Tracer.h" #include "log/Log.h" -std::once_flag flag1, flag2, flag3, flag4, flag5, flag6; +std::once_flag flag1, flag2, flag3, flag4, flag5, flag6, flag7; std::once_flag traceFlag; void @@ -78,6 +78,14 @@ InitDefaultExprEvalBatchSize(int64_t val) { val); } +void +InitDefaultOptimizeExprEnable(bool val) { + std::call_once( + flag7, + [](bool val) { milvus::SetDefaultOptimizeExprEnable(val); }, + val); +} + void InitTrace(CTraceConfig* config) { auto traceConfig = milvus::tracer::TraceConfig{config->exporter, diff --git a/internal/core/src/common/init_c.h b/internal/core/src/common/init_c.h index b477b12789..d6d117a8fd 100644 --- a/internal/core/src/common/init_c.h +++ b/internal/core/src/common/init_c.h @@ -48,6 +48,9 @@ InitTrace(CTraceConfig* config); void SetTrace(CTraceConfig* config); +void +InitDefaultOptimizeExprEnable(bool val); + #ifdef __cplusplus }; #endif diff --git a/internal/core/src/exec/expression/AlwaysTrueExpr.cpp b/internal/core/src/exec/expression/AlwaysTrueExpr.cpp index 063515cc19..1a0ebcf08f 100644 --- a/internal/core/src/exec/expression/AlwaysTrueExpr.cpp +++ b/internal/core/src/exec/expression/AlwaysTrueExpr.cpp @@ -35,8 +35,9 @@ PhyAlwaysTrueExpr::Eval(EvalCtx& context, VectorPtr& result) { return; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); diff --git a/internal/core/src/exec/expression/AlwaysTrueExpr.h b/internal/core/src/exec/expression/AlwaysTrueExpr.h index 5ef0dc10d4..aedd0e04fd 100644 --- a/internal/core/src/exec/expression/AlwaysTrueExpr.h +++ b/internal/core/src/exec/expression/AlwaysTrueExpr.h @@ -57,6 +57,21 @@ class PhyAlwaysTrueExpr : public Expr { } } + std::string + ToString() const override { + return "[AlwaysTrue]"; + } + + bool + IsSource() const override { + return true; + } + + std::optional + GetColumnInfo() const override { + return std::nullopt; + } + private: std::shared_ptr expr_; int64_t active_count_; diff --git a/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.cpp b/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.cpp index 6fab3aaf0c..d3d72d39ac 100644 --- a/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.cpp +++ b/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.cpp @@ -117,11 +117,11 @@ PhyBinaryArithOpEvalRangeExpr::ExecRangeVisitorImplForJson( if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); if (!arg_inited_) { value_arg_.SetValue(expr_->value_); @@ -535,11 +535,11 @@ PhyBinaryArithOpEvalRangeExpr::ExecRangeVisitorImplForArray( if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); int index = -1; if (expr_->column_.nested_path_.size() > 0) { @@ -1435,11 +1435,11 @@ PhyBinaryArithOpEvalRangeExpr::ExecRangeVisitorImplForData( return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); if (!arg_inited_) { value_arg_.SetValue(expr_->value_); diff --git a/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.h b/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.h index 114d759661..ac2dad4923 100644 --- a/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.h +++ b/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.h @@ -464,6 +464,21 @@ class PhyBinaryArithOpEvalRangeExpr : public SegmentExpr { void Eval(EvalCtx& context, VectorPtr& result) override; + std::string + ToString() const override { + return fmt::format("{}", expr_->ToString()); + } + + bool + IsSource() const override { + return true; + } + + std::optional + GetColumnInfo() const override { + return expr_->column_; + } + private: template VectorPtr diff --git a/internal/core/src/exec/expression/BinaryRangeExpr.cpp b/internal/core/src/exec/expression/BinaryRangeExpr.cpp index b5453965c6..0c7c6aa405 100644 --- a/internal/core/src/exec/expression/BinaryRangeExpr.cpp +++ b/internal/core/src/exec/expression/BinaryRangeExpr.cpp @@ -28,31 +28,31 @@ PhyBinaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { SetHasOffsetInput((input != nullptr)); switch (expr_->column_.data_type_) { case DataType::BOOL: { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); break; } case DataType::INT8: { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); break; } case DataType::INT16: { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); break; } case DataType::INT32: { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); break; } case DataType::INT64: { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); break; } case DataType::FLOAT: { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); break; } case DataType::DOUBLE: { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); break; } case DataType::VARCHAR: { @@ -60,9 +60,9 @@ PhyBinaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { !storage::MmapManager::GetInstance() .GetMmapConfig() .growing_enable_mmap) { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); } else { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); } break; } @@ -70,15 +70,15 @@ PhyBinaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { auto value_type = expr_->lower_val_.val_case(); switch (value_type) { case proto::plan::GenericValue::ValCase::kInt64Val: { - result = ExecRangeVisitorImplForJson(input); + result = ExecRangeVisitorImplForJson(context); break; } case proto::plan::GenericValue::ValCase::kFloatVal: { - result = ExecRangeVisitorImplForJson(input); + result = ExecRangeVisitorImplForJson(context); break; } case proto::plan::GenericValue::ValCase::kStringVal: { - result = ExecRangeVisitorImplForJson(input); + result = ExecRangeVisitorImplForJson(context); break; } default: { @@ -95,17 +95,17 @@ PhyBinaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { switch (value_type) { case proto::plan::GenericValue::ValCase::kInt64Val: { SetNotUseIndex(); - result = ExecRangeVisitorImplForArray(input); + result = ExecRangeVisitorImplForArray(context); break; } case proto::plan::GenericValue::ValCase::kFloatVal: { SetNotUseIndex(); - result = ExecRangeVisitorImplForArray(input); + result = ExecRangeVisitorImplForArray(context); break; } case proto::plan::GenericValue::ValCase::kStringVal: { SetNotUseIndex(); - result = ExecRangeVisitorImplForArray(input); + result = ExecRangeVisitorImplForArray(context); break; } default: { @@ -126,11 +126,11 @@ PhyBinaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { template VectorPtr -PhyBinaryRangeFilterExpr::ExecRangeVisitorImpl(OffsetVector* input) { +PhyBinaryRangeFilterExpr::ExecRangeVisitorImpl(EvalCtx& context) { if (is_index_mode_ && !has_offset_input_) { return ExecRangeVisitorImplForIndex(); } else { - return ExecRangeVisitorImplForData(input); + return ExecRangeVisitorImplForData(context); } } @@ -235,7 +235,7 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForIndex() { template VectorPtr -PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) { +PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForData(EvalCtx& context) { typedef std:: conditional_t, std::string, T> IndexInnerType; @@ -246,6 +246,8 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) { IndexInnerType> HighPrecisionType; + const auto& bitmap_input = context.get_bitmap_input(); + auto* input = context.get_offset_input(); HighPrecisionType val1; HighPrecisionType val2; bool lower_inclusive = false; @@ -260,15 +262,16 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) { if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); + size_t processed_cursor = 0; auto execute_sub_batch = - [ lower_inclusive, - upper_inclusive ]( + [ lower_inclusive, upper_inclusive, &processed_cursor, & + bitmap_input ]( const T* data, const bool* valid_data, const int32_t* offsets, @@ -279,16 +282,44 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) { HighPrecisionType val2) { if (lower_inclusive && upper_inclusive) { BinaryRangeElementFunc func; - func(val1, val2, data, size, res, offsets); + func(val1, + val2, + data, + size, + res, + bitmap_input, + processed_cursor, + offsets); } else if (lower_inclusive && !upper_inclusive) { BinaryRangeElementFunc func; - func(val1, val2, data, size, res, offsets); + func(val1, + val2, + data, + size, + res, + bitmap_input, + processed_cursor, + offsets); } else if (!lower_inclusive && upper_inclusive) { BinaryRangeElementFunc func; - func(val1, val2, data, size, res, offsets); + func(val1, + val2, + data, + size, + res, + bitmap_input, + processed_cursor, + offsets); } else { BinaryRangeElementFunc func; - func(val1, val2, data, size, res, offsets); + func(val1, + val2, + data, + size, + res, + bitmap_input, + processed_cursor, + offsets); } // there is a batch operation in BinaryRangeElementFunc, // so not divide data again for the reason that it may reduce performance if the null distribution is scattered @@ -304,6 +335,7 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) { } } } + processed_cursor += size; }; auto skip_index_func = @@ -346,20 +378,23 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) { template VectorPtr -PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(OffsetVector* input) { +PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(EvalCtx& context) { using GetType = std::conditional_t, std::string_view, ValueType>; + const auto& bitmap_input = context.get_bitmap_input(); + auto* input = context.get_offset_input(); + FieldId field_id = expr_->column_.field_id_; auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); bool lower_inclusive = expr_->lower_inclusive_; bool upper_inclusive = expr_->upper_inclusive_; @@ -372,9 +407,15 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(OffsetVector* input) { ValueType val2 = upper_arg_.GetValue(); auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); + size_t processed_cursor = 0; auto execute_sub_batch = - [ lower_inclusive, upper_inclusive, - pointer ]( + [ + lower_inclusive, + upper_inclusive, + pointer, + &bitmap_input, + &processed_cursor + ]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -394,6 +435,8 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(OffsetVector* input) { size, res, valid_res, + bitmap_input, + processed_cursor, offsets); } else if (lower_inclusive && !upper_inclusive) { BinaryRangeElementFuncForJson @@ -406,6 +449,8 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(OffsetVector* input) { size, res, valid_res, + bitmap_input, + processed_cursor, offsets); } else if (!lower_inclusive && upper_inclusive) { @@ -419,6 +464,8 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(OffsetVector* input) { size, res, valid_res, + bitmap_input, + processed_cursor, offsets); } else { BinaryRangeElementFuncForJson @@ -431,8 +478,11 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(OffsetVector* input) { size, res, valid_res, + bitmap_input, + processed_cursor, offsets); } + processed_cursor += size; }; int64_t processed_size; if (has_offset_input_) { @@ -457,20 +507,22 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(OffsetVector* input) { template VectorPtr -PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(OffsetVector* input) { +PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(EvalCtx& context) { using GetType = std::conditional_t, std::string_view, ValueType>; + const auto& bitmap_input = context.get_bitmap_input(); + auto* input = context.get_offset_input(); auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); bool lower_inclusive = expr_->lower_inclusive_; bool upper_inclusive = expr_->upper_inclusive_; @@ -488,9 +540,10 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(OffsetVector* input) { index = std::stoi(expr_->column_.nested_path_[0]); } + size_t processed_cursor = 0; auto execute_sub_batch = - [ lower_inclusive, - upper_inclusive ]( + [ lower_inclusive, upper_inclusive, &processed_cursor, & + bitmap_input ]( const milvus::ArrayView* data, const bool* valid_data, const int32_t* offsets, @@ -511,6 +564,8 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(OffsetVector* input) { size, res, valid_res, + bitmap_input, + processed_cursor, offsets); } else if (lower_inclusive && !upper_inclusive) { BinaryRangeElementFuncForArray @@ -523,6 +578,8 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(OffsetVector* input) { size, res, valid_res, + bitmap_input, + processed_cursor, offsets); } else if (!lower_inclusive && upper_inclusive) { @@ -536,6 +593,8 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(OffsetVector* input) { size, res, valid_res, + bitmap_input, + processed_cursor, offsets); } else { @@ -549,9 +608,13 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(OffsetVector* input) { size, res, valid_res, + bitmap_input, + processed_cursor, offsets); } + processed_cursor += size; }; + int64_t processed_size; if (has_offset_input_) { processed_size = diff --git a/internal/core/src/exec/expression/BinaryRangeExpr.h b/internal/core/src/exec/expression/BinaryRangeExpr.h index a98efd58c9..386b0cd626 100644 --- a/internal/core/src/exec/expression/BinaryRangeExpr.h +++ b/internal/core/src/exec/expression/BinaryRangeExpr.h @@ -44,9 +44,17 @@ struct BinaryRangeElementFunc { const T* src, size_t n, TargetBitmapView res, + const TargetBitmap& bitmap_input, + size_t start_cursor, const int32_t* offsets = nullptr) { - if constexpr (filter_type == FilterType::random) { + if constexpr (filter_type == FilterType::random || + std::is_same_v || + std::is_same_v) { + bool has_bitmap_input = !bitmap_input.empty(); for (size_t i = 0; i < n; ++i) { + if (has_bitmap_input && !bitmap_input[i + start_cursor]) { + continue; + } auto offset = (offsets) ? offsets[i] : i; if constexpr (lower_inclusive && upper_inclusive) { res[i] = val1 <= src[offset] && src[offset] <= val2; @@ -83,6 +91,9 @@ struct BinaryRangeElementFunc { res[i] = valid_res[i] = false; \ break; \ } \ + if (has_bitmap_input && !bitmap_input[i + start_cursor]) { \ + break; \ + } \ auto x = src[offset].template at(pointer); \ if (x.error()) { \ if constexpr (std::is_same_v) { \ @@ -117,7 +128,10 @@ struct BinaryRangeElementFuncForJson { size_t n, TargetBitmapView res, TargetBitmapView valid_res, + const TargetBitmap& bitmap_input, + size_t start_cursor, const int32_t* offsets = nullptr) { + bool has_bitmap_input = !bitmap_input.empty(); for (size_t i = 0; i < n; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -153,8 +167,14 @@ struct BinaryRangeElementFuncForArray { size_t n, TargetBitmapView res, TargetBitmapView valid_res, + const TargetBitmap& bitmap_input, + size_t start_cursor, const int32_t* offsets = nullptr) { + bool has_bitmap_input = !bitmap_input.empty(); for (size_t i = 0; i < n; ++i) { + if (has_bitmap_input && !bitmap_input[i + start_cursor]) { + continue; + } size_t offset = i; if constexpr (filter_type == FilterType::random) { offset = (offsets) ? offsets[i] : i; @@ -240,6 +260,21 @@ class PhyBinaryRangeFilterExpr : public SegmentExpr { void Eval(EvalCtx& context, VectorPtr& result) override; + std::string + ToString() const { + return fmt::format("{}", expr_->ToString()); + } + + bool + IsSource() const override { + return true; + } + + std::optional + GetColumnInfo() const override { + return expr_->column_; + } + private: // Check overflow and cache result for performace template < @@ -259,7 +294,7 @@ class PhyBinaryRangeFilterExpr : public SegmentExpr { template VectorPtr - ExecRangeVisitorImpl(OffsetVector* input = nullptr); + ExecRangeVisitorImpl(EvalCtx& context); template VectorPtr @@ -267,15 +302,15 @@ class PhyBinaryRangeFilterExpr : public SegmentExpr { template VectorPtr - ExecRangeVisitorImplForData(OffsetVector* input = nullptr); + ExecRangeVisitorImplForData(EvalCtx& context); template VectorPtr - ExecRangeVisitorImplForJson(OffsetVector* input = nullptr); + ExecRangeVisitorImplForJson(EvalCtx& context); template VectorPtr - ExecRangeVisitorImplForArray(OffsetVector* input = nullptr); + ExecRangeVisitorImplForArray(EvalCtx& context); private: std::shared_ptr expr_; diff --git a/internal/core/src/exec/expression/CallExpr.h b/internal/core/src/exec/expression/CallExpr.h index c4a690cbc0..885c083491 100644 --- a/internal/core/src/exec/expression/CallExpr.h +++ b/internal/core/src/exec/expression/CallExpr.h @@ -68,6 +68,21 @@ class PhyCallExpr : public Expr { } } + std::string + ToString() const { + return fmt::format("{}", expr_->ToString()); + } + + bool + IsSource() const override { + return false; + } + + std::optional + GetColumnInfo() const override { + return std::nullopt; + } + private: std::shared_ptr expr_; diff --git a/internal/core/src/exec/expression/ColumnExpr.h b/internal/core/src/exec/expression/ColumnExpr.h index 5564de7da8..d16d0d236f 100644 --- a/internal/core/src/exec/expression/ColumnExpr.h +++ b/internal/core/src/exec/expression/ColumnExpr.h @@ -114,6 +114,21 @@ class PhyColumnExpr : public Expr { VectorPtr DoEval(OffsetVector* input = nullptr); + std::string + ToString() const { + return fmt::format("{}", expr_->ToString()); + } + + bool + IsSource() const override { + return true; + } + + std::optional + GetColumnInfo() const override { + return expr_->GetColumn(); + } + private: bool is_indexed_; diff --git a/internal/core/src/exec/expression/CompareExpr.cpp b/internal/core/src/exec/expression/CompareExpr.cpp index cb7994fbea..01e27699fe 100644 --- a/internal/core/src/exec/expression/CompareExpr.cpp +++ b/internal/core/src/exec/expression/CompareExpr.cpp @@ -38,20 +38,20 @@ PhyCompareFilterExpr::GetNextBatchSize() { template VectorPtr -PhyCompareFilterExpr::ExecCompareExprDispatcher(OpType op, - OffsetVector* input) { +PhyCompareFilterExpr::ExecCompareExprDispatcher(OpType op, EvalCtx& context) { // take offsets as input + auto input = context.get_offset_input(); if (has_offset_input_) { auto real_batch_size = input->size(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); auto left_data_barrier = segment_chunk_reader_.segment_->num_chunk_data( expr_->left_field_id_); @@ -215,37 +215,37 @@ PhyCompareFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { // For segment both fields has no index, can use SIMD to speed up. // Avoiding too much call stack that blocks SIMD. if (!is_left_indexed_ && !is_right_indexed_ && !IsStringExpr()) { - result = ExecCompareExprDispatcherForBothDataSegment(input); + result = ExecCompareExprDispatcherForBothDataSegment(context); return; } - result = ExecCompareExprDispatcherForHybridSegment(input); + result = ExecCompareExprDispatcherForHybridSegment(context); } VectorPtr PhyCompareFilterExpr::ExecCompareExprDispatcherForHybridSegment( - OffsetVector* input) { + EvalCtx& context) { switch (expr_->op_type_) { case OpType::Equal: { - return ExecCompareExprDispatcher(std::equal_to<>{}, input); + return ExecCompareExprDispatcher(std::equal_to<>{}, context); } case OpType::NotEqual: { - return ExecCompareExprDispatcher(std::not_equal_to<>{}, input); + return ExecCompareExprDispatcher(std::not_equal_to<>{}, context); } case OpType::GreaterEqual: { - return ExecCompareExprDispatcher(std::greater_equal<>{}, input); + return ExecCompareExprDispatcher(std::greater_equal<>{}, context); } case OpType::GreaterThan: { - return ExecCompareExprDispatcher(std::greater<>{}, input); + return ExecCompareExprDispatcher(std::greater<>{}, context); } case OpType::LessEqual: { - return ExecCompareExprDispatcher(std::less_equal<>{}, input); + return ExecCompareExprDispatcher(std::less_equal<>{}, context); } case OpType::LessThan: { - return ExecCompareExprDispatcher(std::less<>{}, input); + return ExecCompareExprDispatcher(std::less<>{}, context); } case OpType::PrefixMatch: { return ExecCompareExprDispatcher( - milvus::query::MatchOp{}, input); + milvus::query::MatchOp{}, context); } // case OpType::PostfixMatch: { // } @@ -257,22 +257,22 @@ PhyCompareFilterExpr::ExecCompareExprDispatcherForHybridSegment( VectorPtr PhyCompareFilterExpr::ExecCompareExprDispatcherForBothDataSegment( - OffsetVector* input) { + EvalCtx& context) { switch (expr_->left_data_type_) { case DataType::BOOL: - return ExecCompareLeftType(input); + return ExecCompareLeftType(context); case DataType::INT8: - return ExecCompareLeftType(input); + return ExecCompareLeftType(context); case DataType::INT16: - return ExecCompareLeftType(input); + return ExecCompareLeftType(context); case DataType::INT32: - return ExecCompareLeftType(input); + return ExecCompareLeftType(context); case DataType::INT64: - return ExecCompareLeftType(input); + return ExecCompareLeftType(context); case DataType::FLOAT: - return ExecCompareLeftType(input); + return ExecCompareLeftType(context); case DataType::DOUBLE: - return ExecCompareLeftType(input); + return ExecCompareLeftType(context); default: PanicInfo( DataTypeInvalid, @@ -283,22 +283,22 @@ PhyCompareFilterExpr::ExecCompareExprDispatcherForBothDataSegment( template VectorPtr -PhyCompareFilterExpr::ExecCompareLeftType(OffsetVector* input) { +PhyCompareFilterExpr::ExecCompareLeftType(EvalCtx& context) { switch (expr_->right_data_type_) { case DataType::BOOL: - return ExecCompareRightType(input); + return ExecCompareRightType(context); case DataType::INT8: - return ExecCompareRightType(input); + return ExecCompareRightType(context); case DataType::INT16: - return ExecCompareRightType(input); + return ExecCompareRightType(context); case DataType::INT32: - return ExecCompareRightType(input); + return ExecCompareRightType(context); case DataType::INT64: - return ExecCompareRightType(input); + return ExecCompareRightType(context); case DataType::FLOAT: - return ExecCompareRightType(input); + return ExecCompareRightType(context); case DataType::DOUBLE: - return ExecCompareRightType(input); + return ExecCompareRightType(context); default: PanicInfo( DataTypeInvalid, @@ -309,61 +309,101 @@ PhyCompareFilterExpr::ExecCompareLeftType(OffsetVector* input) { template VectorPtr -PhyCompareFilterExpr::ExecCompareRightType(OffsetVector* input) { +PhyCompareFilterExpr::ExecCompareRightType(EvalCtx& context) { + auto input = context.get_offset_input(); auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + const auto& bitmap_input = context.get_bitmap_input(); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); auto expr_type = expr_->op_type_; - auto execute_sub_batch = [expr_type]( - const T* left, - const U* right, - const int32_t* offsets, - const int size, - TargetBitmapView res) { + size_t processed_cursor = 0; + auto execute_sub_batch = + [ expr_type, &bitmap_input, & + processed_cursor ]( + const T* left, + const U* right, + const int32_t* offsets, + const int size, + TargetBitmapView res) { switch (expr_type) { case proto::plan::GreaterThan: { CompareElementFunc func; - func(left, right, size, res, offsets); + func(left, + right, + size, + res, + bitmap_input, + processed_cursor, + offsets); break; } case proto::plan::GreaterEqual: { CompareElementFunc func; - func(left, right, size, res, offsets); + func(left, + right, + size, + res, + bitmap_input, + processed_cursor, + offsets); break; } case proto::plan::LessThan: { CompareElementFunc func; - func(left, right, size, res, offsets); + func(left, + right, + size, + res, + bitmap_input, + processed_cursor, + offsets); break; } case proto::plan::LessEqual: { CompareElementFunc func; - func(left, right, size, res, offsets); + func(left, + right, + size, + res, + bitmap_input, + processed_cursor, + offsets); break; } case proto::plan::Equal: { CompareElementFunc func; - func(left, right, size, res, offsets); + func(left, + right, + size, + res, + bitmap_input, + processed_cursor, + offsets); break; } case proto::plan::NotEqual: { CompareElementFunc func; - func(left, right, size, res, offsets); + func(left, + right, + size, + res, + bitmap_input, + processed_cursor, + offsets); break; } default: @@ -372,6 +412,7 @@ PhyCompareFilterExpr::ExecCompareRightType(OffsetVector* input) { "compare column expr: {}", expr_type)); } + processed_cursor += size; }; int64_t processed_size; if (has_offset_input_) { diff --git a/internal/core/src/exec/expression/CompareExpr.h b/internal/core/src/exec/expression/CompareExpr.h index 7ab132b233..5ad51a8e3b 100644 --- a/internal/core/src/exec/expression/CompareExpr.h +++ b/internal/core/src/exec/expression/CompareExpr.h @@ -40,6 +40,8 @@ struct CompareElementFunc { const U* right, size_t size, TargetBitmapView res, + const TargetBitmap& bitmap_input, + size_t start_cursor, const int32_t* offsets = nullptr) { // This is the original code, kept here for the documentation purposes // also, used for iterative filter @@ -69,6 +71,34 @@ struct CompareElementFunc { return; } + if (!bitmap_input.empty()) { + for (int i = 0; i < size; ++i) { + if (!bitmap_input[start_cursor + i]) { + continue; + } + if constexpr (op == proto::plan::OpType::Equal) { + res[i] = left[i] == right[i]; + } else if constexpr (op == proto::plan::OpType::NotEqual) { + res[i] = left[i] != right[i]; + } else if constexpr (op == proto::plan::OpType::GreaterThan) { + res[i] = left[i] > right[i]; + } else if constexpr (op == proto::plan::OpType::LessThan) { + res[i] = left[i] < right[i]; + } else if constexpr (op == proto::plan::OpType::GreaterEqual) { + res[i] = left[i] >= right[i]; + } else if constexpr (op == proto::plan::OpType::LessEqual) { + res[i] = left[i] <= right[i]; + } else { + PanicInfo( + OpTypeInvalid, + fmt::format( + "unsupported op_type:{} for CompareElementFunc", + op)); + } + } + return; + } + if constexpr (op == proto::plan::OpType::Equal) { res.inplace_compare_column( left, right, size); @@ -170,6 +200,21 @@ class PhyCompareFilterExpr : public Expr { } } + std::string + ToString() const { + return fmt::format("{}", expr_->ToString()); + } + + bool + IsSource() const override { + return true; + } + + std::optional + GetColumnInfo() const override { + return std::nullopt; + } + private: int64_t GetCurrentRows() { @@ -451,21 +496,21 @@ class PhyCompareFilterExpr : public Expr { template VectorPtr - ExecCompareExprDispatcher(OpType op, OffsetVector* input = nullptr); + ExecCompareExprDispatcher(OpType op, EvalCtx& context); VectorPtr - ExecCompareExprDispatcherForHybridSegment(OffsetVector* input = nullptr); + ExecCompareExprDispatcherForHybridSegment(EvalCtx& context); VectorPtr - ExecCompareExprDispatcherForBothDataSegment(OffsetVector* input = nullptr); + ExecCompareExprDispatcherForBothDataSegment(EvalCtx& context); template VectorPtr - ExecCompareLeftType(OffsetVector* input = nullptr); + ExecCompareLeftType(EvalCtx& context); template VectorPtr - ExecCompareRightType(OffsetVector* input = nullptr); + ExecCompareRightType(EvalCtx& context); private: const FieldId left_field_; diff --git a/internal/core/src/exec/expression/ConjunctExpr.cpp b/internal/core/src/exec/expression/ConjunctExpr.cpp index da535d936d..50435eefa9 100644 --- a/internal/core/src/exec/expression/ConjunctExpr.cpp +++ b/internal/core/src/exec/expression/ConjunctExpr.cpp @@ -83,16 +83,22 @@ PhyConjunctFilterExpr::CanSkipFollowingExprs(ColumnVectorPtr& vec) { void PhyConjunctFilterExpr::SkipFollowingExprs(int start) { - for (int i = start; i < inputs_.size(); ++i) { - inputs_[i]->MoveCursor(); + for (int i = start; i < input_order_.size(); ++i) { + inputs_[input_order_[i]]->MoveCursor(); } } void PhyConjunctFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { - for (int i = 0; i < inputs_.size(); ++i) { + if (input_order_.empty()) { + input_order_.resize(inputs_.size()); + for (size_t i = 0; i < inputs_.size(); i++) { + input_order_[i] = i; + } + } + for (int i = 0; i < input_order_.size(); ++i) { VectorPtr input_result; - inputs_[i]->Eval(context, input_result); + inputs_[input_order_[i]]->Eval(context, input_result); if (i == 0) { result = input_result; auto all_flat_result = GetColumnVector(result); @@ -100,6 +106,7 @@ PhyConjunctFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { SkipFollowingExprs(i + 1); return; } + SetNextExprBitmapInput(all_flat_result, context); continue; } auto input_flat_result = GetColumnVector(input_result); @@ -110,7 +117,9 @@ PhyConjunctFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { SkipFollowingExprs(i + 1); return; } + SetNextExprBitmapInput(all_flat_result, context); } + ClearBitmapInput(context); } } //namespace exec diff --git a/internal/core/src/exec/expression/ConjunctExpr.h b/internal/core/src/exec/expression/ConjunctExpr.h index a9de859bd4..31afa80483 100644 --- a/internal/core/src/exec/expression/ConjunctExpr.h +++ b/internal/core/src/exec/expression/ConjunctExpr.h @@ -66,7 +66,7 @@ struct ConjunctElementFunc { class PhyConjunctFilterExpr : public Expr { public: PhyConjunctFilterExpr(std::vector&& inputs, bool is_and) - : Expr(DataType::BOOL, std::move(inputs), is_and ? "and" : "or"), + : Expr(DataType::BOOL, std::move(inputs), "PhyConjunctFilterExpr"), is_and_(is_and) { std::vector input_types; input_types.reserve(inputs_.size()); @@ -101,6 +101,63 @@ class PhyConjunctFilterExpr : public Expr { return true; } + std::string + ToString() const { + if (!input_order_.empty()) { + std::vector inputs; + for (auto& i : input_order_) { + inputs.push_back(inputs_[i]->ToString()); + } + std::string input_str = + is_and_ ? Join(inputs, " && ") : Join(inputs, " || "); + return fmt::format("[ConjuctExpr:{}]", input_str); + } + std::vector inputs; + for (auto& in : inputs_) { + inputs.push_back(in->ToString()); + } + std::string input_str = + is_and_ ? Join(inputs, " && ") : Join(inputs, "||"); + return fmt::format("[ConjuctExpr:{}]", input_str); + } + + bool + IsSource() const override { + return false; + } + + std::optional + GetColumnInfo() const override { + return std::nullopt; + } + + void + Reorder(const std::vector& exprs_order) { + input_order_ = exprs_order; + } + + std::vector + GetReorder() { + return input_order_; + } + + void + SetNextExprBitmapInput(const ColumnVectorPtr& vec, EvalCtx& context) { + TargetBitmapView last_res_bitmap(vec->GetRawData(), vec->size()); + TargetBitmap next_input_bitmap(last_res_bitmap); + if (is_and_) { + context.set_bitmap_input(std::move(next_input_bitmap)); + } else { + next_input_bitmap.flip(); + context.set_bitmap_input(std::move(next_input_bitmap)); + } + } + + void + ClearBitmapInput(EvalCtx& context) { + context.clear_bitmap_input(); + } + private: int64_t UpdateResult(ColumnVectorPtr& input_result, @@ -117,7 +174,7 @@ class PhyConjunctFilterExpr : public Expr { SkipFollowingExprs(int start); // true if conjunction (and), false if disjunction (or). bool is_and_; - std::vector input_order_; + std::vector input_order_; }; } //namespace exec } // namespace milvus diff --git a/internal/core/src/exec/expression/EvalCtx.h b/internal/core/src/exec/expression/EvalCtx.h index 185c30c96d..f1a5c880c6 100644 --- a/internal/core/src/exec/expression/EvalCtx.h +++ b/internal/core/src/exec/expression/EvalCtx.h @@ -69,12 +69,30 @@ class EvalCtx { offset_input_ = offset_input; } + inline void + set_bitmap_input(TargetBitmap&& bitmap_input) { + bitmap_input_ = std::move(bitmap_input); + } + + inline const TargetBitmap& + get_bitmap_input() const { + return bitmap_input_; + } + + void + clear_bitmap_input() { + bitmap_input_.clear(); + } + private: ExecContext* exec_ctx_ = nullptr; ExprSet* expr_set_ = nullptr; // we may accept offsets array as input and do expr filtering on these data OffsetVector* offset_input_ = nullptr; bool input_no_nulls_ = false; + + // used for expr pre filter, that avoid unnecessary execution on filtered data + TargetBitmap bitmap_input_; }; } // namespace exec diff --git a/internal/core/src/exec/expression/ExistsExpr.cpp b/internal/core/src/exec/expression/ExistsExpr.cpp index a4163e46aa..a7c5eb9440 100644 --- a/internal/core/src/exec/expression/ExistsExpr.cpp +++ b/internal/core/src/exec/expression/ExistsExpr.cpp @@ -30,7 +30,7 @@ PhyExistsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { PanicInfo(ExprInvalid, "exists expr for json index mode not supported"); } - result = EvalJsonExistsForDataSegment(input); + result = EvalJsonExistsForDataSegment(context); break; } default: @@ -41,21 +41,26 @@ PhyExistsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { } VectorPtr -PhyExistsFilterExpr::EvalJsonExistsForDataSegment(OffsetVector* input) { +PhyExistsFilterExpr::EvalJsonExistsForDataSegment(EvalCtx& context) { + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); + FieldId field_id = expr_->column_.field_id_; auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); + int processed_cursor = 0; auto execute_sub_batch = - []( + [&bitmap_input, & + processed_cursor ]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -63,6 +68,7 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegment(OffsetVector* input) { TargetBitmapView res, TargetBitmapView valid_res, const std::string& pointer) { + bool has_bitmap_input = !bitmap_input.empty(); for (int i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -72,8 +78,12 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegment(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } res[i] = data[offset].exist(pointer); } + processed_cursor += size; }; int64_t processed_size; diff --git a/internal/core/src/exec/expression/ExistsExpr.h b/internal/core/src/exec/expression/ExistsExpr.h index 4db541d2cc..4fbe49f35e 100644 --- a/internal/core/src/exec/expression/ExistsExpr.h +++ b/internal/core/src/exec/expression/ExistsExpr.h @@ -57,9 +57,24 @@ class PhyExistsFilterExpr : public SegmentExpr { void Eval(EvalCtx& context, VectorPtr& result) override; + std::string + ToString() const { + return fmt::format("{}", expr_->ToString()); + } + + bool + IsSource() const override { + return true; + } + + std::optional + GetColumnInfo() const override { + return expr_->column_; + } + private: VectorPtr - EvalJsonExistsForDataSegment(OffsetVector* input = nullptr); + EvalJsonExistsForDataSegment(EvalCtx& context); private: std::shared_ptr expr_; diff --git a/internal/core/src/exec/expression/Expr.cpp b/internal/core/src/exec/expression/Expr.cpp index 6f89803ddd..28fa0839ba 100644 --- a/internal/core/src/exec/expression/Expr.cpp +++ b/internal/core/src/exec/expression/Expr.cpp @@ -67,7 +67,9 @@ CompileExpressions(const std::vector& sources, enable_constant_folding)); } - OptimizeCompiledExprs(context, exprs); + if (OPTIMIZE_EXPR_ENABLED) { + OptimizeCompiledExprs(context, exprs); + } return exprs; } @@ -303,9 +305,174 @@ CompileExpression(const expr::TypedExprPtr& expr, return result; } +bool +IsLikeExpr(std::shared_ptr input) { + if (input->name() == "PhyUnaryRangeFilterExpr") { + auto optype = std::static_pointer_cast(input) + ->GetLogicalExpr() + ->op_type_; + switch (optype) { + case proto::plan::PrefixMatch: + case proto::plan::PostfixMatch: + case proto::plan::Match: + return true; + default: + return false; + } + } + return false; +} + +inline void +ReorderConjunctExpr(std::shared_ptr& expr, + ExecContext* context, + bool& has_heavy_operation) { + auto* segment = context->get_query_context()->get_segment(); + if (!segment || !expr) { + return; + } + std::vector reorder; + std::vector numeric_expr; + std::vector indexed_expr; + std::vector string_expr; + std::vector str_like_expr; + std::vector json_expr; + std::vector json_like_expr; + std::vector array_expr; + std::vector array_like_expr; + std::vector compare_expr; + std::vector other_expr; + std::vector heavy_conjunct_expr; + std::vector light_conjunct_expr; + + const auto& inputs = expr->GetInputsRef(); + for (int i = 0; i < inputs.size(); i++) { + auto input = inputs[i]; + + if (input->IsSource() && input->GetColumnInfo().has_value()) { + auto column = input->GetColumnInfo().value(); + if (IsNumericDataType(column.data_type_)) { + numeric_expr.push_back(i); + continue; + } + if (segment->HasIndex(column.field_id_)) { + indexed_expr.push_back(i); + continue; + } + + if (IsStringDataType(column.data_type_)) { + auto is_like_expr = IsLikeExpr(input); + if (is_like_expr) { + str_like_expr.push_back(i); + has_heavy_operation = true; + } else { + string_expr.push_back(i); + } + continue; + } + + if (IsArrayDataType(column.data_type_)) { + auto is_like_expr = IsLikeExpr(input); + if (is_like_expr) { + array_like_expr.push_back(i); + has_heavy_operation = true; + } else { + array_expr.push_back(i); + } + continue; + } + + if (IsJsonDataType(column.data_type_)) { + auto is_like_expr = IsLikeExpr(input); + if (is_like_expr) { + json_like_expr.push_back(i); + } else { + json_expr.push_back(i); + } + has_heavy_operation = true; + continue; + } + } + + if (input->name() == "PhyConjunctFilterExpr") { + bool sub_expr_heavy = false; + auto expr = std::static_pointer_cast(input); + ReorderConjunctExpr(expr, context, sub_expr_heavy); + has_heavy_operation |= sub_expr_heavy; + if (sub_expr_heavy) { + heavy_conjunct_expr.push_back(i); + } else { + light_conjunct_expr.push_back(i); + } + continue; + } + + if (input->name() == "PhyCompareFilterExpr") { + compare_expr.push_back(i); + has_heavy_operation = true; + continue; + } + + other_expr.push_back(i); + } + + reorder.reserve(inputs.size()); + // Final reorder sequence: + // 1. Numeric column expressions (fastest to evaluate) + // 2. Indexed column expressions (can use index for efficient filtering) + // 3. String column expressions + // 4. Light conjunct expressions (conjunctions without heavy operations) + // 5. Other expressions + // 6. Array column expression + // 7. String like expression + // 8. Array like expression + // 9. JSON column expressions (expensive to evaluate) + // 10. JSON like expression (more expensive than common json compare) + // 11. Heavy conjunct expressions (conjunctions with heavy operations) + // 12. Compare filter expressions (most expensive, comparing two columns) + reorder.insert(reorder.end(), numeric_expr.begin(), numeric_expr.end()); + reorder.insert(reorder.end(), indexed_expr.begin(), indexed_expr.end()); + reorder.insert(reorder.end(), string_expr.begin(), string_expr.end()); + reorder.insert( + reorder.end(), light_conjunct_expr.begin(), light_conjunct_expr.end()); + reorder.insert(reorder.end(), other_expr.begin(), other_expr.end()); + reorder.insert(reorder.end(), array_expr.begin(), array_expr.end()); + reorder.insert(reorder.end(), str_like_expr.begin(), str_like_expr.end()); + reorder.insert( + reorder.end(), array_like_expr.begin(), array_like_expr.end()); + reorder.insert(reorder.end(), json_expr.begin(), json_expr.end()); + reorder.insert(reorder.end(), json_like_expr.begin(), json_like_expr.end()); + reorder.insert( + reorder.end(), heavy_conjunct_expr.begin(), heavy_conjunct_expr.end()); + reorder.insert(reorder.end(), compare_expr.begin(), compare_expr.end()); + + AssertInfo(reorder.size() == inputs.size(), + "reorder size:{} but input size:{}", + reorder.size(), + inputs.size()); + + expr->Reorder(reorder); +} + inline void OptimizeCompiledExprs(ExecContext* context, const std::vector& exprs) { - //TODO: add optimization pattern + std::chrono::high_resolution_clock::time_point start = + std::chrono::high_resolution_clock::now(); + for (const auto& expr : exprs) { + if (expr->name() == "PhyConjunctFilterExpr") { + LOG_DEBUG("before reoder filter expression: {}", expr->ToString()); + auto conjunct_expr = + std::static_pointer_cast(expr); + bool has_heavy_operation = false; + ReorderConjunctExpr(conjunct_expr, context, has_heavy_operation); + LOG_DEBUG("after reorder filter expression: {}", expr->ToString()); + } + } + std::chrono::high_resolution_clock::time_point end = + std::chrono::high_resolution_clock::now(); + double cost = + std::chrono::duration(end - start).count(); + monitor::internal_core_optimize_expr_latency.Observe(cost / 1000); } } // namespace exec diff --git a/internal/core/src/exec/expression/Expr.h b/internal/core/src/exec/expression/Expr.h index bf564c1d64..0068d17385 100644 --- a/internal/core/src/exec/expression/Expr.h +++ b/internal/core/src/exec/expression/Expr.h @@ -64,7 +64,7 @@ class Expr { } std::string - get_name() { + name() { return name_; } @@ -88,9 +88,29 @@ class Expr { return true; } + virtual std::string + ToString() const { + PanicInfo(ErrorCode::NotImplemented, "not implemented"); + } + + virtual bool + IsSource() const { + return false; + } + + virtual std::optional + GetColumnInfo() const { + PanicInfo(ErrorCode::NotImplemented, "not implemented"); + } + + const std::vector>& + GetInputsRef() { + return inputs_; + } + protected: DataType type_; - const std::vector> inputs_; + std::vector> inputs_; std::string name_; // NOTE: unused std::shared_ptr vector_func_; @@ -167,6 +187,11 @@ class SegmentExpr : public Expr { } } + virtual bool + IsSource() const override { + return true; + } + void MoveCursorForDataMultipleChunk() { int64_t processed_size = 0; @@ -1142,6 +1167,9 @@ class SegmentExpr : public Expr { std::shared_ptr cached_match_res_{nullptr}; }; +bool +IsLikeExpr(std::shared_ptr expr); + void OptimizeCompiledExprs(ExecContext* context, const std::vector& exprs); diff --git a/internal/core/src/exec/expression/JsonContainsExpr.cpp b/internal/core/src/exec/expression/JsonContainsExpr.cpp index 3dd9e5b665..1a34e29538 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.cpp +++ b/internal/core/src/exec/expression/JsonContainsExpr.cpp @@ -30,17 +30,17 @@ PhyJsonContainsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { if (is_index_mode_ && !has_offset_input_) { result = EvalArrayContainsForIndexSegment(); } else { - result = EvalJsonContainsForDataSegment(input); + result = EvalJsonContainsForDataSegment(context); } break; } case DataType::JSON: { - if (is_index_mode_ && !has_offset_input_) { - PanicInfo( - ExprInvalid, - "exists expr for json or array index mode not supported"); + if (is_index_mode_ && !context.get_offset_input()) { + PanicInfo(ExprInvalid, + "exists expr for json or array index mode not " + "supported"); } - result = EvalJsonContainsForDataSegment(input); + result = EvalJsonContainsForDataSegment(context); break; } default: @@ -51,7 +51,7 @@ PhyJsonContainsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { } VectorPtr -PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment(OffsetVector* input) { +PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment(EvalCtx& context) { auto data_type = expr_->column_.data_type_; switch (expr_->op_) { case proto::plan::JSONContainsExpr_JSONOp_Contains: @@ -60,16 +60,16 @@ PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment(OffsetVector* input) { auto val_type = expr_->vals_[0].val_case(); switch (val_type) { case proto::plan::GenericValue::kBoolVal: { - return ExecArrayContains(input); + return ExecArrayContains(context); } case proto::plan::GenericValue::kInt64Val: { - return ExecArrayContains(input); + return ExecArrayContains(context); } case proto::plan::GenericValue::kFloatVal: { - return ExecArrayContains(input); + return ExecArrayContains(context); } case proto::plan::GenericValue::kStringVal: { - return ExecArrayContains(input); + return ExecArrayContains(context); } default: PanicInfo( @@ -81,19 +81,19 @@ PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment(OffsetVector* input) { auto val_type = expr_->vals_[0].val_case(); switch (val_type) { case proto::plan::GenericValue::kBoolVal: { - return ExecJsonContains(input); + return ExecJsonContains(context); } case proto::plan::GenericValue::kInt64Val: { - return ExecJsonContains(input); + return ExecJsonContains(context); } case proto::plan::GenericValue::kFloatVal: { - return ExecJsonContains(input); + return ExecJsonContains(context); } case proto::plan::GenericValue::kStringVal: { - return ExecJsonContains(input); + return ExecJsonContains(context); } case proto::plan::GenericValue::kArrayVal: { - return ExecJsonContainsArray(input); + return ExecJsonContainsArray(context); } default: PanicInfo(DataTypeInvalid, @@ -101,7 +101,7 @@ PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment(OffsetVector* input) { val_type); } } else { - return ExecJsonContainsWithDiffType(input); + return ExecJsonContainsWithDiffType(context); } } } @@ -110,16 +110,16 @@ PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment(OffsetVector* input) { auto val_type = expr_->vals_[0].val_case(); switch (val_type) { case proto::plan::GenericValue::kBoolVal: { - return ExecArrayContainsAll(input); + return ExecArrayContainsAll(context); } case proto::plan::GenericValue::kInt64Val: { - return ExecArrayContainsAll(input); + return ExecArrayContainsAll(context); } case proto::plan::GenericValue::kFloatVal: { - return ExecArrayContainsAll(input); + return ExecArrayContainsAll(context); } case proto::plan::GenericValue::kStringVal: { - return ExecArrayContainsAll(input); + return ExecArrayContainsAll(context); } default: PanicInfo( @@ -131,19 +131,19 @@ PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment(OffsetVector* input) { auto val_type = expr_->vals_[0].val_case(); switch (val_type) { case proto::plan::GenericValue::kBoolVal: { - return ExecJsonContainsAll(input); + return ExecJsonContainsAll(context); } case proto::plan::GenericValue::kInt64Val: { - return ExecJsonContainsAll(input); + return ExecJsonContainsAll(context); } case proto::plan::GenericValue::kFloatVal: { - return ExecJsonContainsAll(input); + return ExecJsonContainsAll(context); } case proto::plan::GenericValue::kStringVal: { - return ExecJsonContainsAll(input); + return ExecJsonContainsAll(context); } case proto::plan::GenericValue::kArrayVal: { - return ExecJsonContainsAllArray(input); + return ExecJsonContainsAllArray(context); } default: PanicInfo(DataTypeInvalid, @@ -151,7 +151,7 @@ PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment(OffsetVector* input) { val_type); } } else { - return ExecJsonContainsAllWithDiffType(input); + return ExecJsonContainsAllWithDiffType(context); } } } @@ -164,11 +164,13 @@ PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment(OffsetVector* input) { template VectorPtr -PhyJsonContainsFilterExpr::ExecArrayContains(OffsetVector* input) { +PhyJsonContainsFilterExpr::ExecArrayContains(EvalCtx& context) { using GetType = std::conditional_t, std::string_view, ExprValueType>; + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { @@ -177,18 +179,21 @@ PhyJsonContainsFilterExpr::ExecArrayContains(OffsetVector* input) { AssertInfo(expr_->column_.nested_path_.size() == 0, "[ExecArrayContains]nested path must be null"); - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); if (!arg_inited_) { arg_set_ = std::make_shared>(expr_->vals_); arg_inited_ = true; } + + int processed_cursor = 0; auto execute_sub_batch = - []( + [&processed_cursor, & + bitmap_input ]( const milvus::ArrayView* data, const bool* valid_data, const int32_t* offsets, @@ -205,6 +210,7 @@ PhyJsonContainsFilterExpr::ExecArrayContains(OffsetVector* input) { } return false; }; + bool has_bitmap_input = !bitmap_input.empty(); for (int i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -214,8 +220,12 @@ PhyJsonContainsFilterExpr::ExecArrayContains(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } res[i] = executor(offset); } + processed_cursor += size; }; int64_t processed_size; @@ -241,30 +251,36 @@ PhyJsonContainsFilterExpr::ExecArrayContains(OffsetVector* input) { template VectorPtr -PhyJsonContainsFilterExpr::ExecJsonContains(OffsetVector* input) { +PhyJsonContainsFilterExpr::ExecJsonContains(EvalCtx& context) { using GetType = std::conditional_t, std::string_view, ExprValueType>; + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); + auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); if (!arg_inited_) { arg_set_ = std::make_shared>(expr_->vals_); arg_inited_ = true; } + + size_t processed_cursor = 0; auto execute_sub_batch = - []( + [&processed_cursor, & + bitmap_input ]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -290,6 +306,7 @@ PhyJsonContainsFilterExpr::ExecJsonContains(OffsetVector* input) { } return false; }; + bool has_bitmap_input = !bitmap_input.empty(); for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -299,8 +316,12 @@ PhyJsonContainsFilterExpr::ExecJsonContains(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } res[i] = executor(offset); } + processed_cursor += size; }; int64_t processed_size; @@ -329,26 +350,31 @@ PhyJsonContainsFilterExpr::ExecJsonContains(OffsetVector* input) { } VectorPtr -PhyJsonContainsFilterExpr::ExecJsonContainsArray(OffsetVector* input) { +PhyJsonContainsFilterExpr::ExecJsonContainsArray(EvalCtx& context) { + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); std::vector elements; for (auto const& element : expr_->vals_) { elements.emplace_back(GetValueFromProto(element)); } + + size_t processed_cursor = 0; auto execute_sub_batch = - []( + [&processed_cursor, & + bitmap_input ]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -383,6 +409,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(OffsetVector* input) { } return false; }; + bool has_bitmap_input = !bitmap_input.empty(); for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -392,8 +419,12 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } res[i] = executor(offset); } + processed_cursor += size; }; int64_t processed_size; @@ -423,11 +454,13 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(OffsetVector* input) { template VectorPtr -PhyJsonContainsFilterExpr::ExecArrayContainsAll(OffsetVector* input) { +PhyJsonContainsFilterExpr::ExecArrayContainsAll(EvalCtx& context) { using GetType = std::conditional_t, std::string_view, ExprValueType>; + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); AssertInfo(expr_->column_.nested_path_.size() == 0, "[ExecArrayContainsAll]nested path must be null"); auto real_batch_size = @@ -436,19 +469,21 @@ PhyJsonContainsFilterExpr::ExecArrayContainsAll(OffsetVector* input) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); std::set elements; for (auto const& element : expr_->vals_) { elements.insert(GetValueFromProto(element)); } + int processed_cursor = 0; auto execute_sub_batch = - []( + [&processed_cursor, & + bitmap_input ]( const milvus::ArrayView* data, const bool* valid_data, const int32_t* offsets, @@ -467,6 +502,7 @@ PhyJsonContainsFilterExpr::ExecArrayContainsAll(OffsetVector* input) { } return tmp_elements.size() == 0; }; + bool has_bitmap_input = !bitmap_input.empty(); for (int i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -476,8 +512,12 @@ PhyJsonContainsFilterExpr::ExecArrayContainsAll(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } res[i] = executor(offset); } + processed_cursor += size; }; int64_t processed_size; @@ -503,22 +543,24 @@ PhyJsonContainsFilterExpr::ExecArrayContainsAll(OffsetVector* input) { template VectorPtr -PhyJsonContainsFilterExpr::ExecJsonContainsAll(OffsetVector* input) { +PhyJsonContainsFilterExpr::ExecJsonContainsAll(EvalCtx& context) { using GetType = std::conditional_t, std::string_view, ExprValueType>; + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); std::set elements; @@ -526,8 +568,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAll(OffsetVector* input) { elements.insert(GetValueFromProto(element)); } + int processed_cursor = 0; auto execute_sub_batch = - []( + [&processed_cursor, & + bitmap_input ]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -556,6 +600,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAll(OffsetVector* input) { } return tmp_elements.size() == 0; }; + bool has_bitmap_input = !bitmap_input.empty(); for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -565,8 +610,12 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAll(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } res[i] = executor(offset); } + processed_cursor += size; }; int64_t processed_size; @@ -595,18 +644,19 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAll(OffsetVector* input) { } VectorPtr -PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType( - OffsetVector* input) { +PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType(EvalCtx& context) { + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); @@ -618,8 +668,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType( i++; } + int processed_cursor = 0; auto execute_sub_batch = - []( + [&processed_cursor, & + bitmap_input ]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -707,6 +759,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType( } return tmp_elements_index.size() == 0; }; + bool has_bitmap_input = !bitmap_input.empty(); for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -716,8 +769,13 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType( res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } + res[i] = executor(offset); } + processed_cursor += size; }; int64_t processed_size; @@ -748,18 +806,20 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType( } VectorPtr -PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(OffsetVector* input) { +PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(EvalCtx& context) { + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); @@ -767,8 +827,11 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(OffsetVector* input) { for (auto const& element : expr_->vals_) { elements.emplace_back(GetValueFromProto(element)); } + + size_t processed_cursor = 0; auto execute_sub_batch = - []( + [&processed_cursor, & + bitmap_input ]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -807,6 +870,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(OffsetVector* input) { } return exist_elements_index.size() == elements.size(); }; + bool has_bitmap_input = !bitmap_input.empty(); for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -816,8 +880,13 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } + res[i] = executor(offset); } + processed_cursor += size; }; int64_t processed_size; @@ -846,18 +915,20 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(OffsetVector* input) { } VectorPtr -PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(OffsetVector* input) { +PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(EvalCtx& context) { + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); @@ -869,8 +940,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(OffsetVector* input) { i++; } + size_t processed_cursor = 0; auto execute_sub_batch = - []( + [&processed_cursor, & + bitmap_input ]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -949,6 +1022,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(OffsetVector* input) { } return false; }; + bool has_bitmap_input = !bitmap_input.empty(); for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -958,8 +1032,13 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } + res[i] = executor(offset); } + processed_cursor += size; }; int64_t processed_size; diff --git a/internal/core/src/exec/expression/JsonContainsExpr.h b/internal/core/src/exec/expression/JsonContainsExpr.h index f977a6d544..02243ca1f6 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.h +++ b/internal/core/src/exec/expression/JsonContainsExpr.h @@ -51,37 +51,52 @@ class PhyJsonContainsFilterExpr : public SegmentExpr { void Eval(EvalCtx& context, VectorPtr& result) override; + std::string + ToString() const { + return fmt::format("{}", expr_->ToString()); + } + + bool + IsSource() const override { + return true; + } + + std::optional + GetColumnInfo() const override { + return expr_->column_; + } + private: VectorPtr - EvalJsonContainsForDataSegment(OffsetVector* input = nullptr); + EvalJsonContainsForDataSegment(EvalCtx& context); template VectorPtr - ExecJsonContains(OffsetVector* input = nullptr); + ExecJsonContains(EvalCtx& context); template VectorPtr - ExecArrayContains(OffsetVector* input = nullptr); + ExecArrayContains(EvalCtx& context); template VectorPtr - ExecJsonContainsAll(OffsetVector* input = nullptr); + ExecJsonContainsAll(EvalCtx& context); template VectorPtr - ExecArrayContainsAll(OffsetVector* input = nullptr); + ExecArrayContainsAll(EvalCtx& context); VectorPtr - ExecJsonContainsArray(OffsetVector* input = nullptr); + ExecJsonContainsArray(EvalCtx& context); VectorPtr - ExecJsonContainsAllArray(OffsetVector* input = nullptr); + ExecJsonContainsAllArray(EvalCtx& context); VectorPtr - ExecJsonContainsAllWithDiffType(OffsetVector* input = nullptr); + ExecJsonContainsAllWithDiffType(EvalCtx& context); VectorPtr - ExecJsonContainsWithDiffType(OffsetVector* input = nullptr); + ExecJsonContainsWithDiffType(EvalCtx& context); VectorPtr EvalArrayContainsForIndexSegment(); diff --git a/internal/core/src/exec/expression/LogicalBinaryExpr.h b/internal/core/src/exec/expression/LogicalBinaryExpr.h index 4db60df939..3a46329050 100644 --- a/internal/core/src/exec/expression/LogicalBinaryExpr.h +++ b/internal/core/src/exec/expression/LogicalBinaryExpr.h @@ -87,6 +87,21 @@ class PhyLogicalBinaryExpr : public Expr { inputs_[1]->SupportOffsetInput(); } + std::string + ToString() const { + return fmt::format("{}", expr_->ToString()); + } + + bool + IsSource() const override { + return false; + } + + std::optional + GetColumnInfo() const override { + return std::nullopt; + } + private: std::shared_ptr expr_; }; diff --git a/internal/core/src/exec/expression/LogicalUnaryExpr.h b/internal/core/src/exec/expression/LogicalUnaryExpr.h index 7127745670..00d6af3e0c 100644 --- a/internal/core/src/exec/expression/LogicalUnaryExpr.h +++ b/internal/core/src/exec/expression/LogicalUnaryExpr.h @@ -51,6 +51,21 @@ class PhyLogicalUnaryExpr : public Expr { return inputs_[0]->SupportOffsetInput(); } + std::string + ToString() const { + return fmt::format("{}", expr_->ToString()); + } + + bool + IsSource() const override { + return false; + } + + std::optional + GetColumnInfo() const override { + return std::nullopt; + } + private: std::shared_ptr expr_; }; diff --git a/internal/core/src/exec/expression/NullExpr.h b/internal/core/src/exec/expression/NullExpr.h index 448cb340c9..415c48de80 100644 --- a/internal/core/src/exec/expression/NullExpr.h +++ b/internal/core/src/exec/expression/NullExpr.h @@ -50,6 +50,21 @@ class PhyNullExpr : public SegmentExpr { void Eval(EvalCtx& context, VectorPtr& result) override; + std::string + ToString() const { + return fmt::format("{}", expr_->ToString()); + } + + bool + IsSource() const override { + return true; + } + + std::optional + GetColumnInfo() const override { + return expr_->column_; + } + private: ColumnVectorPtr PreCheckNullable(OffsetVector* input); diff --git a/internal/core/src/exec/expression/TermExpr.cpp b/internal/core/src/exec/expression/TermExpr.cpp index 1b5bb74cde..16d2d6000d 100644 --- a/internal/core/src/exec/expression/TermExpr.cpp +++ b/internal/core/src/exec/expression/TermExpr.cpp @@ -32,31 +32,31 @@ PhyTermFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { } switch (expr_->column_.data_type_) { case DataType::BOOL: { - result = ExecVisitorImpl(input); + result = ExecVisitorImpl(context); break; } case DataType::INT8: { - result = ExecVisitorImpl(input); + result = ExecVisitorImpl(context); break; } case DataType::INT16: { - result = ExecVisitorImpl(input); + result = ExecVisitorImpl(context); break; } case DataType::INT32: { - result = ExecVisitorImpl(input); + result = ExecVisitorImpl(context); break; } case DataType::INT64: { - result = ExecVisitorImpl(input); + result = ExecVisitorImpl(context); break; } case DataType::FLOAT: { - result = ExecVisitorImpl(input); + result = ExecVisitorImpl(context); break; } case DataType::DOUBLE: { - result = ExecVisitorImpl(input); + result = ExecVisitorImpl(context); break; } case DataType::VARCHAR: { @@ -64,30 +64,30 @@ PhyTermFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { !storage::MmapManager::GetInstance() .GetMmapConfig() .growing_enable_mmap) { - result = ExecVisitorImpl(input); + result = ExecVisitorImpl(context); } else { - result = ExecVisitorImpl(input); + result = ExecVisitorImpl(context); } break; } case DataType::JSON: { if (expr_->vals_.size() == 0) { - result = ExecVisitorImplTemplateJson(input); + result = ExecVisitorImplTemplateJson(context); break; } auto type = expr_->vals_[0].val_case(); switch (type) { case proto::plan::GenericValue::ValCase::kBoolVal: - result = ExecVisitorImplTemplateJson(input); + result = ExecVisitorImplTemplateJson(context); break; case proto::plan::GenericValue::ValCase::kInt64Val: - result = ExecVisitorImplTemplateJson(input); + result = ExecVisitorImplTemplateJson(context); break; case proto::plan::GenericValue::ValCase::kFloatVal: - result = ExecVisitorImplTemplateJson(input); + result = ExecVisitorImplTemplateJson(context); break; case proto::plan::GenericValue::ValCase::kStringVal: - result = ExecVisitorImplTemplateJson(input); + result = ExecVisitorImplTemplateJson(context); break; default: PanicInfo(DataTypeInvalid, "unknown data type: {}", type); @@ -97,26 +97,26 @@ PhyTermFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { case DataType::ARRAY: { if (expr_->vals_.size() == 0) { SetNotUseIndex(); - result = ExecVisitorImplTemplateArray(input); + result = ExecVisitorImplTemplateArray(context); break; } auto type = expr_->vals_[0].val_case(); switch (type) { case proto::plan::GenericValue::ValCase::kBoolVal: SetNotUseIndex(); - result = ExecVisitorImplTemplateArray(input); + result = ExecVisitorImplTemplateArray(context); break; case proto::plan::GenericValue::ValCase::kInt64Val: SetNotUseIndex(); - result = ExecVisitorImplTemplateArray(input); + result = ExecVisitorImplTemplateArray(context); break; case proto::plan::GenericValue::ValCase::kFloatVal: SetNotUseIndex(); - result = ExecVisitorImplTemplateArray(input); + result = ExecVisitorImplTemplateArray(context); break; case proto::plan::GenericValue::ValCase::kStringVal: SetNotUseIndex(); - result = ExecVisitorImplTemplateArray(input); + result = ExecVisitorImplTemplateArray(context); break; default: PanicInfo(DataTypeInvalid, "unknown data type: {}", type); @@ -216,12 +216,11 @@ PhyTermFilterExpr::ExecPkTermImpl() { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); - // pk valid_bitmap is always all true TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); auto current_chunk_view = cached_bits_.view(current_data_chunk_pos_, real_batch_size); @@ -233,9 +232,9 @@ PhyTermFilterExpr::ExecPkTermImpl() { template VectorPtr -PhyTermFilterExpr::ExecVisitorImplTemplateJson(OffsetVector* input) { +PhyTermFilterExpr::ExecVisitorImplTemplateJson(EvalCtx& context) { if (expr_->is_in_field_) { - return ExecTermJsonVariableInField(input); + return ExecTermJsonVariableInField(context); } else { if (is_index_mode_) { // we create double index for json int64 field for now @@ -243,40 +242,42 @@ PhyTermFilterExpr::ExecVisitorImplTemplateJson(OffsetVector* input) { std::conditional_t, double, ValueType>; - return ExecVisitorImplForIndex(input); + return ExecVisitorImplForIndex(); } else { - return ExecTermJsonFieldInVariable(input); + return ExecTermJsonFieldInVariable(context); } } } template VectorPtr -PhyTermFilterExpr::ExecVisitorImplTemplateArray(OffsetVector* input) { +PhyTermFilterExpr::ExecVisitorImplTemplateArray(EvalCtx& context) { if (expr_->is_in_field_) { - return ExecTermArrayVariableInField(input); + return ExecTermArrayVariableInField(context); } else { - return ExecTermArrayFieldInVariable(input); + return ExecTermArrayFieldInVariable(context); } } template VectorPtr -PhyTermFilterExpr::ExecTermArrayVariableInField(OffsetVector* input) { +PhyTermFilterExpr::ExecTermArrayVariableInField(EvalCtx& context) { using GetType = std::conditional_t, std::string_view, ValueType>; + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); AssertInfo(expr_->vals_.size() == 1, "element length in json array must be one"); @@ -286,8 +287,10 @@ PhyTermFilterExpr::ExecTermArrayVariableInField(OffsetVector* input) { } auto target_val = arg_val_.GetValue(); + int processed_cursor = 0; auto execute_sub_batch = - []( + [&processed_cursor, & + bitmap_input ]( const ArrayView* data, const bool* valid_data, const int32_t* offsets, @@ -304,6 +307,7 @@ PhyTermFilterExpr::ExecTermArrayVariableInField(OffsetVector* input) { } return false; }; + bool has_bitmap_input = !bitmap_input.empty(); for (int i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -313,8 +317,12 @@ PhyTermFilterExpr::ExecTermArrayVariableInField(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } res[i] = executor(offset); } + processed_cursor += size; }; int64_t processed_size; @@ -340,22 +348,24 @@ PhyTermFilterExpr::ExecTermArrayVariableInField(OffsetVector* input) { template VectorPtr -PhyTermFilterExpr::ExecTermArrayFieldInVariable(OffsetVector* input) { +PhyTermFilterExpr::ExecTermArrayFieldInVariable(EvalCtx& context) { using GetType = std::conditional_t, std::string_view, ValueType>; + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); int index = -1; if (expr_->column_.nested_path_.size() > 0) { @@ -372,8 +382,10 @@ PhyTermFilterExpr::ExecTermArrayFieldInVariable(OffsetVector* input) { return res_vec; } + int processed_cursor = 0; auto execute_sub_batch = - []( + [&processed_cursor, & + bitmap_input ]( const ArrayView* data, const bool* valid_data, const int32_t* offsets, @@ -382,6 +394,7 @@ PhyTermFilterExpr::ExecTermArrayFieldInVariable(OffsetVector* input) { TargetBitmapView valid_res, int index, const std::shared_ptr& term_set) { + bool has_bitmap_input = !bitmap_input.empty(); for (int i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -395,9 +408,13 @@ PhyTermFilterExpr::ExecTermArrayFieldInVariable(OffsetVector* input) { res[i] = false; continue; } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } auto value = data[offset].get_data(index); res[i] = term_set->In(ValueType(value)); } + processed_cursor += size; }; int64_t processed_size; @@ -428,21 +445,23 @@ PhyTermFilterExpr::ExecTermArrayFieldInVariable(OffsetVector* input) { template VectorPtr -PhyTermFilterExpr::ExecTermJsonVariableInField(OffsetVector* input) { +PhyTermFilterExpr::ExecTermJsonVariableInField(EvalCtx& context) { using GetType = std::conditional_t, std::string_view, ValueType>; + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); AssertInfo(expr_->vals_.size() == 1, "element length in json array must be one"); @@ -454,8 +473,10 @@ PhyTermFilterExpr::ExecTermJsonVariableInField(OffsetVector* input) { auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); + int processed_cursor = 0; auto execute_sub_batch = - []( + [&processed_cursor, & + bitmap_input ]( const Json* data, const bool* valid_data, const int32_t* offsets, @@ -480,6 +501,7 @@ PhyTermFilterExpr::ExecTermJsonVariableInField(OffsetVector* input) { } return false; }; + bool has_bitmap_input = !bitmap_input.empty(); for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -489,8 +511,12 @@ PhyTermFilterExpr::ExecTermJsonVariableInField(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } res[i] = executor(offset); } + processed_cursor += size; }; int64_t processed_size; if (has_offset_input_) { @@ -515,21 +541,25 @@ PhyTermFilterExpr::ExecTermJsonVariableInField(OffsetVector* input) { template VectorPtr -PhyTermFilterExpr::ExecTermJsonFieldInVariable(OffsetVector* input) { +PhyTermFilterExpr::ExecTermJsonFieldInVariable(EvalCtx& context) { using GetType = std::conditional_t, std::string_view, ValueType>; + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); + FieldId field_id = expr_->column_.field_id_; + auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); if (!arg_inited_) { @@ -543,8 +573,10 @@ PhyTermFilterExpr::ExecTermJsonFieldInVariable(OffsetVector* input) { return res_vec; } + int processed_cursor = 0; auto execute_sub_batch = - []( + [&processed_cursor, & + bitmap_input ]( const Json* data, const bool* valid_data, const int32_t* offsets, @@ -571,6 +603,7 @@ PhyTermFilterExpr::ExecTermJsonFieldInVariable(OffsetVector* input) { } return terms->In(ValueType(x.value())); }; + bool has_bitmap_input = !bitmap_input.empty(); for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -584,8 +617,13 @@ PhyTermFilterExpr::ExecTermJsonFieldInVariable(OffsetVector* input) { res[i] = false; continue; } + + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } res[i] = executor(offset); } + processed_cursor += size; }; int64_t processed_size; if (has_offset_input_) { @@ -614,17 +652,17 @@ PhyTermFilterExpr::ExecTermJsonFieldInVariable(OffsetVector* input) { template VectorPtr -PhyTermFilterExpr::ExecVisitorImpl(OffsetVector* input) { +PhyTermFilterExpr::ExecVisitorImpl(EvalCtx& context) { if (is_index_mode_ && !has_offset_input_) { - return ExecVisitorImplForIndex(input); + return ExecVisitorImplForIndex(); } else { - return ExecVisitorImplForData(input); + return ExecVisitorImplForData(context); } } template VectorPtr -PhyTermFilterExpr::ExecVisitorImplForIndex(OffsetVector* input) { +PhyTermFilterExpr::ExecVisitorImplForIndex() { typedef std:: conditional_t, std::string, T> IndexInnerType; @@ -667,7 +705,7 @@ PhyTermFilterExpr::ExecVisitorImplForIndex(OffsetVector* input) { template <> VectorPtr -PhyTermFilterExpr::ExecVisitorImplForIndex(OffsetVector* input) { +PhyTermFilterExpr::ExecVisitorImplForIndex() { using Index = index::ScalarIndex; auto real_batch_size = GetNextBatchSize(); if (real_batch_size == 0) { @@ -689,18 +727,21 @@ PhyTermFilterExpr::ExecVisitorImplForIndex(OffsetVector* input) { template VectorPtr -PhyTermFilterExpr::ExecVisitorImplForData(OffsetVector* input) { +PhyTermFilterExpr::ExecVisitorImplForData(EvalCtx& context) { + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); + auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); if (!arg_inited_) { std::vector vals; @@ -717,8 +758,10 @@ PhyTermFilterExpr::ExecVisitorImplForData(OffsetVector* input) { arg_inited_ = true; } + int processed_cursor = 0; auto execute_sub_batch = - []( + [&processed_cursor, & + bitmap_input ]( const T* data, const bool* valid_data, const int32_t* offsets, @@ -726,6 +769,7 @@ PhyTermFilterExpr::ExecVisitorImplForData(OffsetVector* input) { TargetBitmapView res, TargetBitmapView valid_res, const std::shared_ptr& vals) { + bool has_bitmap_input = !bitmap_input.empty(); for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -735,8 +779,12 @@ PhyTermFilterExpr::ExecVisitorImplForData(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && !bitmap_input[i + processed_cursor]) { + continue; + } res[i] = vals->In(data[offset]); } + processed_cursor += size; }; int64_t processed_size; if (has_offset_input_) { diff --git a/internal/core/src/exec/expression/TermExpr.h b/internal/core/src/exec/expression/TermExpr.h index 100e6bc2ca..14bc1a3c7f 100644 --- a/internal/core/src/exec/expression/TermExpr.h +++ b/internal/core/src/exec/expression/TermExpr.h @@ -75,6 +75,21 @@ class PhyTermFilterExpr : public SegmentExpr { void Eval(EvalCtx& context, VectorPtr& result) override; + bool + IsSource() const override { + return true; + } + + std::string + ToString() const { + return fmt::format("{}", expr_->ToString()); + } + + std::optional + GetColumnInfo() const override { + return expr_->column_; + } + private: void InitPkCacheOffset(); @@ -88,39 +103,39 @@ class PhyTermFilterExpr : public SegmentExpr { template VectorPtr - ExecVisitorImpl(OffsetVector* input = nullptr); + ExecVisitorImpl(EvalCtx& context); template VectorPtr - ExecVisitorImplForIndex(OffsetVector* input = nullptr); + ExecVisitorImplForIndex(); template VectorPtr - ExecVisitorImplForData(OffsetVector* input = nullptr); + ExecVisitorImplForData(EvalCtx& context); template VectorPtr - ExecVisitorImplTemplateJson(OffsetVector* input = nullptr); + ExecVisitorImplTemplateJson(EvalCtx& context); template VectorPtr - ExecTermJsonVariableInField(OffsetVector* input = nullptr); + ExecTermJsonVariableInField(EvalCtx& context); template VectorPtr - ExecTermJsonFieldInVariable(OffsetVector* input = nullptr); + ExecTermJsonFieldInVariable(EvalCtx& context); template VectorPtr - ExecVisitorImplTemplateArray(OffsetVector* input = nullptr); + ExecVisitorImplTemplateArray(EvalCtx& context); template VectorPtr - ExecTermArrayVariableInField(OffsetVector* input = nullptr); + ExecTermArrayVariableInField(EvalCtx& context); template VectorPtr - ExecTermArrayFieldInVariable(OffsetVector* input = nullptr); + ExecTermArrayFieldInVariable(EvalCtx& context); private: std::shared_ptr expr_; diff --git a/internal/core/src/exec/expression/UnaryExpr.cpp b/internal/core/src/exec/expression/UnaryExpr.cpp index 2f3bbbe886..b80605e184 100644 --- a/internal/core/src/exec/expression/UnaryExpr.cpp +++ b/internal/core/src/exec/expression/UnaryExpr.cpp @@ -89,51 +89,51 @@ PhyUnaryRangeFilterExpr::CanUseIndexForArray() { template VectorPtr -PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArrayForIndex() { - return ExecRangeVisitorImplArray(); +PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArrayForIndex(EvalCtx& context) { + return ExecRangeVisitorImplArray(context); } template <> VectorPtr -PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArrayForIndex< - proto::plan::Array>() { +PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArrayForIndex( + EvalCtx& context) { switch (expr_->op_type_) { case proto::plan::Equal: case proto::plan::NotEqual: { switch (expr_->column_.element_type_) { case DataType::BOOL: { - return ExecArrayEqualForIndex(expr_->op_type_ == - proto::plan::NotEqual); + return ExecArrayEqualForIndex( + context, expr_->op_type_ == proto::plan::NotEqual); } case DataType::INT8: { return ExecArrayEqualForIndex( - expr_->op_type_ == proto::plan::NotEqual); + context, expr_->op_type_ == proto::plan::NotEqual); } case DataType::INT16: { return ExecArrayEqualForIndex( - expr_->op_type_ == proto::plan::NotEqual); + context, expr_->op_type_ == proto::plan::NotEqual); } case DataType::INT32: { return ExecArrayEqualForIndex( - expr_->op_type_ == proto::plan::NotEqual); + context, expr_->op_type_ == proto::plan::NotEqual); } case DataType::INT64: { return ExecArrayEqualForIndex( - expr_->op_type_ == proto::plan::NotEqual); + context, expr_->op_type_ == proto::plan::NotEqual); } case DataType::FLOAT: case DataType::DOUBLE: { // not accurate on floating point number, rollback to bruteforce. return ExecRangeVisitorImplArray( - nullptr); + context); } case DataType::VARCHAR: { if (segment_->type() == SegmentType::Growing) { return ExecArrayEqualForIndex( - expr_->op_type_ == proto::plan::NotEqual); + context, expr_->op_type_ == proto::plan::NotEqual); } else { return ExecArrayEqualForIndex( - expr_->op_type_ == proto::plan::NotEqual); + context, expr_->op_type_ == proto::plan::NotEqual); } } default: @@ -144,7 +144,7 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArrayForIndex< } } default: - return ExecRangeVisitorImplArray(); + return ExecRangeVisitorImplArray(context); } } @@ -154,31 +154,31 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { SetHasOffsetInput((input != nullptr)); switch (expr_->column_.data_type_) { case DataType::BOOL: { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); break; } case DataType::INT8: { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); break; } case DataType::INT16: { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); break; } case DataType::INT32: { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); break; } case DataType::INT64: { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); break; } case DataType::FLOAT: { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); break; } case DataType::DOUBLE: { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); break; } case DataType::VARCHAR: { @@ -186,9 +186,9 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { !storage::MmapManager::GetInstance() .GetMmapConfig() .growing_enable_mmap) { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); } else { - result = ExecRangeVisitorImpl(input); + result = ExecRangeVisitorImpl(context); } break; } @@ -227,20 +227,20 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { } else { switch (val_type) { case proto::plan::GenericValue::ValCase::kBoolVal: - result = ExecRangeVisitorImplJson(input); + result = ExecRangeVisitorImplJson(context); break; case proto::plan::GenericValue::ValCase::kInt64Val: - result = ExecRangeVisitorImplJson(input); + result = ExecRangeVisitorImplJson(context); break; case proto::plan::GenericValue::ValCase::kFloatVal: - result = ExecRangeVisitorImplJson(input); + result = ExecRangeVisitorImplJson(context); break; case proto::plan::GenericValue::ValCase::kStringVal: - result = ExecRangeVisitorImplJson(input); + result = ExecRangeVisitorImplJson(context); break; case proto::plan::GenericValue::ValCase::kArrayVal: - result = - ExecRangeVisitorImplJson(input); + result = ExecRangeVisitorImplJson( + context); break; default: PanicInfo( @@ -254,28 +254,28 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { switch (val_type) { case proto::plan::GenericValue::ValCase::kBoolVal: SetNotUseIndex(); - result = ExecRangeVisitorImplArray(input); + result = ExecRangeVisitorImplArray(context); break; case proto::plan::GenericValue::ValCase::kInt64Val: SetNotUseIndex(); - result = ExecRangeVisitorImplArray(input); + result = ExecRangeVisitorImplArray(context); break; case proto::plan::GenericValue::ValCase::kFloatVal: SetNotUseIndex(); - result = ExecRangeVisitorImplArray(input); + result = ExecRangeVisitorImplArray(context); break; case proto::plan::GenericValue::ValCase::kStringVal: SetNotUseIndex(); - result = ExecRangeVisitorImplArray(input); + result = ExecRangeVisitorImplArray(context); break; case proto::plan::GenericValue::ValCase::kArrayVal: if (!has_offset_input_ && CanUseIndexForArray()) { result = ExecRangeVisitorImplArrayForIndex< - proto::plan::Array>(); + proto::plan::Array>(context); } else { result = ExecRangeVisitorImplArray( - input); + context); } break; default: @@ -293,17 +293,19 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { template VectorPtr -PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(OffsetVector* input) { +PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(EvalCtx& context) { + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); if (!arg_inited_) { value_arg_.SetValue(expr_->val_); @@ -315,16 +317,18 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(OffsetVector* input) { if (expr_->column_.nested_path_.size() > 0) { index = std::stoi(expr_->column_.nested_path_[0]); } - auto execute_sub_batch = [op_type]( - const milvus::ArrayView* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - ValueType val, - int index) { + int processed_cursor = 0; + auto execute_sub_batch = + [ op_type, &processed_cursor, & + bitmap_input ]( + const milvus::ArrayView* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + ValueType val, + int index) { switch (op_type) { case proto::plan::GreaterThan: { UnaryElementFuncForArray VectorPtr -PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) { +PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(EvalCtx& context, + bool reverse) { typedef std:: conditional_t, std::string, T> IndexInnerType; @@ -491,7 +513,7 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) { auto val = GetValueFromProto(expr_->val_); if (val.array_size() == 0) { // rollback to bruteforce. no candidates will be filtered out via index. - return ExecRangeVisitorImplArray(); + return ExecRangeVisitorImplArray(context); } // cache the result to suit the framework. @@ -587,11 +609,14 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) { template VectorPtr -PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { +PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(EvalCtx& context) { using GetType = std::conditional_t, std::string_view, ExprValueType>; + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); + FieldId field_id = expr_->column_.field_id_; auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { @@ -602,13 +627,13 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { value_arg_.SetValue(expr_->val_); arg_inited_ = true; } - - ExprValueType val = value_arg_.GetValue(); - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); + + ExprValueType val = value_arg_.GetValue(); auto op_type = expr_->op_type_; auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); @@ -642,8 +667,10 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { res[i] = (cmp); \ } while (false) + int processed_cursor = 0; auto execute_sub_batch = - [ op_type, pointer ]( + [ op_type, pointer, &processed_cursor, & + bitmap_input ]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -651,6 +678,7 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { TargetBitmapView res, TargetBitmapView valid_res, ExprValueType val) { + bool has_bitmap_input = !bitmap_input.empty(); switch (op_type) { case proto::plan::GreaterThan: { for (size_t i = 0; i < size; ++i) { @@ -662,6 +690,10 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && + !bitmap_input[i + processed_cursor]) { + continue; + } if constexpr (std::is_same_v) { res[i] = false; } else { @@ -680,6 +712,10 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && + !bitmap_input[i + processed_cursor]) { + continue; + } if constexpr (std::is_same_v) { res[i] = false; } else { @@ -698,6 +734,10 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && + !bitmap_input[i + processed_cursor]) { + continue; + } if constexpr (std::is_same_v) { res[i] = false; } else { @@ -716,6 +756,10 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && + !bitmap_input[i + processed_cursor]) { + continue; + } if constexpr (std::is_same_v) { res[i] = false; } else { @@ -734,6 +778,10 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && + !bitmap_input[i + processed_cursor]) { + continue; + } if constexpr (std::is_same_v) { auto doc = data[i].doc(); auto array = doc.at_pointer(pointer).get_array(); @@ -758,6 +806,10 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && + !bitmap_input[i + processed_cursor]) { + continue; + } if constexpr (std::is_same_v) { auto doc = data[i].doc(); auto array = doc.at_pointer(pointer).get_array(); @@ -782,6 +834,10 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && + !bitmap_input[i + processed_cursor]) { + continue; + } if constexpr (std::is_same_v) { res[i] = false; } else { @@ -804,6 +860,10 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && + !bitmap_input[i + processed_cursor]) { + continue; + } if constexpr (std::is_same_v) { res[i] = false; } else { @@ -819,6 +879,7 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { fmt::format("unsupported operator type for unary expr: {}", op_type)); } + processed_cursor += size; }; int64_t processed_size; if (has_offset_input_) { @@ -839,7 +900,7 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { template VectorPtr -PhyUnaryRangeFilterExpr::ExecRangeVisitorImpl(OffsetVector* input) { +PhyUnaryRangeFilterExpr::ExecRangeVisitorImpl(EvalCtx& context) { if (expr_->op_type_ == proto::plan::OpType::TextMatch || expr_->op_type_ == proto::plan::OpType::PhraseMatch) { if (has_offset_input_) { @@ -853,7 +914,7 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImpl(OffsetVector* input) { if (CanUseIndex() && !has_offset_input_) { return ExecRangeVisitorImplForIndex(); } else { - return ExecRangeVisitorImplForData(input); + return ExecRangeVisitorImplForData(context); } } @@ -1003,10 +1064,13 @@ PhyUnaryRangeFilterExpr::PreCheckOverflow(OffsetVector* input) { template VectorPtr -PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) { +PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(EvalCtx& context) { typedef std:: conditional_t, std::string, T> IndexInnerType; + auto* input = context.get_offset_input(); + const auto& bitmap_input = context.get_bitmap_input(); + if (auto res = PreCheckOverflow(input)) { return res; } @@ -1022,62 +1086,112 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) { arg_inited_ = true; } IndexInnerType val = GetValueFromProto(expr_->val_); - auto res_vec = std::make_shared( - TargetBitmap(real_batch_size), TargetBitmap(real_batch_size)); + auto res_vec = + std::make_shared(TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); - valid_res.set(); auto expr_type = expr_->op_type_; - auto execute_sub_batch = [expr_type]( - const T* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - IndexInnerType val) { + size_t processed_cursor = 0; + auto execute_sub_batch = + [ expr_type, &processed_cursor, & + bitmap_input ]( + const T* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + IndexInnerType val) { switch (expr_type) { case proto::plan::GreaterThan: { UnaryElementFunc func; - func(data, size, val, res, offsets); + func(data, + size, + val, + res, + bitmap_input, + processed_cursor, + offsets); break; } case proto::plan::GreaterEqual: { UnaryElementFunc func; - func(data, size, val, res, offsets); + func(data, + size, + val, + res, + bitmap_input, + processed_cursor, + offsets); break; } case proto::plan::LessThan: { UnaryElementFunc func; - func(data, size, val, res, offsets); + func(data, + size, + val, + res, + bitmap_input, + processed_cursor, + offsets); break; } case proto::plan::LessEqual: { UnaryElementFunc func; - func(data, size, val, res, offsets); + func(data, + size, + val, + res, + bitmap_input, + processed_cursor, + offsets); break; } case proto::plan::Equal: { UnaryElementFunc func; - func(data, size, val, res, offsets); + func(data, + size, + val, + res, + bitmap_input, + processed_cursor, + offsets); break; } case proto::plan::NotEqual: { UnaryElementFunc func; - func(data, size, val, res, offsets); + func(data, + size, + val, + res, + bitmap_input, + processed_cursor, + offsets); break; } case proto::plan::PrefixMatch: { UnaryElementFunc func; - func(data, size, val, res, offsets); + func(data, + size, + val, + res, + bitmap_input, + processed_cursor, + offsets); break; } case proto::plan::Match: { UnaryElementFunc func; - func(data, size, val, res, offsets); + func(data, + size, + val, + res, + bitmap_input, + processed_cursor, + offsets); break; } default: @@ -1090,7 +1204,11 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) { // so not divide data again for the reason that it may reduce performance if the null distribution is scattered // but to mask res with valid_data after the batch operation. if (valid_data != nullptr) { + bool has_bitmap_input = !bitmap_input.empty(); for (int i = 0; i < size; i++) { + if (has_bitmap_input && !bitmap_input[i + processed_cursor]) { + continue; + } auto offset = i; if constexpr (filter_type == FilterType::random) { offset = (offsets) ? offsets[i] : i; @@ -1100,6 +1218,7 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) { } } } + processed_cursor += size; }; auto skip_index_func = [expr_type, val](const SkipIndex& skip_index, diff --git a/internal/core/src/exec/expression/UnaryExpr.h b/internal/core/src/exec/expression/UnaryExpr.h index de9866a6a8..318724864d 100644 --- a/internal/core/src/exec/expression/UnaryExpr.h +++ b/internal/core/src/exec/expression/UnaryExpr.h @@ -41,15 +41,20 @@ struct UnaryElementFuncForMatch { void operator()(const T* src, - size_t size, IndexInnerType val, TargetBitmapView res, - int64_t* offsets = nullptr) { + const TargetBitmap& bitmap_input, + int start_cursor, + const int32_t* offsets = nullptr) { PatternMatchTranslator translator; auto regex_pattern = translator(val); RegexMatcher matcher(regex_pattern); + bool has_bitmap_input = !bitmap_input.empty(); for (int i = 0; i < size; ++i) { + if (has_bitmap_input && !bitmap_input[i + start_cursor]) { + continue; + } if constexpr (filter_type == FilterType::random) { res[i] = matcher(src[offsets ? offsets[i] : i]); } else { @@ -69,17 +74,25 @@ struct UnaryElementFunc { size_t size, IndexInnerType val, TargetBitmapView res, + const TargetBitmap& bitmap_input, + size_t start_cursor, const int32_t* offsets = nullptr) { + bool has_bitmap_input = !bitmap_input.empty(); if constexpr (op == proto::plan::OpType::Match) { UnaryElementFuncForMatch func; - func(src, size, val, res); + func(src, size, val, res, bitmap_input, start_cursor, offsets); return; } // This is the original code, which is kept for the documentation purposes // also, for iterative filter - if constexpr (filter_type == FilterType::random) { + if constexpr (filter_type == FilterType::random || + std::is_same_v || + std::is_same_v) { for (int i = 0; i < size; ++i) { + if (has_bitmap_input && !bitmap_input[i + start_cursor]) { + continue; + } auto offset = (offsets != nullptr) ? offsets[i] : i; if constexpr (op == proto::plan::OpType::Equal) { res[i] = src[offset] == val; @@ -164,7 +177,10 @@ struct UnaryElementFuncForArray { int index, TargetBitmapView res, TargetBitmapView valid_res, + const TargetBitmap& bitmap_input, + size_t start_cursor, const int32_t* offsets = nullptr) { + bool has_bitmap_input = !bitmap_input.empty(); for (int i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { @@ -174,6 +190,9 @@ struct UnaryElementFuncForArray { res[i] = valid_res[i] = false; continue; } + if (has_bitmap_input && !bitmap_input[i + start_cursor]) { + continue; + } if constexpr (op == proto::plan::OpType::Equal) { if constexpr (std::is_same_v) { res[i] = src[offset].is_same_array(val); @@ -340,10 +359,30 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr { return true; } + std::string + ToString() const { + return fmt::format("{}", expr_->ToString()); + } + + std::optional + GetColumnInfo() const override { + return expr_->column_; + } + + bool + IsSource() const override { + return true; + } + + std::shared_ptr + GetLogicalExpr() { + return expr_; + } + private: template VectorPtr - ExecRangeVisitorImpl(OffsetVector* input = nullptr); + ExecRangeVisitorImpl(EvalCtx& context); template VectorPtr @@ -351,23 +390,23 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr { template VectorPtr - ExecRangeVisitorImplForData(OffsetVector* input = nullptr); + ExecRangeVisitorImplForData(EvalCtx& context); template VectorPtr - ExecRangeVisitorImplJson(OffsetVector* input = nullptr); + ExecRangeVisitorImplJson(EvalCtx& context); template VectorPtr - ExecRangeVisitorImplArray(OffsetVector* input = nullptr); + ExecRangeVisitorImplArray(EvalCtx& context); template VectorPtr - ExecRangeVisitorImplArrayForIndex(); + ExecRangeVisitorImplArrayForIndex(EvalCtx& context); template VectorPtr - ExecArrayEqualForIndex(bool reverse); + ExecArrayEqualForIndex(EvalCtx& context, bool reverse); // Check overflow and cache result for performace template diff --git a/internal/core/src/exec/expression/ValueExpr.h b/internal/core/src/exec/expression/ValueExpr.h index b2ccace223..b2d618b92b 100644 --- a/internal/core/src/exec/expression/ValueExpr.h +++ b/internal/core/src/exec/expression/ValueExpr.h @@ -59,6 +59,21 @@ class PhyValueExpr : public Expr { } } + std::string + ToString() const { + return fmt::format("{}", expr_->ToString()); + } + + bool + IsSource() const override { + return true; + } + + std::optional + GetColumnInfo() const override { + return std::nullopt; + } + private: std::shared_ptr expr_; const int64_t active_count_; diff --git a/internal/core/src/monitor/prometheus_client.cpp b/internal/core/src/monitor/prometheus_client.cpp index 50027dc4c0..7ec3dadae2 100644 --- a/internal/core/src/monitor/prometheus_client.cpp +++ b/internal/core/src/monitor/prometheus_client.cpp @@ -210,6 +210,8 @@ std::map searchGetTargetEntryLatencyLabels{ {"type", "search_get_target_entry_latency"}}; std::map randomSampleLatencyLabels{ {"type", "random_sample_latency"}}; +std::map optimizeExprLatencyLabels{ + {"type", "optimize_expr_latency"}}; DEFINE_PROMETHEUS_HISTOGRAM_FAMILY(internal_core_search_latency, "[cpp]latency(us) of search on segment") @@ -242,7 +244,9 @@ DEFINE_PROMETHEUS_HISTOGRAM(internal_core_search_get_target_entry_latency, DEFINE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_random_sample, internal_core_search_latency, randomSampleLatencyLabels) - +DEFINE_PROMETHEUS_HISTOGRAM(internal_core_optimize_expr_latency, + internal_core_search_latency, + optimizeExprLatencyLabels) // mmap metrics std::map mmapAllocatedSpaceAnonLabel = { {"type", "anon"}}; diff --git a/internal/core/src/monitor/prometheus_client.h b/internal/core/src/monitor/prometheus_client.h index 758815a290..579f9f562e 100644 --- a/internal/core/src/monitor/prometheus_client.h +++ b/internal/core/src/monitor/prometheus_client.h @@ -142,6 +142,7 @@ DECLARE_PROMETHEUS_HISTOGRAM(internal_core_get_vector_latency); DECLARE_PROMETHEUS_HISTOGRAM(internal_core_retrieve_get_target_entry_latency); DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_get_target_entry_latency); DECLARE_PROMETHEUS_HISTOGRAM(internal_core_search_latency_random_sample); +DECLARE_PROMETHEUS_HISTOGRAM(internal_core_optimize_expr_latency); // async cgo metrics DECLARE_PROMETHEUS_HISTOGRAM_FAMILY(internal_cgo_queue_duration_seconds); diff --git a/internal/core/unittest/test_exec.cpp b/internal/core/unittest/test_exec.cpp index c98ae465c7..6d8539e863 100644 --- a/internal/core/unittest/test_exec.cpp +++ b/internal/core/unittest/test_exec.cpp @@ -27,6 +27,7 @@ #include "exec/QueryContext.h" #include "expr/ITypeExpr.h" #include "exec/expression/Expr.h" +#include "exec/expression/ConjunctExpr.h" #include "exec/expression/function/FunctionFactory.h" using namespace milvus; @@ -82,10 +83,12 @@ class TaskTest : public testing::TestWithParam { field_map_.insert({"string2", str2_fid}); auto str3_fid = schema->AddDebugField("string3", DataType::VARCHAR); field_map_.insert({"string3", str3_fid}); + auto json_fid = schema->AddDebugField("json", DataType::JSON); + field_map_.insert({"json", json_fid}); schema->set_primary_field_id(str1_fid); auto segment = CreateSealedSegment(schema); - size_t N = 1000000; + size_t N = 100000; num_rows_ = N; auto raw_data = DataGen(schema, N); auto fields = schema->get_fields(); @@ -152,7 +155,7 @@ TEST_P(TaskTest, CallExprEmpty) { auto query_context = std::make_shared( "test1", segment_.get(), - 1000000, + 100000, MAX_TIMESTAMP, std::make_shared( std::unordered_map{})); @@ -189,7 +192,7 @@ TEST_P(TaskTest, UnaryExpr) { auto query_context = std::make_shared( "test1", segment_.get(), - 1000000, + 100000, MAX_TIMESTAMP, std::make_shared( std::unordered_map{})); @@ -235,7 +238,7 @@ TEST_P(TaskTest, LogicalExpr) { auto query_context = std::make_shared( "test1", segment_.get(), - 1000000, + 100000, MAX_TIMESTAMP, std::make_shared( std::unordered_map{})); @@ -296,12 +299,12 @@ TEST_P(TaskTest, CompileInputs_and) { auto expr7 = std::make_shared( expr::LogicalBinaryExpr::OpType::And, expr3, expr6); auto query_context = std::make_shared( - DEAFULT_QUERY_ID, segment_.get(), 1000000, MAX_TIMESTAMP); + DEAFULT_QUERY_ID, segment_.get(), 100000, MAX_TIMESTAMP); auto exprs = milvus::exec::CompileInputs(expr7, query_context.get(), {}); EXPECT_EQ(exprs.size(), 4); for (int i = 0; i < exprs.size(); ++i) { - std::cout << exprs[i]->get_name() << std::endl; - EXPECT_STREQ(exprs[i]->get_name().c_str(), "PhyUnaryRangeFilterExpr"); + std::cout << exprs[i]->name() << std::endl; + EXPECT_STREQ(exprs[i]->name().c_str(), "PhyUnaryRangeFilterExpr"); } } @@ -316,7 +319,7 @@ TEST_P(TaskTest, CompileInputs_or_with_and) { proto::plan::GenericValue val; val.set_int64_val(10); { - // expr: (int64_fid < 10 and int64_fid < 10) or (int64_fid < 10 and int64_fid < 10) + // expr: (int64_fid > 10 and int64_fid > 10) or (int64_fid > 10 and int64_fid > 10) auto expr1 = std::make_shared( expr::ColumnInfo(int64_fid, DataType::INT64), proto::plan::OpType::GreaterThan, @@ -342,19 +345,19 @@ TEST_P(TaskTest, CompileInputs_or_with_and) { auto expr6 = std::make_shared( expr::LogicalBinaryExpr::OpType::And, expr1, expr2); auto query_context = std::make_shared( - DEAFULT_QUERY_ID, segment_.get(), 1000000, MAX_TIMESTAMP); + DEAFULT_QUERY_ID, segment_.get(), 100000, MAX_TIMESTAMP); auto expr7 = std::make_shared( expr::LogicalBinaryExpr::OpType::Or, expr3, expr6); auto exprs = milvus::exec::CompileInputs(expr7, query_context.get(), {}); EXPECT_EQ(exprs.size(), 2); for (int i = 0; i < exprs.size(); ++i) { - std::cout << exprs[i]->get_name() << std::endl; - EXPECT_STREQ(exprs[i]->get_name().c_str(), "and"); + std::cout << exprs[i]->name() << std::endl; + EXPECT_STREQ(exprs[i]->name().c_str(), "PhyConjunctFilterExpr"); } } { - // expr: (int64_fid < 10 or int64_fid < 10) or (int64_fid < 10 and int64_fid < 10) + // expr: (int64_fid < 10 or int64_fid < 10) or (int64_fid > 10 and int64_fid > 10) auto expr1 = std::make_shared( expr::ColumnInfo(int64_fid, DataType::INT64), proto::plan::OpType::GreaterThan, @@ -380,7 +383,7 @@ TEST_P(TaskTest, CompileInputs_or_with_and) { auto expr6 = std::make_shared( expr::LogicalBinaryExpr::OpType::And, expr1, expr2); auto query_context = std::make_shared( - DEAFULT_QUERY_ID, segment_.get(), 1000000, MAX_TIMESTAMP); + DEAFULT_QUERY_ID, segment_.get(), 100000, MAX_TIMESTAMP); auto expr7 = std::make_shared( expr::LogicalBinaryExpr::OpType::Or, expr3, expr6); auto exprs = @@ -388,14 +391,13 @@ TEST_P(TaskTest, CompileInputs_or_with_and) { std::cout << exprs.size() << std::endl; EXPECT_EQ(exprs.size(), 3); for (int i = 0; i < exprs.size() - 1; ++i) { - std::cout << exprs[i]->get_name() << std::endl; - EXPECT_STREQ(exprs[i]->get_name().c_str(), - "PhyUnaryRangeFilterExpr"); + std::cout << exprs[i]->name() << std::endl; + EXPECT_STREQ(exprs[i]->name().c_str(), "PhyUnaryRangeFilterExpr"); } - EXPECT_STREQ(exprs[2]->get_name().c_str(), "and"); + EXPECT_STREQ(exprs[2]->name().c_str(), "PhyConjunctFilterExpr"); } { - // expr: (int64_fid < 10 or int64_fid < 10) and (int64_fid < 10 and int64_fid < 10) + // expr: (int64_fid > 10 or int64_fid > 10) and (int64_fid > 10 and int64_fid > 10) auto expr1 = std::make_shared( expr::ColumnInfo(int64_fid, DataType::INT64), proto::plan::OpType::GreaterThan, @@ -421,18 +423,282 @@ TEST_P(TaskTest, CompileInputs_or_with_and) { auto expr6 = std::make_shared( expr::LogicalBinaryExpr::OpType::And, expr1, expr2); auto query_context = std::make_shared( - DEAFULT_QUERY_ID, segment_.get(), 1000000, MAX_TIMESTAMP); + DEAFULT_QUERY_ID, segment_.get(), 100000, MAX_TIMESTAMP); auto expr7 = std::make_shared( expr::LogicalBinaryExpr::OpType::And, expr3, expr6); auto exprs = milvus::exec::CompileInputs(expr7, query_context.get(), {}); std::cout << exprs.size() << std::endl; EXPECT_EQ(exprs.size(), 3); - EXPECT_STREQ(exprs[0]->get_name().c_str(), "or"); + EXPECT_STREQ(exprs[0]->name().c_str(), "PhyConjunctFilterExpr"); for (int i = 1; i < exprs.size(); ++i) { - std::cout << exprs[i]->get_name() << std::endl; - EXPECT_STREQ(exprs[i]->get_name().c_str(), - "PhyUnaryRangeFilterExpr"); + std::cout << exprs[i]->name() << std::endl; + EXPECT_STREQ(exprs[i]->name().c_str(), "PhyUnaryRangeFilterExpr"); } } } + +TEST_P(TaskTest, Test_reorder) { + using namespace milvus; + using namespace milvus::query; + using namespace milvus::segcore; + using namespace milvus::exec; + + { + // expr: string2 like '%xx' and string2 == 'xxx' + // reorder: string2 == "xxx" and string2 like '%xxx' + proto::plan::GenericValue val1; + val1.set_string_val("%xxx"); + auto expr1 = std::make_shared( + expr::ColumnInfo(field_map_["string2"], DataType::VARCHAR), + proto::plan::OpType::Match, + val1, + std::vector{}); + proto::plan::GenericValue val2; + val2.set_string_val("xxx"); + auto expr2 = std::make_shared( + expr::ColumnInfo(field_map_["string2"], DataType::VARCHAR), + proto::plan::OpType::Equal, + val2, + std::vector{}); + auto expr3 = std::make_shared( + expr::LogicalBinaryExpr::OpType::And, expr1, expr2); + auto query_context = std::make_shared( + DEAFULT_QUERY_ID, segment_.get(), 100000, MAX_TIMESTAMP); + ExecContext context(query_context.get()); + auto exprs = + milvus::exec::CompileExpressions({expr3}, &context, {}, false); + EXPECT_EQ(exprs.size(), 1); + EXPECT_STREQ(exprs[0]->name().c_str(), "PhyConjunctFilterExpr"); + auto phy_expr = + std::static_pointer_cast( + exprs[0]); + std::cout << phy_expr->ToString() << std::endl; + auto reorder = phy_expr->GetReorder(); + EXPECT_EQ(reorder.size(), 2); + EXPECT_EQ(reorder[0], 1); + EXPECT_EQ(reorder[1], 0); + } + + { + // expr: string2 == 'xxx' and int1 < 100 + // reorder: int1 < 100 and string2 == 'xxx' + proto::plan::GenericValue val1; + val1.set_string_val("xxx"); + auto expr1 = std::make_shared( + expr::ColumnInfo(field_map_["string2"], DataType::VARCHAR), + proto::plan::OpType::Equal, + val1, + std::vector{}); + proto::plan::GenericValue val2; + val2.set_int64_val(100); + auto expr2 = std::make_shared( + expr::ColumnInfo(field_map_["int64"], DataType::INT64), + proto::plan::OpType::LessThan, + val2, + std::vector{}); + auto expr3 = std::make_shared( + expr::LogicalBinaryExpr::OpType::And, expr1, expr2); + auto query_context = std::make_shared( + DEAFULT_QUERY_ID, segment_.get(), 100000, MAX_TIMESTAMP); + ExecContext context(query_context.get()); + auto exprs = + milvus::exec::CompileExpressions({expr3}, &context, {}, false); + EXPECT_EQ(exprs.size(), 1); + EXPECT_STREQ(exprs[0]->name().c_str(), "PhyConjunctFilterExpr"); + auto phy_expr = + std::static_pointer_cast( + exprs[0]); + std::cout << phy_expr->ToString() << std::endl; + auto reorder = phy_expr->GetReorder(); + EXPECT_EQ(reorder.size(), 2); + EXPECT_EQ(reorder[0], 1); + EXPECT_EQ(reorder[1], 0); + } + + { + // expr: json['b'] like '%xx' and json['a'] == 'xxx' + // reorder: json['a'] == 'xxx' and json['b'] like '%xx' + proto::plan::GenericValue val1; + val1.set_string_val("%xxx"); + auto expr1 = std::make_shared( + expr::ColumnInfo(field_map_["json"], DataType::JSON), + proto::plan::OpType::Match, + val1, + std::vector{}); + proto::plan::GenericValue val2; + val2.set_string_val("xxx"); + auto expr2 = std::make_shared( + expr::ColumnInfo(field_map_["json"], DataType::JSON), + proto::plan::OpType::Equal, + val2, + std::vector{}); + auto expr3 = std::make_shared( + expr::LogicalBinaryExpr::OpType::And, expr1, expr2); + auto query_context = std::make_shared( + DEAFULT_QUERY_ID, segment_.get(), 100000, MAX_TIMESTAMP); + ExecContext context(query_context.get()); + auto exprs = + milvus::exec::CompileExpressions({expr3}, &context, {}, false); + EXPECT_EQ(exprs.size(), 1); + EXPECT_STREQ(exprs[0]->name().c_str(), "PhyConjunctFilterExpr"); + auto phy_expr = + std::static_pointer_cast( + exprs[0]); + std::cout << phy_expr->ToString() << std::endl; + auto reorder = phy_expr->GetReorder(); + EXPECT_EQ(reorder.size(), 2); + EXPECT_EQ(reorder[0], 1); + EXPECT_EQ(reorder[1], 0); + } + + { + // expr: json['a'] == 'xxx' and int1 == 100 + // reorder: int1 == 100 and json['a'] == 'xxx' + proto::plan::GenericValue val1; + val1.set_string_val("xxx"); + auto expr1 = std::make_shared( + expr::ColumnInfo(field_map_["json"], DataType::JSON), + proto::plan::OpType::Equal, + val1, + std::vector{}); + proto::plan::GenericValue val2; + val2.set_int64_val(100); + auto expr2 = std::make_shared( + expr::ColumnInfo(field_map_["int64"], DataType::INT64), + proto::plan::OpType::Equal, + val2, + std::vector{}); + auto expr3 = std::make_shared( + expr::LogicalBinaryExpr::OpType::And, expr1, expr2); + auto query_context = std::make_shared( + DEAFULT_QUERY_ID, segment_.get(), 100000, MAX_TIMESTAMP); + ExecContext context(query_context.get()); + auto exprs = + milvus::exec::CompileExpressions({expr3}, &context, {}, false); + EXPECT_EQ(exprs.size(), 1); + EXPECT_STREQ(exprs[0]->name().c_str(), "PhyConjunctFilterExpr"); + auto phy_expr = + std::static_pointer_cast( + exprs[0]); + std::cout << phy_expr->ToString() << std::endl; + auto reorder = phy_expr->GetReorder(); + EXPECT_EQ(reorder.size(), 2); + EXPECT_EQ(reorder[0], 1); + EXPECT_EQ(reorder[1], 0); + } + + { + // expr: json['a'] == 'xxx' and 0 < int1 < 100 + // reorder: 0 < int1 < 100 and json['a'] == 'xxx' + proto::plan::GenericValue val1; + val1.set_string_val("xxx"); + auto expr1 = std::make_shared( + expr::ColumnInfo(field_map_["json"], DataType::JSON), + proto::plan::OpType::Equal, + val1, + std::vector{}); + proto::plan::GenericValue low; + low.set_int64_val(0); + proto::plan::GenericValue upper; + upper.set_int64_val(100); + auto expr2 = std::make_shared( + expr::ColumnInfo(field_map_["int64"], DataType::INT64), + low, + upper, + false, + false); + auto expr3 = std::make_shared( + expr::LogicalBinaryExpr::OpType::And, expr1, expr2); + auto query_context = std::make_shared( + DEAFULT_QUERY_ID, segment_.get(), 100000, MAX_TIMESTAMP); + ExecContext context(query_context.get()); + auto exprs = + milvus::exec::CompileExpressions({expr3}, &context, {}, false); + EXPECT_EQ(exprs.size(), 1); + EXPECT_STREQ(exprs[0]->name().c_str(), "PhyConjunctFilterExpr"); + auto phy_expr = + std::static_pointer_cast( + exprs[0]); + std::cout << phy_expr->ToString() << std::endl; + auto reorder = phy_expr->GetReorder(); + EXPECT_EQ(reorder.size(), 2); + EXPECT_EQ(reorder[0], 1); + EXPECT_EQ(reorder[1], 0); + } + + { + // expr: string1 != string2 and 0 < int1 < 100 + // reorder: 0 < int1 < 100 and string1 != string2 + proto::plan::GenericValue val1; + val1.set_string_val("xxx"); + auto expr1 = std::make_shared(field_map_["string1"], + field_map_["string2"], + DataType::VARCHAR, + DataType::VARCHAR, + OpType::LessThan); + proto::plan::GenericValue low; + low.set_int64_val(0); + proto::plan::GenericValue upper; + upper.set_int64_val(100); + auto expr2 = std::make_shared( + expr::ColumnInfo(field_map_["int64"], DataType::INT64), + low, + upper, + false, + false); + auto expr3 = std::make_shared( + expr::LogicalBinaryExpr::OpType::And, expr1, expr2); + auto query_context = std::make_shared( + DEAFULT_QUERY_ID, segment_.get(), 100000, MAX_TIMESTAMP); + ExecContext context(query_context.get()); + auto exprs = + milvus::exec::CompileExpressions({expr3}, &context, {}, false); + EXPECT_EQ(exprs.size(), 1); + EXPECT_STREQ(exprs[0]->name().c_str(), "PhyConjunctFilterExpr"); + auto phy_expr = + std::static_pointer_cast( + exprs[0]); + std::cout << phy_expr->ToString() << std::endl; + auto reorder = phy_expr->GetReorder(); + EXPECT_EQ(reorder.size(), 2); + EXPECT_EQ(reorder[0], 1); + EXPECT_EQ(reorder[1], 0); + } + + { + // expr: string2 like '%xx' and string2 == 'xxx' + // disable optimize expr, still remain sequence + proto::plan::GenericValue val1; + val1.set_string_val("%xxx"); + auto expr1 = std::make_shared( + expr::ColumnInfo(field_map_["string2"], DataType::VARCHAR), + proto::plan::OpType::Match, + val1, + std::vector{}); + proto::plan::GenericValue val2; + val2.set_string_val("xxx"); + auto expr2 = std::make_shared( + expr::ColumnInfo(field_map_["string2"], DataType::VARCHAR), + proto::plan::OpType::Equal, + val2, + std::vector{}); + auto expr3 = std::make_shared( + expr::LogicalBinaryExpr::OpType::And, expr1, expr2); + auto query_context = std::make_shared( + DEAFULT_QUERY_ID, segment_.get(), 100000, MAX_TIMESTAMP); + ExecContext context(query_context.get()); + OPTIMIZE_EXPR_ENABLED = false; + auto exprs = + milvus::exec::CompileExpressions({expr3}, &context, {}, false); + EXPECT_EQ(exprs.size(), 1); + EXPECT_STREQ(exprs[0]->name().c_str(), "PhyConjunctFilterExpr"); + auto phy_expr = + std::static_pointer_cast( + exprs[0]); + std::cout << phy_expr->ToString() << std::endl; + auto reorder = phy_expr->GetReorder(); + EXPECT_EQ(reorder.size(), 0); + OPTIMIZE_EXPR_ENABLED = true; + } +} diff --git a/internal/core/unittest/test_expr.cpp b/internal/core/unittest/test_expr.cpp index 13865f34c6..fa05a34616 100644 --- a/internal/core/unittest/test_expr.cpp +++ b/internal/core/unittest/test_expr.cpp @@ -3401,6 +3401,145 @@ TEST_P(ExprTest, TestSealedSegmentGetBatchSize) { std::cout << "end compare test" << std::endl; } +TEST_P(ExprTest, TestReorder) { + auto schema = std::make_shared(); + auto pk = schema->AddDebugField("id", DataType::INT64); + auto bool_fid = schema->AddDebugField("bool", DataType::BOOL); + auto bool_1_fid = schema->AddDebugField("bool1", DataType::BOOL); + auto int8_fid = schema->AddDebugField("int8", DataType::INT8); + auto int8_1_fid = schema->AddDebugField("int81", DataType::INT8); + auto int16_fid = schema->AddDebugField("int16", DataType::INT16); + auto int16_1_fid = schema->AddDebugField("int161", DataType::INT16); + auto int32_fid = schema->AddDebugField("int32", DataType::INT32); + auto int32_1_fid = schema->AddDebugField("int321", DataType::INT32); + auto int64_fid = schema->AddDebugField("int64", DataType::INT64); + auto int64_1_fid = schema->AddDebugField("int641", DataType::INT64); + auto float_fid = schema->AddDebugField("float", DataType::FLOAT); + auto float_1_fid = schema->AddDebugField("float1", DataType::FLOAT); + auto double_fid = schema->AddDebugField("double", DataType::DOUBLE); + auto double_1_fid = schema->AddDebugField("double1", DataType::DOUBLE); + auto str1_fid = schema->AddDebugField("string1", DataType::VARCHAR); + auto str2_fid = schema->AddDebugField("string2", DataType::VARCHAR); + auto json_fid = schema->AddDebugField("json", DataType::JSON, false); + auto str_array_fid = + schema->AddDebugField("str_array", DataType::ARRAY, DataType::VARCHAR); + schema->set_primary_field_id(pk); + + auto seg = CreateSealedSegment(schema); + size_t N = 1000; + auto raw_data = DataGen(schema, N); + auto fields = schema->get_fields(); + for (auto field_data : raw_data.raw_->fields_data()) { + int64_t field_id = field_data.field_id(); + + auto info = FieldDataInfo(field_data.field_id(), N, "/tmp/a"); + auto field_meta = fields.at(FieldId(field_id)); + info.channel->push( + CreateFieldDataFromDataArray(N, &field_data, field_meta)); + info.channel->close(); + + seg->LoadFieldData(FieldId(field_id), info); + } + + query::ExecPlanNodeVisitor visitor(*seg, MAX_TIMESTAMP); + + auto build_expr = [&](int index) -> expr::TypedExprPtr { + switch (index) { + case 0: { + proto::plan::GenericValue val1; + val1.set_string_val("xxx"); + auto expr1 = std::make_shared( + expr::ColumnInfo(str1_fid, DataType::VARCHAR), + proto::plan::OpType::Equal, + val1, + std::vector{}); + proto::plan::GenericValue val2; + val2.set_int64_val(100); + auto expr2 = std::make_shared( + expr::ColumnInfo(int64_fid, DataType::INT64), + proto::plan::OpType::LessThan, + val2, + std::vector{}); + auto expr3 = std::make_shared( + expr::LogicalBinaryExpr::OpType::And, expr1, expr2); + return expr3; + }; + case 1: { + proto::plan::GenericValue val1; + val1.set_string_val("xxx"); + auto expr1 = std::make_shared( + expr::ColumnInfo(json_fid, DataType::JSON, {"int"}), + proto::plan::OpType::Equal, + val1, + std::vector{}); + proto::plan::GenericValue val2; + val2.set_int64_val(100); + auto expr2 = std::make_shared( + expr::ColumnInfo(int64_fid, DataType::INT64), + proto::plan::OpType::LessThan, + val2, + std::vector{}); + auto expr3 = std::make_shared( + expr::LogicalBinaryExpr::OpType::And, expr1, expr2); + return expr3; + }; + case 2: { + proto::plan::GenericValue val1; + val1.set_string_val("12"); + auto expr1 = std::make_shared( + expr::ColumnInfo(str_array_fid, DataType::ARRAY, {"0"}), + proto::plan::OpType::Match, + val1, + std::vector{}); + proto::plan::GenericValue val2; + val2.set_int64_val(100); + auto expr2 = std::make_shared( + expr::ColumnInfo(int64_fid, DataType::INT64), + proto::plan::OpType::LessThan, + val2, + std::vector{}); + auto expr3 = std::make_shared( + expr::LogicalBinaryExpr::OpType::And, expr1, expr2); + return expr3; + }; + case 3: { + auto expr1 = + std::make_shared(int64_fid, + int64_1_fid, + DataType::INT64, + DataType::INT64, + OpType::LessThan); + proto::plan::GenericValue val2; + val2.set_int64_val(100); + auto expr2 = std::make_shared( + expr::ColumnInfo(int64_fid, DataType::INT64), + proto::plan::OpType::LessThan, + val2, + std::vector{}); + auto expr3 = std::make_shared( + expr::LogicalBinaryExpr::OpType::And, expr1, expr2); + return expr3; + }; + default: + PanicInfo(ErrorCode::UnexpectedError, "not implement"); + } + }; + BitsetType final; + auto expr = build_expr(0); + auto plan = + std::make_shared(DEFAULT_PLANNODE_ID, expr); + final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP); + expr = build_expr(1); + plan = std::make_shared(DEFAULT_PLANNODE_ID, expr); + final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP); + expr = build_expr(2); + plan = std::make_shared(DEFAULT_PLANNODE_ID, expr); + final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP); + expr = build_expr(3); + plan = std::make_shared(DEFAULT_PLANNODE_ID, expr); + final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP); +} + TEST_P(ExprTest, TestCompareExprNullable) { auto schema = std::make_shared(); auto vec_fid = schema->AddDebugField("fakevec", data_type, 16, metric_type); diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 8ac5e0aba6..c5c7567088 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -242,6 +242,9 @@ func (node *QueryNode) InitSegcore() error { cExprBatchSize := C.int64_t(paramtable.Get().QueryNodeCfg.ExprEvalBatchSize.GetAsInt64()) C.InitDefaultExprEvalBatchSize(cExprBatchSize) + cOptimizeExprEnabled := C.bool(paramtable.Get().CommonCfg.EnabledOptimizeExpr.GetAsBool()) + C.InitDefaultOptimizeExprEnable(cOptimizeExprEnabled) + cGpuMemoryPoolInitSize := C.uint32_t(paramtable.Get().GpuConfig.InitSize.GetAsUint32()) cGpuMemoryPoolMaxSize := C.uint32_t(paramtable.Get().GpuConfig.MaxSize.GetAsUint32()) C.SegcoreSetKnowhereGpuMemoryPoolSize(cGpuMemoryPoolInitSize, cGpuMemoryPoolMaxSize) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 400b94891b..2e5cdba192 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -292,6 +292,8 @@ type commonConfig struct { LocalRPCEnabled ParamItem `refreshable:"false"` SyncTaskPoolReleaseTimeoutSeconds ParamItem `refreshable:"true"` + + EnabledOptimizeExpr ParamItem `refreshable:"true"` } func (p *commonConfig) init(base *BaseTable) { @@ -994,6 +996,15 @@ This helps Milvus-CDC synchronize incremental data`, Export: true, } p.SyncTaskPoolReleaseTimeoutSeconds.Init(base.mgr) + + p.EnabledOptimizeExpr = ParamItem{ + Key: "common.enabledOptimizeExpr", + Version: "2.5.6", + DefaultValue: "true", + Doc: "Indicates whether to enable optimize expr", + Export: true, + } + p.EnabledOptimizeExpr.Init(base.mgr) } type gpuConfig struct {