// Licensed to the LF AI & Data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "UnaryExpr.h" #include "common/Json.h" namespace milvus { namespace exec { void PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { switch (expr_->column_.data_type_) { case DataType::BOOL: { result = ExecRangeVisitorImpl(); break; } case DataType::INT8: { result = ExecRangeVisitorImpl(); break; } case DataType::INT16: { result = ExecRangeVisitorImpl(); break; } case DataType::INT32: { result = ExecRangeVisitorImpl(); break; } case DataType::INT64: { result = ExecRangeVisitorImpl(); break; } case DataType::FLOAT: { result = ExecRangeVisitorImpl(); break; } case DataType::DOUBLE: { result = ExecRangeVisitorImpl(); break; } case DataType::VARCHAR: { if (segment_->type() == SegmentType::Growing) { result = ExecRangeVisitorImpl(); } else { result = ExecRangeVisitorImpl(); } break; } case DataType::JSON: { auto val_type = expr_->val_.val_case(); switch (val_type) { case proto::plan::GenericValue::ValCase::kBoolVal: result = ExecRangeVisitorImplJson(); break; case proto::plan::GenericValue::ValCase::kInt64Val: result = ExecRangeVisitorImplJson(); break; case proto::plan::GenericValue::ValCase::kFloatVal: result = ExecRangeVisitorImplJson(); break; case proto::plan::GenericValue::ValCase::kStringVal: result = ExecRangeVisitorImplJson(); break; case proto::plan::GenericValue::ValCase::kArrayVal: result = ExecRangeVisitorImplJson(); break; default: PanicInfo( DataTypeInvalid, "unknown data type: {}", val_type); } break; } case DataType::ARRAY: { auto val_type = expr_->val_.val_case(); switch (val_type) { case proto::plan::GenericValue::ValCase::kBoolVal: result = ExecRangeVisitorImplArray(); break; case proto::plan::GenericValue::ValCase::kInt64Val: result = ExecRangeVisitorImplArray(); break; case proto::plan::GenericValue::ValCase::kFloatVal: result = ExecRangeVisitorImplArray(); break; case proto::plan::GenericValue::ValCase::kStringVal: result = ExecRangeVisitorImplArray(); break; case proto::plan::GenericValue::ValCase::kArrayVal: result = ExecRangeVisitorImplArray(); break; default: PanicInfo( DataTypeInvalid, "unknown data type: {}", val_type); } break; } default: PanicInfo(DataTypeInvalid, "unsupported data type: {}", expr_->column_.data_type_); } } template VectorPtr PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray() { using GetType = std::conditional_t, std::string_view, ValueType>; auto real_batch_size = GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } auto res_vec = std::make_shared(TargetBitmap(real_batch_size)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); ValueType val = GetValueFromProto(expr_->val_); auto op_type = expr_->op_type_; int index = -1; if (expr_->column_.nested_path_.size() > 0) { index = std::stoi(expr_->column_.nested_path_[0]); } auto execute_sub_batch = [op_type](const milvus::ArrayView* data, const int size, TargetBitmapView res, ValueType val, int index) { switch (op_type) { case proto::plan::GreaterThan: { UnaryElementFuncForArray func; func(data, size, val, index, res); break; } case proto::plan::GreaterEqual: { UnaryElementFuncForArray func; func(data, size, val, index, res); break; } case proto::plan::LessThan: { UnaryElementFuncForArray func; func(data, size, val, index, res); break; } case proto::plan::LessEqual: { UnaryElementFuncForArray func; func(data, size, val, index, res); break; } case proto::plan::Equal: { UnaryElementFuncForArray func; func(data, size, val, index, res); break; } case proto::plan::NotEqual: { UnaryElementFuncForArray func; func(data, size, val, index, res); break; } case proto::plan::PrefixMatch: { UnaryElementFuncForArray func; func(data, size, val, index, res); break; } default: PanicInfo( OpTypeInvalid, fmt::format("unsupported operator type for unary expr: {}", op_type)); } }; int64_t processed_size = ProcessDataChunks( execute_sub_batch, std::nullptr_t{}, res, val, index); AssertInfo(processed_size == real_batch_size, "internal error: expr processed rows {} not equal " "expect batch size {}", processed_size, real_batch_size); return res_vec; } template VectorPtr PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson() { using GetType = std::conditional_t, std::string_view, ExprValueType>; auto real_batch_size = GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } ExprValueType val = GetValueFromProto(expr_->val_); auto res_vec = std::make_shared(TargetBitmap(real_batch_size)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); auto op_type = expr_->op_type_; auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); #define UnaryRangeJSONCompare(cmp) \ do { \ auto x = data[i].template at(pointer); \ if (x.error()) { \ if constexpr (std::is_same_v) { \ auto x = data[i].template at(pointer); \ res[i] = !x.error() && (cmp); \ break; \ } \ res[i] = false; \ break; \ } \ res[i] = (cmp); \ } while (false) #define UnaryRangeJSONCompareNotEqual(cmp) \ do { \ auto x = data[i].template at(pointer); \ if (x.error()) { \ if constexpr (std::is_same_v) { \ auto x = data[i].template at(pointer); \ res[i] = x.error() || (cmp); \ break; \ } \ res[i] = true; \ break; \ } \ res[i] = (cmp); \ } while (false) auto execute_sub_batch = [op_type, pointer](const milvus::Json* data, const int size, TargetBitmapView res, ExprValueType val) { switch (op_type) { case proto::plan::GreaterThan: { for (size_t i = 0; i < size; ++i) { if constexpr (std::is_same_v) { res[i] = false; } else { UnaryRangeJSONCompare(x.value() > val); } } break; } case proto::plan::GreaterEqual: { for (size_t i = 0; i < size; ++i) { if constexpr (std::is_same_v) { res[i] = false; } else { UnaryRangeJSONCompare(x.value() >= val); } } break; } case proto::plan::LessThan: { for (size_t i = 0; i < size; ++i) { if constexpr (std::is_same_v) { res[i] = false; } else { UnaryRangeJSONCompare(x.value() < val); } } break; } case proto::plan::LessEqual: { for (size_t i = 0; i < size; ++i) { if constexpr (std::is_same_v) { res[i] = false; } else { UnaryRangeJSONCompare(x.value() <= val); } } break; } case proto::plan::Equal: { for (size_t i = 0; i < size; ++i) { if constexpr (std::is_same_v) { auto doc = data[i].doc(); auto array = doc.at_pointer(pointer).get_array(); if (array.error()) { res[i] = false; continue; } res[i] = CompareTwoJsonArray(array, val); } else { UnaryRangeJSONCompare(x.value() == val); } } break; } case proto::plan::NotEqual: { for (size_t i = 0; i < size; ++i) { if constexpr (std::is_same_v) { auto doc = data[i].doc(); auto array = doc.at_pointer(pointer).get_array(); if (array.error()) { res[i] = false; continue; } res[i] = !CompareTwoJsonArray(array, val); } else { UnaryRangeJSONCompareNotEqual(x.value() != val); } } break; } case proto::plan::PrefixMatch: { for (size_t i = 0; i < size; ++i) { if constexpr (std::is_same_v) { res[i] = false; } else { UnaryRangeJSONCompare(milvus::query::Match( ExprValueType(x.value()), val, op_type)); } } break; } case proto::plan::Match: { PatternMatchTranslator translator; auto regex_pattern = translator(val); RegexMatcher matcher(regex_pattern); for (size_t i = 0; i < size; ++i) { if constexpr (std::is_same_v) { res[i] = false; } else { UnaryRangeJSONCompare( matcher(ExprValueType(x.value()))); } } break; } default: PanicInfo( OpTypeInvalid, fmt::format("unsupported operator type for unary expr: {}", op_type)); } }; int64_t processed_size = ProcessDataChunks( execute_sub_batch, std::nullptr_t{}, res, val); AssertInfo(processed_size == real_batch_size, "internal error: expr processed rows {} not equal " "expect batch size {}", processed_size, real_batch_size); return res_vec; } template VectorPtr PhyUnaryRangeFilterExpr::ExecRangeVisitorImpl() { if (CanUseIndex()) { return ExecRangeVisitorImplForIndex(); } else { return ExecRangeVisitorImplForData(); } } template VectorPtr PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForIndex() { typedef std:: conditional_t, std::string, T> IndexInnerType; using Index = index::ScalarIndex; if (auto res = PreCheckOverflow()) { return res; } auto real_batch_size = GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } auto op_type = expr_->op_type_; auto execute_sub_batch = [op_type](Index* index_ptr, IndexInnerType val) { TargetBitmap res; switch (op_type) { case proto::plan::GreaterThan: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::GreaterEqual: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::LessThan: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::LessEqual: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::Equal: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::NotEqual: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::PrefixMatch: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::Match: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } default: PanicInfo( OpTypeInvalid, fmt::format("unsupported operator type for unary expr: {}", op_type)); } return res; }; auto val = GetValueFromProto(expr_->val_); auto res = ProcessIndexChunks(execute_sub_batch, val); AssertInfo(res.size() == real_batch_size, "internal error: expr processed rows {} not equal " "expect batch size {}", res.size(), real_batch_size); return std::make_shared(std::move(res)); } template ColumnVectorPtr PhyUnaryRangeFilterExpr::PreCheckOverflow() { if constexpr (std::is_integral_v && !std::is_same_v) { int64_t val = GetValueFromProto(expr_->val_); if (milvus::query::out_of_range(val)) { int64_t batch_size = overflow_check_pos_ + batch_size_ >= active_count_ ? active_count_ - overflow_check_pos_ : batch_size_; overflow_check_pos_ += batch_size; if (cached_overflow_res_ != nullptr && cached_overflow_res_->size() == batch_size) { return cached_overflow_res_; } switch (expr_->op_type_) { case proto::plan::GreaterThan: case proto::plan::GreaterEqual: { auto res_vec = std::make_shared( TargetBitmap(batch_size)); cached_overflow_res_ = res_vec; TargetBitmapView res(res_vec->GetRawData(), batch_size); if (milvus::query::lt_lb(val)) { res.set(); return res_vec; } return res_vec; } case proto::plan::LessThan: case proto::plan::LessEqual: { auto res_vec = std::make_shared( TargetBitmap(batch_size)); cached_overflow_res_ = res_vec; TargetBitmapView res(res_vec->GetRawData(), batch_size); if (milvus::query::gt_ub(val)) { res.set(); return res_vec; } return res_vec; } case proto::plan::Equal: { auto res_vec = std::make_shared( TargetBitmap(batch_size)); cached_overflow_res_ = res_vec; TargetBitmapView res(res_vec->GetRawData(), batch_size); res.reset(); return res_vec; } case proto::plan::NotEqual: { auto res_vec = std::make_shared( TargetBitmap(batch_size)); cached_overflow_res_ = res_vec; TargetBitmapView res(res_vec->GetRawData(), batch_size); res.set(); return res_vec; } default: { PanicInfo(OpTypeInvalid, "unsupported range node {}", expr_->op_type_); } } } } return nullptr; } template VectorPtr PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData() { typedef std:: conditional_t, std::string, T> IndexInnerType; if (auto res = PreCheckOverflow()) { return res; } auto real_batch_size = GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } IndexInnerType val = GetValueFromProto(expr_->val_); auto res_vec = std::make_shared(TargetBitmap(real_batch_size)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); auto expr_type = expr_->op_type_; auto execute_sub_batch = [expr_type](const T* data, const int size, TargetBitmapView res, IndexInnerType val) { switch (expr_type) { case proto::plan::GreaterThan: { UnaryElementFunc func; func(data, size, val, res); break; } case proto::plan::GreaterEqual: { UnaryElementFunc func; func(data, size, val, res); break; } case proto::plan::LessThan: { UnaryElementFunc func; func(data, size, val, res); break; } case proto::plan::LessEqual: { UnaryElementFunc func; func(data, size, val, res); break; } case proto::plan::Equal: { UnaryElementFunc func; func(data, size, val, res); break; } case proto::plan::NotEqual: { UnaryElementFunc func; func(data, size, val, res); break; } case proto::plan::PrefixMatch: { UnaryElementFunc func; func(data, size, val, res); break; } case proto::plan::Match: { UnaryElementFunc func; func(data, size, val, res); break; } default: PanicInfo( OpTypeInvalid, fmt::format("unsupported operator type for unary expr: {}", expr_type)); } }; auto skip_index_func = [expr_type, val](const SkipIndex& skip_index, FieldId field_id, int64_t chunk_id) { return skip_index.CanSkipUnaryRange( field_id, chunk_id, expr_type, val); }; int64_t processed_size = ProcessDataChunks(execute_sub_batch, skip_index_func, res, val); AssertInfo(processed_size == real_batch_size, "internal error: expr processed rows {} not equal " "expect batch size {}, related params[active_count:{}, " "current_data_chunk:{}, num_data_chunk:{}, current_data_pos:{}]", processed_size, real_batch_size, active_count_, current_data_chunk_, num_data_chunk_, current_data_chunk_pos_); return res_vec; } template bool PhyUnaryRangeFilterExpr::CanUseIndex() const { if (!is_index_mode_) { return false; } return SegmentExpr::CanUseIndex(expr_->op_type_); } } // namespace exec } // namespace milvus