// Licensed to the LF AI & Data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "UnaryExpr.h" #include #include "common/Json.h" #include #include "common/Types.h" #include "exec/expression/ExprCache.h" #include "common/type_c.h" #include "log/Log.h" namespace milvus { namespace exec { template bool PhyUnaryRangeFilterExpr::CanUseIndexForArray() { typedef std:: conditional_t, std::string, T> IndexInnerType; using Index = index::ScalarIndex; for (size_t i = current_index_chunk_; i < num_index_chunk_; i++) { const Index& index = segment_->chunk_scalar_index(field_id_, i); if (index.GetIndexType() == milvus::index::ScalarIndexType::HYBRID || index.GetIndexType() == milvus::index::ScalarIndexType::BITMAP) { return false; } } return true; } template <> bool PhyUnaryRangeFilterExpr::CanUseIndexForArray() { bool res; if (!is_index_mode_) { use_index_ = res = false; return res; } switch (expr_->column_.element_type_) { case DataType::BOOL: res = CanUseIndexForArray(); break; case DataType::INT8: res = CanUseIndexForArray(); break; case DataType::INT16: res = CanUseIndexForArray(); break; case DataType::INT32: res = CanUseIndexForArray(); break; case DataType::INT64: res = CanUseIndexForArray(); break; case DataType::FLOAT: case DataType::DOUBLE: // not accurate on floating point number, rollback to bruteforce. res = false; break; case DataType::VARCHAR: case DataType::STRING: res = CanUseIndexForArray(); break; default: PanicInfo(DataTypeInvalid, "unsupported element type when execute array " "equal for index: {}", expr_->column_.element_type_); } use_index_ = res; return res; } template VectorPtr PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArrayForIndex(EvalCtx& context) { return ExecRangeVisitorImplArray(context); } template <> VectorPtr PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArrayForIndex( EvalCtx& context) { switch (expr_->op_type_) { case proto::plan::Equal: case proto::plan::NotEqual: { switch (expr_->column_.element_type_) { case DataType::BOOL: { return ExecArrayEqualForIndex( context, expr_->op_type_ == proto::plan::NotEqual); } case DataType::INT8: { return ExecArrayEqualForIndex( context, expr_->op_type_ == proto::plan::NotEqual); } case DataType::INT16: { return ExecArrayEqualForIndex( context, expr_->op_type_ == proto::plan::NotEqual); } case DataType::INT32: { return ExecArrayEqualForIndex( context, expr_->op_type_ == proto::plan::NotEqual); } case DataType::INT64: { return ExecArrayEqualForIndex( context, expr_->op_type_ == proto::plan::NotEqual); } case DataType::FLOAT: case DataType::DOUBLE: { // not accurate on floating point number, rollback to bruteforce. return ExecRangeVisitorImplArray( context); } case DataType::VARCHAR: { if (segment_->type() == SegmentType::Growing) { return ExecArrayEqualForIndex( context, expr_->op_type_ == proto::plan::NotEqual); } else { return ExecArrayEqualForIndex( context, expr_->op_type_ == proto::plan::NotEqual); } } default: PanicInfo(DataTypeInvalid, "unsupported element type when execute array " "equal for index: {}", expr_->column_.element_type_); } } default: return ExecRangeVisitorImplArray(context); } } void PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { auto input = context.get_offset_input(); SetHasOffsetInput((input != nullptr)); switch (expr_->column_.data_type_) { case DataType::BOOL: { result = ExecRangeVisitorImpl(context); break; } case DataType::INT8: { result = ExecRangeVisitorImpl(context); break; } case DataType::INT16: { result = ExecRangeVisitorImpl(context); break; } case DataType::INT32: { result = ExecRangeVisitorImpl(context); break; } case DataType::INT64: { result = ExecRangeVisitorImpl(context); break; } case DataType::FLOAT: { result = ExecRangeVisitorImpl(context); break; } case DataType::DOUBLE: { result = ExecRangeVisitorImpl(context); break; } case DataType::VARCHAR: { if (segment_->type() == SegmentType::Growing && !storage::MmapManager::GetInstance() .GetMmapConfig() .growing_enable_mmap) { result = ExecRangeVisitorImpl(context); } else { result = ExecRangeVisitorImpl(context); } break; } case DataType::JSON: { auto val_type = expr_->val_.val_case(); if (CanUseIndexForJson(FromValCase(val_type)) && !has_offset_input_) { switch (val_type) { case proto::plan::GenericValue::ValCase::kBoolVal: result = ExecRangeVisitorImplForIndex(); break; case proto::plan::GenericValue::ValCase::kInt64Val: if (expr_->val_.has_int64_val()) { proto::plan::GenericValue double_val; double_val.set_float_val( static_cast(expr_->val_.int64_val())); value_arg_.SetValue(double_val); arg_inited_ = true; } result = ExecRangeVisitorImplForIndex(); break; case proto::plan::GenericValue::ValCase::kFloatVal: result = ExecRangeVisitorImplForIndex(); break; case proto::plan::GenericValue::ValCase::kStringVal: result = ExecRangeVisitorImplForIndex(); break; case proto::plan::GenericValue::ValCase::kArrayVal: result = ExecRangeVisitorImplForIndex(); break; default: PanicInfo( DataTypeInvalid, "unknown data type: {}", val_type); } } else { switch (val_type) { case proto::plan::GenericValue::ValCase::kBoolVal: result = ExecRangeVisitorImplJson(context); break; case proto::plan::GenericValue::ValCase::kInt64Val: result = ExecRangeVisitorImplJson(context); break; case proto::plan::GenericValue::ValCase::kFloatVal: result = ExecRangeVisitorImplJson(context); break; case proto::plan::GenericValue::ValCase::kStringVal: result = ExecRangeVisitorImplJson(context); break; case proto::plan::GenericValue::ValCase::kArrayVal: result = ExecRangeVisitorImplJson( context); break; default: PanicInfo( DataTypeInvalid, "unknown data type: {}", val_type); } } break; } case DataType::ARRAY: { auto val_type = expr_->val_.val_case(); switch (val_type) { case proto::plan::GenericValue::ValCase::kBoolVal: SetNotUseIndex(); result = ExecRangeVisitorImplArray(context); break; case proto::plan::GenericValue::ValCase::kInt64Val: SetNotUseIndex(); result = ExecRangeVisitorImplArray(context); break; case proto::plan::GenericValue::ValCase::kFloatVal: SetNotUseIndex(); result = ExecRangeVisitorImplArray(context); break; case proto::plan::GenericValue::ValCase::kStringVal: SetNotUseIndex(); result = ExecRangeVisitorImplArray(context); break; case proto::plan::GenericValue::ValCase::kArrayVal: if (!has_offset_input_ && CanUseIndexForArray()) { result = ExecRangeVisitorImplArrayForIndex< proto::plan::Array>(context); } else { result = ExecRangeVisitorImplArray( context); } break; default: PanicInfo( DataTypeInvalid, "unknown data type: {}", val_type); } break; } default: PanicInfo(DataTypeInvalid, "unsupported data type: {}", expr_->column_.data_type_); } } template VectorPtr PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(EvalCtx& context) { auto* input = context.get_offset_input(); const auto& bitmap_input = context.get_bitmap_input(); auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } auto res_vec = std::make_shared(TargetBitmap(real_batch_size, false), TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); if (!arg_inited_) { value_arg_.SetValue(expr_->val_); arg_inited_ = true; } ValueType val = value_arg_.GetValue(); auto op_type = expr_->op_type_; int index = -1; if (expr_->column_.nested_path_.size() > 0) { index = std::stoi(expr_->column_.nested_path_[0]); } int processed_cursor = 0; auto execute_sub_batch = [ op_type, &processed_cursor, & bitmap_input ]( const milvus::ArrayView* data, const bool* valid_data, const int32_t* offsets, const int size, TargetBitmapView res, TargetBitmapView valid_res, ValueType val, int index) { switch (op_type) { case proto::plan::GreaterThan: { UnaryElementFuncForArray func; func(data, valid_data, size, val, index, res, valid_res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::GreaterEqual: { UnaryElementFuncForArray func; func(data, valid_data, size, val, index, res, valid_res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::LessThan: { UnaryElementFuncForArray func; func(data, valid_data, size, val, index, res, valid_res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::LessEqual: { UnaryElementFuncForArray func; func(data, valid_data, size, val, index, res, valid_res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::Equal: { UnaryElementFuncForArray func; func(data, valid_data, size, val, index, res, valid_res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::NotEqual: { UnaryElementFuncForArray func; func(data, valid_data, size, val, index, res, valid_res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::PostfixMatch: { UnaryElementFuncForArray func; func(data, valid_data, size, val, index, res, valid_res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::InnerMatch: { UnaryElementFuncForArray func; func(data, valid_data, size, val, index, res, valid_res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::PrefixMatch: { UnaryElementFuncForArray func; func(data, valid_data, size, val, index, res, valid_res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::Match: { UnaryElementFuncForArray func; func(data, valid_data, size, val, index, res, valid_res, bitmap_input, processed_cursor, offsets); break; } default: PanicInfo( OpTypeInvalid, fmt::format("unsupported operator type for unary expr: {}", op_type)); } processed_cursor += size; }; int64_t processed_size; if (has_offset_input_) { processed_size = ProcessDataByOffsets(execute_sub_batch, std::nullptr_t{}, input, res, valid_res, val, index); } else { processed_size = ProcessDataChunks( execute_sub_batch, std::nullptr_t{}, res, valid_res, val, index); } AssertInfo(processed_size == real_batch_size, "internal error: expr processed rows {} not equal " "expect batch size {}", processed_size, real_batch_size); return res_vec; } template VectorPtr PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(EvalCtx& context, bool reverse) { typedef std:: conditional_t, std::string, T> IndexInnerType; using Index = index::ScalarIndex; auto real_batch_size = GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } // get all elements. auto val = GetValueFromProto(expr_->val_); if (val.array_size() == 0) { // rollback to bruteforce. no candidates will be filtered out via index. return ExecRangeVisitorImplArray(context); } // cache the result to suit the framework. auto batch_res = ProcessIndexChunks([this, &val, reverse](Index* _) { boost::container::vector elems; for (auto const& element : val.array()) { auto e = GetValueFromProto(element); if (std::find(elems.begin(), elems.end(), e) == elems.end()) { elems.push_back(e); } } // filtering by index, get candidates. std::function retrieve; if (segment_->is_chunked()) { retrieve = [this](int64_t offset) -> const milvus::ArrayView* { auto [chunk_idx, chunk_offset] = segment_->get_chunk_by_offset(field_id_, offset); const auto& chunk = segment_->template chunk_data( field_id_, chunk_idx); return chunk.data() + chunk_offset; }; } else { auto size_per_chunk = segment_->size_per_chunk(); retrieve = [ size_per_chunk, this ](int64_t offset) -> auto { auto chunk_idx = offset / size_per_chunk; auto chunk_offset = offset % size_per_chunk; const auto& chunk = segment_->template chunk_data( field_id_, chunk_idx); return chunk.data() + chunk_offset; }; } // compare the array via the raw data. auto filter = [&retrieve, &val, reverse](size_t offset) -> bool { auto data_ptr = retrieve(offset); return data_ptr->is_same_array(val) ^ reverse; }; // collect all candidates. std::unordered_set candidates; std::unordered_set tmp_candidates; auto first_callback = [&candidates](size_t offset) -> void { candidates.insert(offset); }; auto callback = [&candidates, &tmp_candidates](size_t offset) -> void { if (candidates.find(offset) != candidates.end()) { tmp_candidates.insert(offset); } }; auto execute_sub_batch = [](Index* index_ptr, const IndexInnerType& val, const std::function& callback) { index_ptr->InApplyCallback(1, &val, callback); }; // run in-filter. for (size_t idx = 0; idx < elems.size(); idx++) { if (idx == 0) { ProcessIndexChunksV2( execute_sub_batch, elems[idx], first_callback); } else { ProcessIndexChunksV2( execute_sub_batch, elems[idx], callback); candidates = std::move(tmp_candidates); } // the size of candidates is small enough. if (candidates.size() * 100 < active_count_) { break; } } TargetBitmap res(active_count_); // run post-filter. The filter will only be executed once in the framework. for (const auto& candidate : candidates) { res[candidate] = filter(candidate); } return res; }); AssertInfo(batch_res->size() == real_batch_size, "internal error: expr processed rows {} not equal " "expect batch size {}", batch_res->size(), real_batch_size); // return the result. return batch_res; } template VectorPtr PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(EvalCtx& context) { using GetType = std::conditional_t, std::string_view, ExprValueType>; auto* input = context.get_offset_input(); const auto& bitmap_input = context.get_bitmap_input(); FieldId field_id = expr_->column_.field_id_; if (CanUseJsonKeyIndex(field_id) && !has_offset_input_) { return ExecRangeVisitorImplJsonForIndex(); } auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } if (!arg_inited_) { value_arg_.SetValue(expr_->val_); arg_inited_ = true; } auto res_vec = std::make_shared(TargetBitmap(real_batch_size, false), TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); ExprValueType val = value_arg_.GetValue(); auto op_type = expr_->op_type_; auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); #define UnaryRangeJSONCompare(cmp) \ do { \ auto x = data[offset].template at(pointer); \ if (x.error()) { \ if constexpr (std::is_same_v) { \ auto x = data[offset].template at(pointer); \ res[i] = !x.error() && (cmp); \ break; \ } \ res[i] = false; \ break; \ } \ res[i] = (cmp); \ } while (false) #define UnaryRangeJSONCompareNotEqual(cmp) \ do { \ auto x = data[offset].template at(pointer); \ if (x.error()) { \ if constexpr (std::is_same_v) { \ auto x = data[offset].template at(pointer); \ res[i] = x.error() || (cmp); \ break; \ } \ res[i] = true; \ break; \ } \ res[i] = (cmp); \ } while (false) int processed_cursor = 0; auto execute_sub_batch = [ op_type, pointer, &processed_cursor, & bitmap_input ]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, const int size, TargetBitmapView res, TargetBitmapView valid_res, ExprValueType val) { bool has_bitmap_input = !bitmap_input.empty(); switch (op_type) { case proto::plan::GreaterThan: { for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { offset = (offsets) ? offsets[i] : i; } if (valid_data != nullptr && !valid_data[offset]) { res[i] = valid_res[i] = false; continue; } if (has_bitmap_input && !bitmap_input[i + processed_cursor]) { continue; } if constexpr (std::is_same_v) { res[i] = false; } else { UnaryRangeJSONCompare(x.value() > val); } } break; } case proto::plan::GreaterEqual: { for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { offset = (offsets) ? offsets[i] : i; } if (valid_data != nullptr && !valid_data[offset]) { res[i] = valid_res[i] = false; continue; } if (has_bitmap_input && !bitmap_input[i + processed_cursor]) { continue; } if constexpr (std::is_same_v) { res[i] = false; } else { UnaryRangeJSONCompare(x.value() >= val); } } break; } case proto::plan::LessThan: { for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { offset = (offsets) ? offsets[i] : i; } if (valid_data != nullptr && !valid_data[offset]) { res[i] = valid_res[i] = false; continue; } if (has_bitmap_input && !bitmap_input[i + processed_cursor]) { continue; } if constexpr (std::is_same_v) { res[i] = false; } else { UnaryRangeJSONCompare(x.value() < val); } } break; } case proto::plan::LessEqual: { for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { offset = (offsets) ? offsets[i] : i; } if (valid_data != nullptr && !valid_data[offset]) { res[i] = valid_res[i] = false; continue; } if (has_bitmap_input && !bitmap_input[i + processed_cursor]) { continue; } if constexpr (std::is_same_v) { res[i] = false; } else { UnaryRangeJSONCompare(x.value() <= val); } } break; } case proto::plan::Equal: { for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { offset = (offsets) ? offsets[i] : i; } if (valid_data != nullptr && !valid_data[offset]) { res[i] = valid_res[i] = false; continue; } if (has_bitmap_input && !bitmap_input[i + processed_cursor]) { continue; } if constexpr (std::is_same_v) { auto doc = data[i].doc(); auto array = doc.at_pointer(pointer).get_array(); if (array.error()) { res[i] = false; continue; } res[i] = CompareTwoJsonArray(array, val); } else { UnaryRangeJSONCompare(x.value() == val); } } break; } case proto::plan::NotEqual: { for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { offset = (offsets) ? offsets[i] : i; } if (valid_data != nullptr && !valid_data[offset]) { res[i] = valid_res[i] = false; continue; } if (has_bitmap_input && !bitmap_input[i + processed_cursor]) { continue; } if constexpr (std::is_same_v) { auto doc = data[i].doc(); auto array = doc.at_pointer(pointer).get_array(); if (array.error()) { res[i] = false; continue; } res[i] = !CompareTwoJsonArray(array, val); } else { UnaryRangeJSONCompareNotEqual(x.value() != val); } } break; } case proto::plan::InnerMatch: case proto::plan::PostfixMatch: case proto::plan::PrefixMatch: { for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { offset = (offsets) ? offsets[i] : i; } if (valid_data != nullptr && !valid_data[offset]) { res[i] = valid_res[i] = false; continue; } if (has_bitmap_input && !bitmap_input[i + processed_cursor]) { continue; } if constexpr (std::is_same_v) { res[i] = false; } else { UnaryRangeJSONCompare(milvus::query::Match( ExprValueType(x.value()), val, op_type)); } } break; } case proto::plan::Match: { PatternMatchTranslator translator; auto regex_pattern = translator(val); RegexMatcher matcher(regex_pattern); for (size_t i = 0; i < size; ++i) { auto offset = i; if constexpr (filter_type == FilterType::random) { offset = (offsets) ? offsets[i] : i; } if (valid_data != nullptr && !valid_data[offset]) { res[i] = valid_res[i] = false; continue; } if (has_bitmap_input && !bitmap_input[i + processed_cursor]) { continue; } if constexpr (std::is_same_v) { res[i] = false; } else { UnaryRangeJSONCompare( matcher(ExprValueType(x.value()))); } } break; } default: PanicInfo( OpTypeInvalid, fmt::format("unsupported operator type for unary expr: {}", op_type)); } processed_cursor += size; }; int64_t processed_size; if (has_offset_input_) { processed_size = ProcessDataByOffsets( execute_sub_batch, std::nullptr_t{}, input, res, valid_res, val); } else { processed_size = ProcessDataChunks( execute_sub_batch, std::nullptr_t{}, res, valid_res, val); } AssertInfo(processed_size == real_batch_size, "internal error: expr processed rows {} not equal " "expect batch size {}", processed_size, real_batch_size); return res_vec; } std::pair PhyUnaryRangeFilterExpr::SplitAtFirstSlashDigit(std::string input) { boost::regex rgx("/\\d+"); boost::smatch match; if (boost::regex_search(input, match, rgx)) { std::string firstPart = input.substr(0, match.position()); std::string secondPart = input.substr(match.position()); return {firstPart, secondPart}; } else { return {input, ""}; } } template VectorPtr PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonForIndex() { using GetType = std::conditional_t, std::string_view, ExprValueType>; auto real_batch_size = GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } auto pointerpath = milvus::Json::pointer(expr_->column_.nested_path_); auto pointerpair = SplitAtFirstSlashDigit(pointerpath); std::string pointer = pointerpair.first; std::string arrayIndex = pointerpair.second; #define UnaryRangeJSONIndexCompare(cmp) \ do { \ auto x = json.at(offset, size); \ if (x.error()) { \ if constexpr (std::is_same_v) { \ auto x = json.at(offset, size); \ return !x.error() && (cmp); \ } \ return false; \ } \ return (cmp); \ } while (false) #define UnaryJSONTypeCompare(cmp) \ do { \ if constexpr (std::is_same_v) { \ if (type == uint8_t(milvus::index::JSONType::STRING)) { \ auto x = json.at_string(offset, size); \ return (cmp); \ } else { \ return false; \ } \ } else if constexpr (std::is_same_v) { \ if (type == uint8_t(milvus::index::JSONType::INT64)) { \ auto x = \ std::stoll(std::string(json.at_string(offset, size))); \ return (cmp); \ } else if (type == uint8_t(milvus::index::JSONType::DOUBLE)) { \ auto x = std::stod(std::string(json.at_string(offset, size))); \ return (cmp); \ } else { \ return false; \ } \ } else if constexpr (std::is_same_v) { \ if (type == uint8_t(milvus::index::JSONType::INT64)) { \ auto x = \ std::stoll(std::string(json.at_string(offset, size))); \ return (cmp); \ } else if (type == uint8_t(milvus::index::JSONType::DOUBLE)) { \ auto x = std::stod(std::string(json.at_string(offset, size))); \ return (cmp); \ } else { \ return false; \ } \ } \ } while (false) #define UnaryJSONTypeCompareWithValue(cmp) \ do { \ if constexpr (std::is_same_v) { \ if (type == uint8_t(milvus::index::JSONType::FLOAT)) { \ float x = *reinterpret_cast(&value); \ return (cmp); \ } else { \ int64_t x = value; \ return (cmp); \ } \ } else if constexpr (std::is_same_v) { \ if (type == uint8_t(milvus::index::JSONType::FLOAT)) { \ float x = *reinterpret_cast(&value); \ return (cmp); \ } else { \ int64_t x = value; \ return (cmp); \ } \ } else if constexpr (std::is_same_v) { \ bool x = *reinterpret_cast(&value); \ return (cmp); \ } \ } while (false) #define UnaryRangeJSONIndexCompareWithArrayIndex(cmp) \ do { \ if (type != uint8_t(milvus::index::JSONType::UNKNOWN)) { \ return false; \ } \ auto array = json.array_at(offset, size); \ if (array.error()) { \ return false; \ } \ auto value = array.at_pointer(arrayIndex); \ if (value.error()) { \ return false; \ } \ if constexpr (std::is_same_v || \ std::is_same_v) { \ if (!value.is_number()) { \ return false; \ } \ } else if constexpr (std::is_same_v) { \ if (!value.is_string()) { \ return false; \ } \ } else if constexpr (std::is_same_v) { \ if (!value.is_bool()) { \ return false; \ } \ } \ auto x = value.get(); \ if (x.error()) { \ if constexpr (std::is_same_v) { \ auto x = value.get(); \ return !x.error() && (cmp); \ } \ } \ return (cmp); \ } while (false) #define UnaryRangeJSONIndexCompareNotEqual(cmp) \ do { \ auto x = json.at(offset, size); \ if (x.error()) { \ if constexpr (std::is_same_v) { \ auto x = json.at(offset, size); \ return x.error() || (cmp); \ } \ return true; \ } \ return (cmp); \ } while (false) #define UnaryRangeJSONIndexCompareNotEqualWithArrayIndex(cmp) \ do { \ auto array = json.array_at(offset, size); \ if (array.error()) { \ return false; \ } \ auto value = array.at_pointer(arrayIndex); \ if (value.error()) { \ return false; \ } \ if constexpr (std::is_same_v || \ std::is_same_v) { \ if (!value.is_number()) { \ return false; \ } \ } else if constexpr (std::is_same_v) { \ if (!value.is_string()) { \ return false; \ } \ } else if constexpr (std::is_same_v) { \ if (!value.is_bool()) { \ return false; \ } \ } \ auto x = value.get(); \ if (x.error()) { \ if constexpr (std::is_same_v) { \ auto x = value.get(); \ return x.error() || (cmp); \ } \ } \ return (cmp); \ } while (false) ExprValueType val = GetValueFromProto(expr_->val_); auto op_type = expr_->op_type_; if (cached_index_chunk_id_ != 0) { cached_index_chunk_id_ = 0; const segcore::SegmentInternalInterface* segment = nullptr; if (segment_->type() == SegmentType::Growing) { segment = dynamic_cast(segment_); } else if (segment_->type() == SegmentType::Sealed) { segment = dynamic_cast(segment_); } auto field_id = expr_->column_.field_id_; auto* index = segment->GetJsonKeyIndex(field_id); Assert(index != nullptr); Assert(segment != nullptr); auto filter_func = [segment, field_id, op_type, val, arrayIndex, pointer](bool valid, uint8_t type, uint32_t row_id, uint16_t offset, uint16_t size, int32_t value) { if (valid) { if (type == uint8_t(milvus::index::JSONType::UNKNOWN) || !arrayIndex.empty()) { return false; } if constexpr (std::is_same_v) { if (type != uint8_t(milvus::index::JSONType::INT32) && type != uint8_t(milvus::index::JSONType::INT64) && type != uint8_t(milvus::index::JSONType::FLOAT) && type != uint8_t(milvus::index::JSONType::DOUBLE)) { return false; } } else if constexpr (std::is_same_v) { if (type != uint8_t(milvus::index::JSONType::STRING) && type != uint8_t(milvus::index::JSONType::STRING_ESCAPE)) { return false; } } else if constexpr (std::is_same_v) { if (type != uint8_t(milvus::index::JSONType::INT32) && type != uint8_t(milvus::index::JSONType::INT64) && type != uint8_t(milvus::index::JSONType::FLOAT) && type != uint8_t(milvus::index::JSONType::DOUBLE)) { return false; } } else if constexpr (std::is_same_v) { if (type != uint8_t(milvus::index::JSONType::BOOL)) { return false; } } switch (op_type) { case proto::plan::GreaterThan: if (type == uint8_t(milvus::index::JSONType::FLOAT)) { UnaryJSONTypeCompareWithValue( x > static_cast(val)); } else { UnaryJSONTypeCompareWithValue(x > val); } case proto::plan::GreaterEqual: if (type == uint8_t(milvus::index::JSONType::FLOAT)) { UnaryJSONTypeCompareWithValue( x >= static_cast(val)); } else { UnaryJSONTypeCompareWithValue(x >= val); } case proto::plan::LessThan: if (type == uint8_t(milvus::index::JSONType::FLOAT)) { UnaryJSONTypeCompareWithValue( x < static_cast(val)); } else { UnaryJSONTypeCompareWithValue(x < val); } case proto::plan::LessEqual: if (type == uint8_t(milvus::index::JSONType::FLOAT)) { UnaryJSONTypeCompareWithValue( x <= static_cast(val)); } else { UnaryJSONTypeCompareWithValue(x <= val); } case proto::plan::Equal: if (type == uint8_t(milvus::index::JSONType::FLOAT)) { UnaryJSONTypeCompareWithValue( x == static_cast(val)); } else { UnaryJSONTypeCompareWithValue(x == val); } case proto::plan::NotEqual: if (type == uint8_t(milvus::index::JSONType::FLOAT)) { UnaryJSONTypeCompareWithValue( x != static_cast(val)); } else { UnaryJSONTypeCompareWithValue(x != val); } default: return false; } } else { auto json_pair = segment->GetJsonData(field_id, row_id); if (!json_pair.second) { return false; } auto json = milvus::Json(json_pair.first.data(), json_pair.first.size()); switch (op_type) { case proto::plan::GreaterThan: if constexpr (std::is_same_v) { return false; } else { if (!arrayIndex.empty()) { UnaryRangeJSONIndexCompareWithArrayIndex( ExprValueType(x.value()) > val); } else { if (type == uint8_t( milvus::index::JSONType::STRING) || type == uint8_t( milvus::index::JSONType::DOUBLE) || type == uint8_t( milvus::index::JSONType::INT64)) { UnaryJSONTypeCompare(x > val); } else { UnaryRangeJSONIndexCompare( ExprValueType(x.value()) > val); } } } case proto::plan::GreaterEqual: if constexpr (std::is_same_v) { return false; } else { if (!arrayIndex.empty()) { UnaryRangeJSONIndexCompareWithArrayIndex( ExprValueType(x.value()) >= val); } else { if (type == uint8_t( milvus::index::JSONType::STRING) || type == uint8_t( milvus::index::JSONType::DOUBLE) || type == uint8_t( milvus::index::JSONType::INT64)) { UnaryJSONTypeCompare(x >= val); } else { UnaryRangeJSONIndexCompare( ExprValueType(x.value()) >= val); } } } case proto::plan::LessThan: if constexpr (std::is_same_v) { return false; } else { if (!arrayIndex.empty()) { UnaryRangeJSONIndexCompareWithArrayIndex( ExprValueType(x.value()) < val); } else { if (type == uint8_t( milvus::index::JSONType::STRING) || type == uint8_t( milvus::index::JSONType::DOUBLE) || type == uint8_t( milvus::index::JSONType::INT64)) { UnaryJSONTypeCompare(x < val); } else { UnaryRangeJSONIndexCompare( ExprValueType(x.value()) < val); } } } case proto::plan::LessEqual: if constexpr (std::is_same_v) { return false; } else { if (!arrayIndex.empty()) { UnaryRangeJSONIndexCompareWithArrayIndex( ExprValueType(x.value()) <= val); } else { if (type == uint8_t( milvus::index::JSONType::STRING) || type == uint8_t( milvus::index::JSONType::DOUBLE) || type == uint8_t( milvus::index::JSONType::INT64)) { UnaryJSONTypeCompare(x <= val); } else { UnaryRangeJSONIndexCompare( ExprValueType(x.value()) <= val); } } } case proto::plan::Equal: if constexpr (std::is_same_v) { if (type != uint8_t(milvus::index::JSONType::UNKNOWN)) { return false; } auto array = json.array_at(offset, size); if (array.error()) { return false; } return CompareTwoJsonArray(array.value(), val); } else { if (!arrayIndex.empty()) { UnaryRangeJSONIndexCompareWithArrayIndex( ExprValueType(x.value()) == val); } else { if (type == uint8_t( milvus::index::JSONType::STRING) || type == uint8_t( milvus::index::JSONType::DOUBLE) || type == uint8_t( milvus::index::JSONType::INT64)) { UnaryJSONTypeCompare(x == val); } else { UnaryRangeJSONIndexCompare( ExprValueType(x.value()) == val); } } } case proto::plan::NotEqual: if constexpr (std::is_same_v) { if (type != uint8_t(milvus::index::JSONType::UNKNOWN)) { return false; } auto array = json.array_at(offset, size); if (array.error()) { return false; } return !CompareTwoJsonArray(array.value(), val); } else { if (!arrayIndex.empty()) { UnaryRangeJSONIndexCompareNotEqualWithArrayIndex( ExprValueType(x.value()) != val); } else { if (type == uint8_t( milvus::index::JSONType::STRING) || type == uint8_t( milvus::index::JSONType::DOUBLE) || type == uint8_t( milvus::index::JSONType::INT64)) { UnaryJSONTypeCompare(x != val); } else { UnaryRangeJSONIndexCompareNotEqual( ExprValueType(x.value()) != val); } } } case proto::plan::InnerMatch: case proto::plan::PostfixMatch: case proto::plan::PrefixMatch: if constexpr (std::is_same_v) { return false; } else { if (!arrayIndex.empty()) { UnaryRangeJSONIndexCompareWithArrayIndex( milvus::query::Match( ExprValueType(x.value()), val, op_type)); } else { if (type == uint8_t( milvus::index::JSONType::STRING) || type == uint8_t( milvus::index::JSONType::DOUBLE) || type == uint8_t( milvus::index::JSONType::INT64)) { UnaryJSONTypeCompare( milvus::query::Match(x, val, op_type)); } else { UnaryRangeJSONIndexCompare( milvus::query::Match( ExprValueType(x.value()), val, op_type)); } } } case proto::plan::Match: if constexpr (std::is_same_v) { return false; } else { PatternMatchTranslator translator; auto regex_pattern = translator(val); RegexMatcher matcher(regex_pattern); if (!arrayIndex.empty()) { UnaryRangeJSONIndexCompareWithArrayIndex( matcher(ExprValueType(x.value()))); } else { UnaryRangeJSONIndexCompare( matcher(ExprValueType(x.value()))); } } default: return false; } } }; bool is_growing = segment_->type() == SegmentType::Growing; bool is_strong_consistency = consistency_level_ == 0; cached_index_chunk_res_ = std::make_shared( std::move(index->FilterByPath(pointer, active_count_, is_growing, is_strong_consistency, filter_func))); } TargetBitmap result; result.append( *cached_index_chunk_res_, current_data_global_pos_, real_batch_size); MoveCursor(); return std::make_shared(std::move(result), TargetBitmap(real_batch_size, true)); } template VectorPtr PhyUnaryRangeFilterExpr::ExecRangeVisitorImpl(EvalCtx& context) { if (expr_->op_type_ == proto::plan::OpType::TextMatch || expr_->op_type_ == proto::plan::OpType::PhraseMatch) { if (has_offset_input_) { PanicInfo( OpTypeInvalid, fmt::format("match query does not support iterative filter")); } return ExecTextMatch(); } if (CanUseIndex() && !has_offset_input_) { return ExecRangeVisitorImplForIndex(); } else { return ExecRangeVisitorImplForData(context); } } template VectorPtr PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForIndex() { typedef std:: conditional_t, std::string, T> IndexInnerType; using Index = index::ScalarIndex; if (!arg_inited_) { value_arg_.SetValue(expr_->val_); arg_inited_ = true; } if (auto res = PreCheckOverflow()) { return res; } auto real_batch_size = GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } auto op_type = expr_->op_type_; auto execute_sub_batch = [op_type](Index* index_ptr, IndexInnerType val) { TargetBitmap res; switch (op_type) { case proto::plan::GreaterThan: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::GreaterEqual: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::LessThan: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::LessEqual: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::Equal: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::NotEqual: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::PrefixMatch: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::PostfixMatch: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::InnerMatch: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } case proto::plan::Match: { UnaryIndexFunc func; res = std::move(func(index_ptr, val)); break; } default: PanicInfo( OpTypeInvalid, fmt::format("unsupported operator type for unary expr: {}", op_type)); } return res; }; IndexInnerType val = value_arg_.GetValue(); auto res = ProcessIndexChunks(execute_sub_batch, val); AssertInfo(res->size() == real_batch_size, "internal error: expr processed rows {} not equal " "expect batch size {}", res->size(), real_batch_size); return res; } template ColumnVectorPtr PhyUnaryRangeFilterExpr::PreCheckOverflow(OffsetVector* input) { if constexpr (std::is_integral_v && !std::is_same_v) { auto val = GetValueFromProto(expr_->val_); if (milvus::query::out_of_range(val)) { int64_t batch_size; if (input != nullptr) { batch_size = input->size(); } else { batch_size = overflow_check_pos_ + batch_size_ >= active_count_ ? active_count_ - overflow_check_pos_ : batch_size_; overflow_check_pos_ += batch_size; } auto valid = (input != nullptr) ? ProcessChunksForValidByOffsets(is_index_mode_, *input) : ProcessChunksForValid(is_index_mode_); auto res_vec = std::make_shared( TargetBitmap(batch_size), std::move(valid)); TargetBitmapView res(res_vec->GetRawData(), batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), batch_size); switch (expr_->op_type_) { case proto::plan::GreaterThan: case proto::plan::GreaterEqual: { if (milvus::query::lt_lb(val)) { res.set(); res &= valid_res; return res_vec; } return res_vec; } case proto::plan::LessThan: case proto::plan::LessEqual: { if (milvus::query::gt_ub(val)) { res.set(); res &= valid_res; return res_vec; } return res_vec; } case proto::plan::Equal: { res.reset(); return res_vec; } case proto::plan::NotEqual: { res.set(); res &= valid_res; return res_vec; } default: { PanicInfo(OpTypeInvalid, "unsupported range node {}", expr_->op_type_); } } } } return nullptr; } template VectorPtr PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(EvalCtx& context) { typedef std:: conditional_t, std::string, T> IndexInnerType; auto* input = context.get_offset_input(); const auto& bitmap_input = context.get_bitmap_input(); if (auto res = PreCheckOverflow(input)) { return res; } auto real_batch_size = has_offset_input_ ? input->size() : GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } if (!arg_inited_) { value_arg_.SetValue(expr_->val_); arg_inited_ = true; } IndexInnerType val = GetValueFromProto(expr_->val_); auto res_vec = std::make_shared(TargetBitmap(real_batch_size, false), TargetBitmap(real_batch_size, true)); TargetBitmapView res(res_vec->GetRawData(), real_batch_size); TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); auto expr_type = expr_->op_type_; size_t processed_cursor = 0; auto execute_sub_batch = [ expr_type, &processed_cursor, & bitmap_input ]( const T* data, const bool* valid_data, const int32_t* offsets, const int size, TargetBitmapView res, TargetBitmapView valid_res, IndexInnerType val) { switch (expr_type) { case proto::plan::GreaterThan: { UnaryElementFunc func; func(data, size, val, res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::GreaterEqual: { UnaryElementFunc func; func(data, size, val, res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::LessThan: { UnaryElementFunc func; func(data, size, val, res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::LessEqual: { UnaryElementFunc func; func(data, size, val, res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::Equal: { UnaryElementFunc func; func(data, size, val, res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::NotEqual: { UnaryElementFunc func; func(data, size, val, res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::PrefixMatch: { UnaryElementFunc func; func(data, size, val, res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::PostfixMatch: { UnaryElementFunc func; func(data, size, val, res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::InnerMatch: { UnaryElementFunc func; func(data, size, val, res, bitmap_input, processed_cursor, offsets); break; } case proto::plan::Match: { UnaryElementFunc func; func(data, size, val, res, bitmap_input, processed_cursor, offsets); break; } default: PanicInfo( OpTypeInvalid, fmt::format("unsupported operator type for unary expr: {}", expr_type)); } // there is a batch operation in BinaryRangeElementFunc, // so not divide data again for the reason that it may reduce performance if the null distribution is scattered // but to mask res with valid_data after the batch operation. if (valid_data != nullptr) { bool has_bitmap_input = !bitmap_input.empty(); for (int i = 0; i < size; i++) { if (has_bitmap_input && !bitmap_input[i + processed_cursor]) { continue; } auto offset = i; if constexpr (filter_type == FilterType::random) { offset = (offsets) ? offsets[i] : i; } if (!valid_data[offset]) { res[i] = valid_res[i] = false; } } } processed_cursor += size; }; auto skip_index_func = [expr_type, val](const SkipIndex& skip_index, FieldId field_id, int64_t chunk_id) { return skip_index.CanSkipUnaryRange( field_id, chunk_id, expr_type, val); }; int64_t processed_size; if (has_offset_input_) { processed_size = ProcessDataByOffsets( execute_sub_batch, skip_index_func, input, res, valid_res, val); } else { processed_size = ProcessDataChunks( execute_sub_batch, skip_index_func, res, valid_res, val); } AssertInfo(processed_size == real_batch_size, "internal error: expr processed rows {} not equal " "expect batch size {}, related params[active_count:{}, " "current_data_chunk:{}, num_data_chunk:{}, current_data_pos:{}]", processed_size, real_batch_size, active_count_, current_data_chunk_, num_data_chunk_, current_data_chunk_pos_); return res_vec; } template bool PhyUnaryRangeFilterExpr::CanUseIndex() { use_index_ = is_index_mode_ && SegmentExpr::CanUseIndex(expr_->op_type_); return use_index_; } bool PhyUnaryRangeFilterExpr::CanUseIndexForJson(DataType val_type) { auto has_index = segment_->HasIndex(field_id_, milvus::Json::pointer(expr_->column_.nested_path_), val_type); switch (val_type) { case DataType::STRING: use_index_ = has_index && expr_->op_type_ != proto::plan::OpType::Match && expr_->op_type_ != proto::plan::OpType::PostfixMatch && expr_->op_type_ != proto::plan::OpType::InnerMatch; break; default: use_index_ = has_index; } return use_index_; } VectorPtr PhyUnaryRangeFilterExpr::ExecTextMatch() { using Index = index::TextMatchIndex; if (!arg_inited_) { value_arg_.SetValue(expr_->val_); arg_inited_ = true; } auto query = value_arg_.GetValue(); int64_t slop = 0; if (expr_->op_type_ == proto::plan::PhraseMatch) { // It should be larger than 0 in normal cases. Check it incase of receiving old version proto. if (expr_->extra_values_.size() > 0) { slop = GetValueFromProto(expr_->extra_values_[0]); } if (slop < 0 || slop > std::numeric_limits::max()) { throw SegcoreError( ErrorCode::InvalidParameter, fmt::format( "Slop {} is invalid in phrase match query. Should be " "within [0, UINT32_MAX].", slop)); } } auto op_type = expr_->op_type_; // Process-level LRU cache lookup by (segment_id, expr signature) if (cached_match_res_ == nullptr && exec::ExprResCacheManager::IsEnabled() && segment_->type() == SegmentType::Sealed) { exec::ExprResCacheManager::Key key{segment_->get_segment_id(), this->GetExprSignature()}; exec::ExprResCacheManager::Value v; if (exec::ExprResCacheManager::Instance().Get(key, v)) { cached_match_res_ = v.result; cached_index_chunk_valid_res_ = v.valid_result; AssertInfo(cached_match_res_->size() == active_count_, "internal error: expr res cache size {} not equal " "expect active count {}", cached_match_res_->size(), active_count_); } } auto func = [op_type, slop](Index* index, const std::string& query) -> TargetBitmap { if (op_type == proto::plan::OpType::TextMatch) { return index->MatchQuery(query); } else if (op_type == proto::plan::OpType::PhraseMatch) { return index->PhraseMatchQuery(query, slop); } else { PanicInfo(OpTypeInvalid, "unsupported operator type for match query: {}", op_type); } }; auto real_batch_size = GetNextBatchSize(); if (real_batch_size == 0) { return nullptr; } if (cached_match_res_ == nullptr) { auto index = segment_->GetTextIndex(field_id_); auto res = std::move(func(index, query)); auto valid_res = index->IsNotNull(); cached_match_res_ = std::make_shared(std::move(res)); cached_index_chunk_valid_res_ = std::make_shared(std::move(valid_res)); if (cached_match_res_->size() < active_count_) { // some entities are not visible in inverted index. // only happend on growing segment. TargetBitmap tail(active_count_ - cached_match_res_->size()); cached_match_res_->append(tail); cached_index_chunk_valid_res_->append(tail); } // Insert into process-level cache if (exec::ExprResCacheManager::IsEnabled() && segment_->type() == SegmentType::Sealed) { exec::ExprResCacheManager::Key key{segment_->get_segment_id(), this->ToString()}; exec::ExprResCacheManager::Value v; v.result = cached_match_res_; v.valid_result = cached_index_chunk_valid_res_; v.active_count = active_count_; exec::ExprResCacheManager::Instance().Put(key, v); } } TargetBitmap result; TargetBitmap valid_result; result.append( *cached_match_res_, current_data_global_pos_, real_batch_size); valid_result.append(*cached_index_chunk_valid_res_, current_data_global_pos_, real_batch_size); MoveCursor(); return std::make_shared(std::move(result), std::move(valid_result)); }; } // namespace exec } // namespace milvus