// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "UnaryExpr.h"

// The two bracketed headers here were lost in extraction; <optional> and
// <boost/regex.hpp> are assumed from the std::optional and boost::regex
// usage later in this file.
#include <optional>
#include <boost/regex.hpp>

#include "common/EasyAssert.h"
#include "common/Json.h"
#include "common/Types.h"
#include "exec/expression/ExprCache.h"
#include "common/type_c.h"
#include "log/Log.h"
#include "monitor/Monitor.h"
#include "common/ScopedTimer.h"

namespace milvus {
namespace exec {

template <typename T>
bool
PhyUnaryRangeFilterExpr::CanUseIndexForArray() {
    typedef std::conditional_t<std::is_same_v<T, std::string_view>,
                               std::string,
                               T>
        IndexInnerType;
    using Index = index::ScalarIndex<IndexInnerType>;

    for (size_t i = current_index_chunk_; i < num_index_chunk_; i++) {
        auto index_ptr = dynamic_cast<Index*>(pinned_index_[i].get());
        if (index_ptr->GetIndexType() ==
                milvus::index::ScalarIndexType::HYBRID ||
            index_ptr->GetIndexType() ==
                milvus::index::ScalarIndexType::BITMAP) {
            return false;
        }
    }
    return true;
}

template <>
bool
PhyUnaryRangeFilterExpr::CanUseIndexForArray<milvus::Array>() {
    bool res;
    if (!SegmentExpr::CanUseIndex()) {
        use_index_ = res = false;
        return res;
    }
    switch (expr_->column_.element_type_) {
        case DataType::BOOL:
            res = CanUseIndexForArray<bool>();
            break;
        case DataType::INT8:
            res = CanUseIndexForArray<int8_t>();
            break;
        case DataType::INT16:
            res = CanUseIndexForArray<int16_t>();
            break;
        case DataType::INT32:
            res = CanUseIndexForArray<int32_t>();
            break;
        case DataType::INT64:
            res = CanUseIndexForArray<int64_t>();
            break;
        case DataType::FLOAT:
        case DataType::DOUBLE:
            // not accurate on floating point number, rollback to bruteforce.
            res = false;
            break;
        case DataType::VARCHAR:
        case DataType::STRING:
            res = CanUseIndexForArray<std::string_view>();
            break;
        default:
            ThrowInfo(DataTypeInvalid,
                      "unsupported element type when execute array "
                      "equal for index: {}",
                      expr_->column_.element_type_);
    }
    use_index_ = res;
    return res;
}

template <typename ValueType>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArrayForIndex(EvalCtx& context) {
    return ExecRangeVisitorImplArray<ValueType>(context);
}

template <>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArrayForIndex<proto::plan::Array>(
    EvalCtx& context) {
    switch (expr_->op_type_) {
        case proto::plan::Equal:
        case proto::plan::NotEqual: {
            switch (expr_->column_.element_type_) {
                case DataType::BOOL: {
                    return ExecArrayEqualForIndex<bool>(
                        context, expr_->op_type_ == proto::plan::NotEqual);
                }
                case DataType::INT8: {
                    return ExecArrayEqualForIndex<int8_t>(
                        context, expr_->op_type_ == proto::plan::NotEqual);
                }
                case DataType::INT16: {
                    return ExecArrayEqualForIndex<int16_t>(
                        context, expr_->op_type_ == proto::plan::NotEqual);
                }
                case DataType::INT32: {
                    return ExecArrayEqualForIndex<int32_t>(
                        context, expr_->op_type_ == proto::plan::NotEqual);
                }
                case DataType::INT64: {
                    return ExecArrayEqualForIndex<int64_t>(
                        context, expr_->op_type_ == proto::plan::NotEqual);
                }
                case DataType::FLOAT:
                case DataType::DOUBLE: {
                    // not accurate on floating point number, rollback to bruteforce.
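                    // Likely rationale (assumption): scalar indexes compare
                    // floats bit-exactly while the plan literal may have gone
                    // through a double conversion, so an index probe could
                    // miss rows; the brute-force scan compares the stored
                    // element values directly.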
                    return ExecRangeVisitorImplArray<proto::plan::Array>(
                        context);
                }
                case DataType::VARCHAR: {
                    if (segment_->type() == SegmentType::Growing) {
                        return ExecArrayEqualForIndex<std::string>(
                            context,
                            expr_->op_type_ == proto::plan::NotEqual);
                    } else {
                        return ExecArrayEqualForIndex<std::string_view>(
                            context,
                            expr_->op_type_ == proto::plan::NotEqual);
                    }
                }
                default:
                    ThrowInfo(DataTypeInvalid,
                              "unsupported element type when execute array "
                              "equal for index: {}",
                              expr_->column_.element_type_);
            }
        }
        default:
            return ExecRangeVisitorImplArray<proto::plan::Array>(context);
    }
}

void
PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
    tracer::AutoSpan span(
        "PhyUnaryRangeFilterExpr::Eval", tracer::GetRootSpan(), true);
    span.GetSpan()->SetAttribute(
        "data_type", static_cast<int64_t>(expr_->column_.data_type_));
    span.GetSpan()->SetAttribute("op_type",
                                 static_cast<int64_t>(expr_->op_type_));

    auto input = context.get_offset_input();
    SetHasOffsetInput((input != nullptr));
    auto data_type = expr_->column_.data_type_;
    if (expr_->column_.element_level_) {
        data_type = expr_->column_.element_type_;
    }
    switch (data_type) {
        case DataType::BOOL: {
            result = ExecRangeVisitorImpl<bool>(context);
            break;
        }
        case DataType::INT8: {
            result = ExecRangeVisitorImpl<int8_t>(context);
            break;
        }
        case DataType::INT16: {
            result = ExecRangeVisitorImpl<int16_t>(context);
            break;
        }
        case DataType::INT32: {
            result = ExecRangeVisitorImpl<int32_t>(context);
            break;
        }
        case DataType::INT64: {
            result = ExecRangeVisitorImpl<int64_t>(context);
            break;
        }
        case DataType::TIMESTAMPTZ: {
            result = ExecRangeVisitorImpl<int64_t>(context);
            break;
        }
        case DataType::FLOAT: {
            result = ExecRangeVisitorImpl<float>(context);
            break;
        }
        case DataType::DOUBLE: {
            result = ExecRangeVisitorImpl<double>(context);
            break;
        }
        case DataType::VARCHAR: {
            if (segment_->type() == SegmentType::Growing &&
                !storage::MmapManager::GetInstance()
                     .GetMmapConfig()
                     .growing_enable_mmap) {
                result = ExecRangeVisitorImpl<std::string>(context);
            } else {
                result = ExecRangeVisitorImpl<std::string_view>(context);
            }
            break;
        }
        case DataType::JSON: {
            auto val_type = expr_->val_.val_case();
            auto val_type_inner = FromValCase(val_type);
            if (CanUseNgramIndex() && !has_offset_input_) {
                auto res = ExecNgramMatch();
                // If nullopt is returned, it means the query cannot be
                // optimized by ngram index. Forward it to the normal path.
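                // (Assumption for illustration: this typically happens when
                // the literal part of the pattern is shorter than the ngram
                // length; the authoritative criteria live in the ngram
                // index's ExecuteQuery.)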
                if (res.has_value()) {
                    result = res.value();
                    break;
                }
            }
            if (CanUseIndexForJson(val_type_inner) && !has_offset_input_) {
                switch (val_type) {
                    case proto::plan::GenericValue::ValCase::kBoolVal:
                        result = ExecRangeVisitorImplForIndex<bool>();
                        break;
                    case proto::plan::GenericValue::ValCase::kInt64Val:
                        if (expr_->val_.has_int64_val()) {
                            proto::plan::GenericValue double_val;
                            double_val.set_float_val(
                                static_cast<double>(expr_->val_.int64_val()));
                            value_arg_.SetValue(double_val);
                            arg_inited_ = true;
                        }
                        result = ExecRangeVisitorImplForIndex<double>();
                        break;
                    case proto::plan::GenericValue::ValCase::kFloatVal:
                        result = ExecRangeVisitorImplForIndex<double>();
                        break;
                    case proto::plan::GenericValue::ValCase::kStringVal:
                        result = ExecRangeVisitorImplForIndex<std::string>();
                        break;
                    default:
                        ThrowInfo(
                            DataTypeInvalid, "unknown data type: {}", val_type);
                }
            } else {
                switch (val_type) {
                    case proto::plan::GenericValue::ValCase::kBoolVal:
                        result = ExecRangeVisitorImplJson<bool>(context);
                        break;
                    case proto::plan::GenericValue::ValCase::kInt64Val:
                        result = ExecRangeVisitorImplJson<int64_t>(context);
                        break;
                    case proto::plan::GenericValue::ValCase::kFloatVal:
                        result = ExecRangeVisitorImplJson<double>(context);
                        break;
                    case proto::plan::GenericValue::ValCase::kStringVal:
                        result = ExecRangeVisitorImplJson<std::string>(context);
                        break;
                    case proto::plan::GenericValue::ValCase::kArrayVal:
                        result = ExecRangeVisitorImplJson<proto::plan::Array>(
                            context);
                        break;
                    default:
                        ThrowInfo(
                            DataTypeInvalid, "unknown data type: {}", val_type);
                }
            }
            break;
        }
        case DataType::ARRAY: {
            auto val_type = expr_->val_.val_case();
            switch (val_type) {
                case proto::plan::GenericValue::ValCase::kBoolVal:
                    SetNotUseIndex();
                    result = ExecRangeVisitorImplArray<bool>(context);
                    break;
                case proto::plan::GenericValue::ValCase::kInt64Val:
                    SetNotUseIndex();
                    result = ExecRangeVisitorImplArray<int64_t>(context);
                    break;
                case proto::plan::GenericValue::ValCase::kFloatVal:
                    SetNotUseIndex();
                    result = ExecRangeVisitorImplArray<double>(context);
                    break;
                case proto::plan::GenericValue::ValCase::kStringVal:
                    SetNotUseIndex();
                    result = ExecRangeVisitorImplArray<std::string>(context);
                    break;
                case proto::plan::GenericValue::ValCase::kArrayVal:
                    if (!has_offset_input_ &&
                        CanUseIndexForArray<milvus::Array>()) {
                        result = ExecRangeVisitorImplArrayForIndex<
                            proto::plan::Array>(context);
                    } else {
                        result = ExecRangeVisitorImplArray<proto::plan::Array>(
                            context);
                    }
                    break;
                default:
                    ThrowInfo(
                        DataTypeInvalid, "unknown data type: {}", val_type);
            }
            break;
        }
        default:
            ThrowInfo(DataTypeInvalid,
                      "unsupported data type: {}",
                      expr_->column_.data_type_);
    }
}

template <typename ValueType>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(EvalCtx& context) {
    auto* input = context.get_offset_input();
    const auto& bitmap_input = context.get_bitmap_input();
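    // Two batching modes: with an offset input (iterative filter) the batch
    // is exactly the supplied row offsets; otherwise the segment is scanned
    // sequentially and GetNextBatchSize() bounds the current step.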
    auto real_batch_size =
        has_offset_input_ ? input->size() : GetNextBatchSize();
    if (real_batch_size == 0) {
        return nullptr;
    }
    auto res_vec =
        std::make_shared<ColumnVector>(TargetBitmap(real_batch_size, false),
                                       TargetBitmap(real_batch_size, true));
    TargetBitmapView res(res_vec->GetRawData(), real_batch_size);
    TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size);

    if (!arg_inited_) {
        value_arg_.SetValue(expr_->val_);
        arg_inited_ = true;
    }
    ValueType val = value_arg_.GetValue<ValueType>();
    auto op_type = expr_->op_type_;
    int index = -1;
    if (expr_->column_.nested_path_.size() > 0) {
        index = std::stoi(expr_->column_.nested_path_[0]);
    }
    int processed_cursor = 0;
    auto execute_sub_batch =
        [op_type, &processed_cursor, &bitmap_input]<FilterType filter_type>(
            const milvus::ArrayView* data,
            const bool* valid_data,
            const int32_t* offsets,
            const int size,
            TargetBitmapView res,
            TargetBitmapView valid_res,
            ValueType val,
            int index) {
            switch (op_type) {
                case proto::plan::GreaterThan: {
                    UnaryElementFuncForArray<ValueType,
                                             proto::plan::GreaterThan,
                                             filter_type>
                        func;
                    func(data, valid_data, size, val, index, res, valid_res,
                         bitmap_input, processed_cursor, offsets);
                    break;
                }
                case proto::plan::GreaterEqual: {
                    UnaryElementFuncForArray<ValueType,
                                             proto::plan::GreaterEqual,
                                             filter_type>
                        func;
                    func(data, valid_data, size, val, index, res, valid_res,
                         bitmap_input, processed_cursor, offsets);
                    break;
                }
                case proto::plan::LessThan: {
                    UnaryElementFuncForArray<ValueType,
                                             proto::plan::LessThan,
                                             filter_type>
                        func;
                    func(data, valid_data, size, val, index, res, valid_res,
                         bitmap_input, processed_cursor, offsets);
                    break;
                }
                case proto::plan::LessEqual: {
                    UnaryElementFuncForArray<ValueType,
                                             proto::plan::LessEqual,
                                             filter_type>
                        func;
                    func(data, valid_data, size, val, index, res, valid_res,
                         bitmap_input, processed_cursor, offsets);
                    break;
                }
                case proto::plan::Equal: {
                    UnaryElementFuncForArray<ValueType,
                                             proto::plan::Equal,
                                             filter_type>
                        func;
                    func(data, valid_data, size, val, index, res, valid_res,
                         bitmap_input, processed_cursor, offsets);
                    break;
                }
                case proto::plan::NotEqual: {
                    UnaryElementFuncForArray<ValueType,
                                             proto::plan::NotEqual,
                                             filter_type>
                        func;
                    func(data, valid_data, size, val, index, res, valid_res,
                         bitmap_input, processed_cursor, offsets);
                    break;
                }
                case proto::plan::PrefixMatch: {
                    UnaryElementFuncForArray<ValueType,
                                             proto::plan::PrefixMatch,
                                             filter_type>
                        func;
                    func(data, valid_data, size, val, index, res, valid_res,
                         bitmap_input, processed_cursor, offsets);
                    break;
                }
                case proto::plan::Match: {
                    UnaryElementFuncForArray<ValueType,
                                             proto::plan::Match,
                                             filter_type>
                        func;
                    func(data, valid_data, size, val, index, res, valid_res,
                         bitmap_input, processed_cursor, offsets);
                    break;
                }
                case proto::plan::PostfixMatch: {
                    UnaryElementFuncForArray<ValueType,
                                             proto::plan::PostfixMatch,
                                             filter_type>
                        func;
                    func(data, valid_data, size, val, index, res, valid_res,
                         bitmap_input, processed_cursor, offsets);
                    break;
                }
                case proto::plan::InnerMatch: {
                    UnaryElementFuncForArray<ValueType,
                                             proto::plan::InnerMatch,
                                             filter_type>
                        func;
                    func(data, valid_data, size, val, index, res, valid_res,
                         bitmap_input, processed_cursor, offsets);
                    break;
                }
                default:
                    ThrowInfo(
                        OpTypeInvalid,
                        fmt::format(
                            "unsupported operator type for unary expr: {}",
                            op_type));
            }
            processed_cursor += size;
        };

    int64_t processed_size;
    if (has_offset_input_) {
        processed_size = ProcessDataByOffsets<milvus::ArrayView>(
            execute_sub_batch,
            std::nullptr_t{},
            input,
            res,
            valid_res,
            val,
            index);
    } else {
        processed_size = ProcessDataChunks<milvus::ArrayView>(
            execute_sub_batch, std::nullptr_t{}, res, valid_res, val, index);
    }
    AssertInfo(processed_size == real_batch_size,
               "internal error: expr processed rows {} not equal "
               "expect batch size {}",
               processed_size,
               real_batch_size);
    return res_vec;
}

template <typename T>
VectorPtr
PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(EvalCtx& context,
                                                bool reverse) {
    typedef std::conditional_t<std::is_same_v<T, std::string_view>,
                               std::string,
                               T>
        IndexInnerType;
    using Index = index::ScalarIndex<IndexInnerType>;
    auto real_batch_size = GetNextBatchSize();
    if (real_batch_size == 0) {
        return nullptr;
    }
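    // Index-assisted array Equal/NotEqual, as implemented below: deduplicate
    // the target array's elements, intersect the per-element candidate rows
    // produced by the scalar index's In-callback, then post-filter the
    // surviving rows with an exact is_same_array comparison, since element
    // containment alone does not prove positional equality.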
    // get all elements.
    auto val = GetValueFromProto<proto::plan::Array>(expr_->val_);
    if (val.array_size() == 0) {
        // rollback to bruteforce. no candidates will be filtered out via index.
        return ExecRangeVisitorImplArray<proto::plan::Array>(context);
    }

    // cache the result to suit the framework.
    auto batch_res =
        ProcessIndexChunks<T>([this, &val, reverse](Index* _) {
            boost::container::vector<IndexInnerType> elems;
            for (auto const& element : val.array()) {
                auto e = GetValueFromProto<IndexInnerType>(element);
                if (std::find(elems.begin(), elems.end(), e) == elems.end()) {
                    elems.push_back(e);
                }
            }

            // filtering by index, get candidates.
            std::function<bool(milvus::proto::plan::Array&, int64_t)> is_same;
            if (segment_->is_chunked()) {
                is_same = [this, reverse](milvus::proto::plan::Array& val,
                                          int64_t offset) -> bool {
                    auto [chunk_idx, chunk_offset] =
                        segment_->get_chunk_by_offset(field_id_, offset);
                    auto pw = segment_->template chunk_view<milvus::ArrayView>(
                        op_ctx_, field_id_, chunk_idx);
                    auto chunk = pw.get();
                    return chunk.first[chunk_offset].is_same_array(val) ^
                           reverse;
                };
            } else {
                auto size_per_chunk = segment_->size_per_chunk();
                is_same = [this, size_per_chunk, reverse](
                              milvus::proto::plan::Array& val,
                              int64_t offset) -> bool {
                    auto chunk_idx = offset / size_per_chunk;
                    auto chunk_offset = offset % size_per_chunk;
                    auto pw = segment_->template chunk_data<milvus::ArrayView>(
                        op_ctx_, field_id_, chunk_idx);
                    auto chunk = pw.get();
                    auto array_view = chunk.data() + chunk_offset;
                    return array_view->is_same_array(val) ^ reverse;
                };
            }

            // collect all candidates.
            std::unordered_set<size_t> candidates;
            std::unordered_set<size_t> tmp_candidates;
            auto first_callback = [&candidates](size_t offset) -> void {
                candidates.insert(offset);
            };
            auto callback = [&candidates,
                             &tmp_candidates](size_t offset) -> void {
                if (candidates.find(offset) != candidates.end()) {
                    tmp_candidates.insert(offset);
                }
            };
            auto execute_sub_batch =
                [](Index* index_ptr,
                   const IndexInnerType& val,
                   const std::function<void(size_t)>& callback) {
                    index_ptr->InApplyCallback(1, &val, callback);
                };

            // run in-filter.
            for (size_t idx = 0; idx < elems.size(); idx++) {
                if (idx == 0) {
                    ProcessIndexChunksV2<T>(
                        execute_sub_batch, elems[idx], first_callback);
                } else {
                    ProcessIndexChunksV2<T>(
                        execute_sub_batch, elems[idx], callback);
                    candidates = std::move(tmp_candidates);
                }
                // the size of candidates is small enough.
                if (candidates.size() * 100 < active_count_) {
                    break;
                }
            }
            TargetBitmap res(active_count_);
            // run post-filter. The filter will only be executed once in the framework.
            for (const auto& candidate : candidates) {
                res[candidate] = is_same(val, candidate);
            }
            return res;
        });
    AssertInfo(batch_res->size() == real_batch_size,
               "internal error: expr processed rows {} not equal "
               "expect batch size {}",
               batch_res->size(),
               real_batch_size);

    // return the result.
    return batch_res;
}

template <typename ExprValueType>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(EvalCtx& context) {
    using GetType =
        std::conditional_t<std::is_same_v<ExprValueType, std::string>,
                           std::string_view,
                           ExprValueType>;
    auto* input = context.get_offset_input();
    const auto& bitmap_input = context.get_bitmap_input();
    FieldId field_id = expr_->column_.field_id_;
    if (!has_offset_input_ &&
        CanUseJsonStats(context, field_id, expr_->column_.nested_path_)) {
        return ExecRangeVisitorImplJsonByStats<ExprValueType>();
    }
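    // The stats-based path answers the predicate for the whole segment at
    // once, so it is only taken for sequential scans; offset-driven
    // (iterative-filter) batches fall through to the row-wise path below.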
    auto real_batch_size =
        has_offset_input_ ? input->size() : GetNextBatchSize();
    if (real_batch_size == 0) {
        return nullptr;
    }

    if (!arg_inited_) {
        value_arg_.SetValue(expr_->val_);
        arg_inited_ = true;
    }

    auto res_vec =
        std::make_shared<ColumnVector>(TargetBitmap(real_batch_size, false),
                                       TargetBitmap(real_batch_size, true));
    TargetBitmapView res(res_vec->GetRawData(), real_batch_size);
    TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size);
    ExprValueType val = value_arg_.GetValue<ExprValueType>();
    auto op_type = expr_->op_type_;
    auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);

#define UnaryRangeJSONCompare(cmp)                                  \
    do {                                                            \
        auto x = data[offset].template at<GetType>(pointer);        \
        if (x.error()) {                                            \
            if constexpr (std::is_same_v<GetType, int64_t>) {       \
                auto x = data[offset].template at<double>(pointer); \
                res[i] = !x.error() && (cmp);                       \
                break;                                              \
            }                                                       \
            res[i] = false;                                         \
            break;                                                  \
        }                                                           \
        res[i] = (cmp);                                             \
    } while (false)

#define UnaryRangeJSONCompareNotEqual(cmp)                          \
    do {                                                            \
        auto x = data[offset].template at<GetType>(pointer);        \
        if (x.error()) {                                            \
            if constexpr (std::is_same_v<GetType, int64_t>) {       \
                auto x = data[offset].template at<double>(pointer); \
                res[i] = x.error() || (cmp);                        \
                break;                                              \
            }                                                       \
            res[i] = true;                                          \
            break;                                                  \
        }                                                           \
        res[i] = (cmp);                                             \
    } while (false)

    int processed_cursor = 0;
    auto execute_sub_batch =
        [op_type, pointer, &processed_cursor, &bitmap_input]<FilterType
                                                                 filter_type>(
            const milvus::Json* data,
            const bool* valid_data,
            const int32_t* offsets,
            const int size,
            TargetBitmapView res,
            TargetBitmapView valid_res,
            ExprValueType val) {
            bool has_bitmap_input = !bitmap_input.empty();
            switch (op_type) {
                case proto::plan::GreaterThan: {
                    for (size_t i = 0; i < size; ++i) {
                        auto offset = i;
                        if constexpr (filter_type == FilterType::random) {
                            offset = (offsets) ? offsets[i] : i;
                        }
                        if (valid_data != nullptr && !valid_data[offset]) {
                            res[i] = valid_res[i] = false;
                            continue;
                        }
                        if (has_bitmap_input &&
                            !bitmap_input[i + processed_cursor]) {
                            continue;
                        }
                        if constexpr (std::is_same_v<GetType,
                                                     proto::plan::Array>) {
                            res[i] = false;
                        } else {
                            UnaryRangeJSONCompare(x.value() > val);
                        }
                    }
                    break;
                }
                case proto::plan::GreaterEqual: {
                    for (size_t i = 0; i < size; ++i) {
                        auto offset = i;
                        if constexpr (filter_type == FilterType::random) {
                            offset = (offsets) ? offsets[i] : i;
                        }
                        if (valid_data != nullptr && !valid_data[offset]) {
                            res[i] = valid_res[i] = false;
                            continue;
                        }
                        if (has_bitmap_input &&
                            !bitmap_input[i + processed_cursor]) {
                            continue;
                        }
                        if constexpr (std::is_same_v<GetType,
                                                     proto::plan::Array>) {
                            res[i] = false;
                        } else {
                            UnaryRangeJSONCompare(x.value() >= val);
                        }
                    }
                    break;
                }
                case proto::plan::LessThan: {
                    for (size_t i = 0; i < size; ++i) {
                        auto offset = i;
                        if constexpr (filter_type == FilterType::random) {
                            offset = (offsets) ? offsets[i] : i;
                        }
                        if (valid_data != nullptr && !valid_data[offset]) {
                            res[i] = valid_res[i] = false;
                            continue;
                        }
                        if (has_bitmap_input &&
                            !bitmap_input[i + processed_cursor]) {
                            continue;
                        }
                        if constexpr (std::is_same_v<GetType,
                                                     proto::plan::Array>) {
                            res[i] = false;
                        } else {
                            UnaryRangeJSONCompare(x.value() < val);
                        }
                    }
                    break;
                }
                case proto::plan::LessEqual: {
                    for (size_t i = 0; i < size; ++i) {
                        auto offset = i;
                        if constexpr (filter_type == FilterType::random) {
                            offset = (offsets) ? offsets[i] : i;
                        }
                        if (valid_data != nullptr && !valid_data[offset]) {
                            res[i] = valid_res[i] = false;
                            continue;
                        }
                        if (has_bitmap_input &&
                            !bitmap_input[i + processed_cursor]) {
                            continue;
                        }
                        if constexpr (std::is_same_v<GetType,
                                                     proto::plan::Array>) {
                            res[i] = false;
                        } else {
                            UnaryRangeJSONCompare(x.value() <= val);
                        }
                    }
                    break;
                }
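                // Equal/NotEqual below additionally accept array operands:
                // the JSON value at the pointer is re-parsed as an array and
                // compared with CompareTwoJsonArray instead of the scalar
                // at<GetType> accessor used by the ordering operators above.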
                case proto::plan::Equal: {
                    for (size_t i = 0; i < size; ++i) {
                        auto offset = i;
                        if constexpr (filter_type == FilterType::random) {
                            offset = (offsets) ? offsets[i] : i;
                        }
                        if (valid_data != nullptr && !valid_data[offset]) {
                            res[i] = valid_res[i] = false;
                            continue;
                        }
                        if (has_bitmap_input &&
                            !bitmap_input[i + processed_cursor]) {
                            continue;
                        }
                        if constexpr (std::is_same_v<GetType,
                                                     proto::plan::Array>) {
                            auto doc = data[i].doc();
                            auto array = doc.at_pointer(pointer).get_array();
                            if (array.error()) {
                                res[i] = false;
                                continue;
                            }
                            res[i] = CompareTwoJsonArray(array, val);
                        } else {
                            UnaryRangeJSONCompare(x.value() == val);
                        }
                    }
                    break;
                }
                case proto::plan::NotEqual: {
                    for (size_t i = 0; i < size; ++i) {
                        auto offset = i;
                        if constexpr (filter_type == FilterType::random) {
                            offset = (offsets) ? offsets[i] : i;
                        }
                        if (valid_data != nullptr && !valid_data[offset]) {
                            valid_res[i] = false;
                            res[i] = true;
                            continue;
                        }
                        if (has_bitmap_input &&
                            !bitmap_input[i + processed_cursor]) {
                            continue;
                        }
                        if constexpr (std::is_same_v<GetType,
                                                     proto::plan::Array>) {
                            auto doc = data[i].doc();
                            auto array = doc.at_pointer(pointer).get_array();
                            if (array.error()) {
                                res[i] = false;
                                continue;
                            }
                            res[i] = !CompareTwoJsonArray(array, val);
                        } else {
                            UnaryRangeJSONCompareNotEqual(x.value() != val);
                        }
                    }
                    break;
                }
                case proto::plan::InnerMatch:
                case proto::plan::PostfixMatch:
                case proto::plan::PrefixMatch: {
                    for (size_t i = 0; i < size; ++i) {
                        auto offset = i;
                        if constexpr (filter_type == FilterType::random) {
                            offset = (offsets) ? offsets[i] : i;
                        }
                        if (valid_data != nullptr && !valid_data[offset]) {
                            res[i] = valid_res[i] = false;
                            continue;
                        }
                        if (has_bitmap_input &&
                            !bitmap_input[i + processed_cursor]) {
                            continue;
                        }
                        if constexpr (std::is_same_v<GetType,
                                                     proto::plan::Array>) {
                            res[i] = false;
                        } else {
                            UnaryRangeJSONCompare(milvus::query::Match(
                                ExprValueType(x.value()), val, op_type));
                        }
                    }
                    break;
                }
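                // Match compiles the plan's wildcard pattern into a regex
                // once per sub-batch (PatternMatchTranslator presumably maps
                // LIKE-style wildcards to regex syntax), then applies the
                // compiled matcher row by row.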
                case proto::plan::Match: {
                    PatternMatchTranslator translator;
                    auto regex_pattern = translator(val);
                    RegexMatcher matcher(regex_pattern);
                    for (size_t i = 0; i < size; ++i) {
                        auto offset = i;
                        if constexpr (filter_type == FilterType::random) {
                            offset = (offsets) ? offsets[i] : i;
                        }
                        if (valid_data != nullptr && !valid_data[offset]) {
                            res[i] = valid_res[i] = false;
                            continue;
                        }
                        if (has_bitmap_input &&
                            !bitmap_input[i + processed_cursor]) {
                            continue;
                        }
                        if constexpr (std::is_same_v<GetType,
                                                     proto::plan::Array>) {
                            res[i] = false;
                        } else {
                            UnaryRangeJSONCompare(
                                matcher(ExprValueType(x.value())));
                        }
                    }
                    break;
                }
                default:
                    ThrowInfo(
                        OpTypeInvalid,
                        fmt::format(
                            "unsupported operator type for unary expr: {}",
                            op_type));
            }
            processed_cursor += size;
        };

    int64_t processed_size;
    if (has_offset_input_) {
        processed_size = ProcessDataByOffsets<milvus::Json>(
            execute_sub_batch, std::nullptr_t{}, input, res, valid_res, val);
    } else {
        processed_size = ProcessDataChunks<milvus::Json>(
            execute_sub_batch, std::nullptr_t{}, res, valid_res, val);
    }
    AssertInfo(processed_size == real_batch_size,
               "internal error: expr processed rows {} not equal "
               "expect batch size {}",
               processed_size,
               real_batch_size);
    return res_vec;
}

std::pair<std::string, std::string>
PhyUnaryRangeFilterExpr::SplitAtFirstSlashDigit(std::string input) {
    boost::regex rgx("/\\d+");
    boost::smatch match;
    if (boost::regex_search(input, match, rgx)) {
        std::string firstPart = input.substr(0, match.position());
        std::string secondPart = input.substr(match.position());
        return {firstPart, secondPart};
    } else {
        return {input, ""};
    }
}

template <typename ExprValueType>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonByStats() {
    using GetType =
        std::conditional_t<std::is_same_v<ExprValueType, std::string>,
                           std::string_view,
                           ExprValueType>;
    auto real_batch_size = GetNextBatchSize();
    if (real_batch_size == 0) {
        return nullptr;
    }

    if (cached_index_chunk_id_ != 0 &&
        segment_->type() == SegmentType::Sealed) {
        auto pointerpath = milvus::Json::pointer(expr_->column_.nested_path_);
        auto pointerpair = SplitAtFirstSlashDigit(pointerpath);
        std::string pointer = pointerpair.first;
        // The second part begins with the matched '/', so skip it before
        // parsing the element index; std::stoi would otherwise throw.
        size_t array_index = pointerpair.second.empty()
                                 ? INVALID_ARRAY_INDEX
                                 : std::stoi(pointerpair.second.substr(1));
        ExprValueType val = GetValueFromProto<ExprValueType>(expr_->val_);

        // for NotEqual: compute Equal and flip the result
        // this avoids handling NULL values differently in multiple places
        auto op_type = (expr_->op_type_ == proto::plan::OpType::NotEqual)
                           ? proto::plan::OpType::Equal
                           : expr_->op_type_;

        auto segment = static_cast<const segcore::SegmentSealed*>(segment_);
        auto field_id = expr_->column_.field_id_;
        auto index = segment->GetJsonStats(op_ctx_, field_id);
        Assert(index.get() != nullptr);
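        // Shape of the stats index, as used below: common typed values of a
        // JSON path are "shredded" into per-type columns (BOOL, INT64,
        // DOUBLE, STRING, ARRAY); everything else remains in a shared BSON
        // region. Both sides are evaluated into one bitmap sized to
        // active_count_ and cached for subsequent batches.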
        cached_index_chunk_res_ =
            (op_type == proto::plan::OpType::NotEqual)
                ? std::make_shared<TargetBitmap>(active_count_, true)
                : std::make_shared<TargetBitmap>(active_count_);
        cached_index_chunk_valid_res_ =
            std::make_shared<TargetBitmap>(active_count_, true);
        TargetBitmapView res_view(*cached_index_chunk_res_);
        TargetBitmapView valid_res_view(*cached_index_chunk_valid_res_);

        // process shredding data
        auto try_execute = [&](milvus::index::JSONType json_type,
                               TargetBitmapView& res_view,
                               TargetBitmapView& valid_res_view,
                               auto GetType,
                               auto ValType) {
            auto target_field = index->GetShreddingField(pointer, json_type);
            if (!target_field.empty()) {
                using ColType = decltype(GetType);
                using ValType = decltype(ValType);
                ShreddingExecutor<ColType, ValType> executor(
                    op_type, pointer, val);
                index->ExecutorForShreddingData(op_ctx_,
                                                target_field,
                                                executor,
                                                nullptr,
                                                res_view,
                                                valid_res_view);
                LOG_DEBUG(
                    "using shredding data's field: {} with value {}, count {} "
                    "for segment {}",
                    target_field,
                    val,
                    res_view.count(),
                    segment_->get_segment_id());
            }
        };

        {
            milvus::ScopedTimer timer(
                "unary_json_stats_shredding_data", [](double ms) {
                    milvus::monitor::internal_json_stats_latency_shredding
                        .Observe(ms);
                });
            if constexpr (std::is_same_v<GetType, bool>) {
                try_execute(milvus::index::JSONType::BOOL,
                            res_view,
                            valid_res_view,
                            bool{},
                            bool{});
            } else if constexpr (std::is_same_v<GetType, int64_t>) {
                try_execute(milvus::index::JSONType::INT64,
                            res_view,
                            valid_res_view,
                            int64_t{},
                            int64_t{});
                // and double compare
                TargetBitmap res_double(active_count_, false);
                TargetBitmapView res_double_view(res_double);
                TargetBitmap res_double_valid(active_count_, true);
                TargetBitmapView valid_res_double_view(res_double_valid);
                try_execute(milvus::index::JSONType::DOUBLE,
                            res_double_view,
                            valid_res_double_view,
                            double{},
                            int64_t{});
                res_view.inplace_or_with_count(res_double_view, active_count_);
                valid_res_view.inplace_or_with_count(valid_res_double_view,
                                                     active_count_);
            } else if constexpr (std::is_same_v<GetType, double>) {
                try_execute(milvus::index::JSONType::DOUBLE,
                            res_view,
                            valid_res_view,
                            double{},
                            double{});
                // add int64 compare
                TargetBitmap res_int64(active_count_, false);
                TargetBitmapView res_int64_view(res_int64);
                TargetBitmap res_int64_valid(active_count_, true);
                TargetBitmapView valid_res_int64_view(res_int64_valid);
                try_execute(milvus::index::JSONType::INT64,
                            res_int64_view,
                            valid_res_int64_view,
                            int64_t{},
                            double{});
                res_view.inplace_or_with_count(res_int64_view, active_count_);
                valid_res_view.inplace_or_with_count(valid_res_int64_view,
                                                     active_count_);
            } else if constexpr (std::is_same_v<GetType, std::string_view> ||
                                 std::is_same_v<GetType, std::string>) {
                try_execute(milvus::index::JSONType::STRING,
                            res_view,
                            valid_res_view,
                            GetType{},
                            GetType{});
            } else if constexpr (std::is_same_v<GetType, proto::plan::Array>) {
                // ARRAY shredding data: stored as BSON binary in binary column
                auto target_field = index->GetShreddingField(
                    pointer, milvus::index::JSONType::ARRAY);
                if (!target_field.empty()) {
                    ShreddingArrayBsonExecutor executor(op_type, pointer, val);
                    index->ExecutorForShreddingData(op_ctx_,
                                                    target_field,
                                                    executor,
                                                    nullptr,
                                                    res_view,
                                                    valid_res_view);
                    LOG_DEBUG("using shredding array field: {}, count {}",
                              target_field,
                              res_view.count());
                }
            }
        }

        // process shared data
        auto shared_executor = [op_type, val, array_index, &res_view](
                                   milvus::BsonView bson,
                                   uint32_t row_id,
                                   uint32_t value_offset) {
            if constexpr (std::is_same_v<GetType, proto::plan::Array>) {
                Assert(op_type == proto::plan::OpType::Equal ||
                       op_type == proto::plan::OpType::NotEqual);
                if (array_index != INVALID_ARRAY_INDEX) {
                    auto array_value = bson.ParseAsArrayAtOffset(value_offset);
                    if (!array_value.has_value()) {
                        // For NotEqual: path not exists means "not equal", keep true
                        // For Equal: path not exists means no match, set false
                        res_view[row_id] =
                            (op_type == proto::plan::OpType::NotEqual);
                        return;
                    }
                    auto sub_array = milvus::BsonView::GetNthElementInArray<
                        bsoncxx::array::view>(array_value.value().data(),
                                              array_index);
                    if (!sub_array.has_value()) {
                        res_view[row_id] =
                            (op_type == proto::plan::OpType::NotEqual);
                        return;
                    }
                    res_view[row_id] =
                        op_type == proto::plan::OpType::Equal
                            ? CompareTwoJsonArray(sub_array.value(), val)
                            : !CompareTwoJsonArray(sub_array.value(), val);
                } else {
                    auto array_value = bson.ParseAsArrayAtOffset(value_offset);
                    if (!array_value.has_value()) {
                        res_view[row_id] =
                            (op_type == proto::plan::OpType::NotEqual);
                        return;
                    }
                    res_view[row_id] =
                        op_type == proto::plan::OpType::Equal
                            ? CompareTwoJsonArray(array_value.value(), val)
                            : !CompareTwoJsonArray(array_value.value(), val);
                }
            } else {
                std::optional<GetType> get_value;
                if (array_index != INVALID_ARRAY_INDEX) {
                    auto array_value = bson.ParseAsArrayAtOffset(value_offset);
                    if (!array_value.has_value()) {
                        // Path not exists: NotEqual->true, others->false
                        res_view[row_id] =
                            (op_type == proto::plan::OpType::NotEqual);
                        return;
                    }
                    get_value =
                        milvus::BsonView::GetNthElementInArray<GetType>(
                            array_value.value().data(), array_index);
                    // If GetType is int and value is not found, try double
                    if constexpr (std::is_same_v<GetType, int64_t>) {
                        if (!get_value.has_value()) {
                            auto get_value =
                                milvus::BsonView::GetNthElementInArray<double>(
                                    array_value.value().data(), array_index);
                            if (get_value.has_value()) {
                                res_view[row_id] = UnaryCompare(
                                    get_value.value(), val, op_type);
                            } else {
                                // Type mismatch: NotEqual->true, others->false
                                res_view[row_id] =
                                    (op_type == proto::plan::OpType::NotEqual);
                            }
                            return;
                        }
                    } else if constexpr (std::is_same_v<GetType, double>) {
                        if (!get_value.has_value()) {
                            auto get_value =
                                milvus::BsonView::GetNthElementInArray<
                                    int64_t>(array_value.value().data(),
                                             array_index);
                            if (get_value.has_value()) {
                                res_view[row_id] = UnaryCompare(
                                    get_value.value(), val, op_type);
                            } else {
                                res_view[row_id] =
                                    (op_type == proto::plan::OpType::NotEqual);
                            }
                            return;
                        }
                    }
                } else {
                    get_value =
                        bson.ParseAsValueAtOffset<GetType>(value_offset);
                    // If GetType is int and value is not found, try double
                    if constexpr (std::is_same_v<GetType, int64_t>) {
                        if (!get_value.has_value()) {
                            auto get_value =
                                bson.ParseAsValueAtOffset<double>(
                                    value_offset);
                            if (get_value.has_value()) {
                                res_view[row_id] = UnaryCompare(
                                    get_value.value(), val, op_type);
                            } else {
                                res_view[row_id] =
                                    (op_type == proto::plan::OpType::NotEqual);
                            }
                            return;
                        }
                    } else if constexpr (std::is_same_v<GetType, double>) {
                        if (!get_value.has_value()) {
                            auto get_value =
                                bson.ParseAsValueAtOffset<int64_t>(
                                    value_offset);
                            if (get_value.has_value()) {
                                res_view[row_id] = UnaryCompare(
                                    get_value.value(), val, op_type);
                            } else {
                                res_view[row_id] =
                                    (op_type == proto::plan::OpType::NotEqual);
                            }
                            return;
                        }
                    }
                }
                if (!get_value.has_value()) {
                    res_view[row_id] =
                        (op_type == proto::plan::OpType::NotEqual);
                    return;
                }
                res_view[row_id] =
                    UnaryCompare(get_value.value(), val, op_type);
            }
        };

        std::set<milvus::index::JSONType> target_types;
        if constexpr (std::is_same_v<GetType, std::string_view>) {
            target_types.insert(milvus::index::JSONType::STRING);
        } else if constexpr (std::is_same_v<GetType, int64_t> ||
                             std::is_same_v<GetType, double>) {
            target_types.insert(milvus::index::JSONType::INT64);
            target_types.insert(milvus::index::JSONType::DOUBLE);
        } else if constexpr (std::is_same_v<GetType, bool>) {
            target_types.insert(milvus::index::JSONType::BOOL);
        }
        {
            milvus::ScopedTimer timer(
                "unary_json_stats_shared_data", [](double ms) {
                    milvus::monitor::internal_json_stats_latency_shared
                        .Observe(ms);
                });
            index->ExecuteForSharedData(
                op_ctx_, bson_index_, pointer, shared_executor);
        }
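        // One flip converts the cached Equal bitmap into the NotEqual answer
        // for shredded and shared data alike; rows whose path was missing or
        // mismatched were left false by the Equal probe and become true here,
        // which is the centralized NULL handling promised above.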
        // for NotEqual: flip the result
        if (expr_->op_type_ == proto::plan::OpType::NotEqual) {
            cached_index_chunk_res_->flip();
        }
        cached_index_chunk_id_ = 0;
    }

    TargetBitmap result;
    result.append(
        *cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
    MoveCursor();
    return std::make_shared<ColumnVector>(std::move(result),
                                          TargetBitmap(real_batch_size, true));
}

template <typename T>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImpl(EvalCtx& context) {
    if (expr_->op_type_ == proto::plan::OpType::TextMatch ||
        expr_->op_type_ == proto::plan::OpType::PhraseMatch) {
        if (has_offset_input_) {
            ThrowInfo(
                OpTypeInvalid,
                fmt::format("match query does not support iterative filter"));
        }
        return ExecTextMatch();
    } else if (CanUseNgramIndex()) {
        auto res = ExecNgramMatch();
        // If nullopt is returned, it means the query cannot be
        // optimized by ngram index. Forward it to the normal path.
        if (res.has_value()) {
            return res.value();
        }
    }

    if (!has_offset_input_ && is_pk_field_ && IsCompareOp(expr_->op_type_)) {
        if (pk_type_ == DataType::VARCHAR) {
            return ExecRangeVisitorImplForPk<std::string>(context);
        } else {
            return ExecRangeVisitorImplForPk<int64_t>(context);
        }
    }

    if (CanUseIndex<T>() && !has_offset_input_) {
        return ExecRangeVisitorImplForIndex<T>();
    } else {
        return ExecRangeVisitorImplForData<T>(context);
    }
}

template <typename T>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForPk(EvalCtx& context) {
    typedef std::conditional_t<std::is_same_v<T, std::string_view>,
                               std::string,
                               T>
        IndexInnerType;
    if (!arg_inited_) {
        value_arg_.SetValue(expr_->val_);
        arg_inited_ = true;
    }
    if (auto res = PreCheckOverflow<T>()) {
        return res;
    }

    auto real_batch_size = GetNextBatchSize();
    if (real_batch_size == 0) {
        return nullptr;
    }

    if (cached_index_chunk_id_ != 0) {
        cached_index_chunk_id_ = 0;
        cached_index_chunk_res_ =
            std::make_shared<TargetBitmap>(active_count_);
        auto cache_view = cached_index_chunk_res_->view();
        auto op_type = expr_->op_type_;
        PkType pk = value_arg_.GetValue<IndexInnerType>();
        if (op_type == proto::plan::NotEqual) {
            segment_->pk_range(op_ctx_, proto::plan::Equal, pk, cache_view);
            cache_view.flip();
        } else {
            segment_->pk_range(op_ctx_, op_type, pk, cache_view);
        }
    }

    TargetBitmap result;
    result.append(
        *cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
    MoveCursor();
    return std::make_shared<ColumnVector>(std::move(result),
                                          TargetBitmap(real_batch_size, true));
}

template <typename T>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForIndex() {
    typedef std::conditional_t<std::is_same_v<T, std::string_view>,
                               std::string,
                               T>
        IndexInnerType;
    using Index = index::ScalarIndex<IndexInnerType>;
    if (!arg_inited_) {
        value_arg_.SetValue(expr_->val_);
        arg_inited_ = true;
    }

    if (auto res = PreCheckOverflow<T>()) {
        return res;
    }

    auto real_batch_size = GetNextBatchSize();
    if (real_batch_size == 0) {
        return nullptr;
    }
    auto op_type = expr_->op_type_;
    auto execute_sub_batch = [op_type](Index* index_ptr, IndexInnerType val) {
        TargetBitmap res;
        switch (op_type) {
            case proto::plan::GreaterThan: {
                UnaryIndexFunc<T, proto::plan::GreaterThan> func;
                res = std::move(func(index_ptr, val));
                break;
            }
            case proto::plan::GreaterEqual: {
                UnaryIndexFunc<T, proto::plan::GreaterEqual> func;
                res = std::move(func(index_ptr, val));
                break;
            }
            case proto::plan::LessThan: {
                UnaryIndexFunc<T, proto::plan::LessThan> func;
                res = std::move(func(index_ptr, val));
                break;
            }
            case proto::plan::LessEqual: {
                UnaryIndexFunc<T, proto::plan::LessEqual> func;
                res = std::move(func(index_ptr, val));
                break;
            }
            case proto::plan::Equal: {
                UnaryIndexFunc<T, proto::plan::Equal> func;
                res = std::move(func(index_ptr, val));
                break;
            }
            case proto::plan::NotEqual: {
                UnaryIndexFunc<T, proto::plan::NotEqual> func;
                res = std::move(func(index_ptr, val));
                break;
            }
            case proto::plan::PrefixMatch: {
                UnaryIndexFunc<T, proto::plan::PrefixMatch> func;
                res = std::move(func(index_ptr, val));
                break;
            }
            case proto::plan::PostfixMatch: {
                UnaryIndexFunc<T, proto::plan::PostfixMatch> func;
                res = std::move(func(index_ptr, val));
                break;
            }
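            // InnerMatch is the "contains" variant of the string matches:
            // PrefixMatch anchors the operand at the start of the value,
            // PostfixMatch at the end, InnerMatch anywhere inside it.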
            case proto::plan::InnerMatch: {
                UnaryIndexFunc<T, proto::plan::InnerMatch> func;
                res = std::move(func(index_ptr, val));
                break;
            }
            case proto::plan::Match: {
                UnaryIndexFunc<T, proto::plan::Match> func;
                res = std::move(func(index_ptr, val));
                break;
            }
            default:
                ThrowInfo(
                    OpTypeInvalid,
                    fmt::format("unsupported operator type for unary expr: {}",
                                op_type));
        }
        return res;
    };
    IndexInnerType val = value_arg_.GetValue<IndexInnerType>();
    auto res = ProcessIndexChunks<T>(execute_sub_batch, val);
    AssertInfo(res->size() == real_batch_size,
               "internal error: expr processed rows {} not equal "
               "expect batch size {}",
               res->size(),
               real_batch_size);
    return res;
}

template <typename T>
ColumnVectorPtr
PhyUnaryRangeFilterExpr::PreCheckOverflow(OffsetVector* input) {
    if constexpr (std::is_integral_v<T> && !std::is_same_v<T, bool>) {
        auto val = GetValueFromProto<int64_t>(expr_->val_);

        if (milvus::query::out_of_range<T>(val)) {
            int64_t batch_size;
            if (input != nullptr) {
                batch_size = input->size();
            } else {
                batch_size =
                    overflow_check_pos_ + batch_size_ >= active_count_
                        ? active_count_ - overflow_check_pos_
                        : batch_size_;
                overflow_check_pos_ += batch_size;
            }
            auto valid =
                (input != nullptr)
                    ? ProcessChunksForValidByOffsets(
                          SegmentExpr::CanUseIndex(), *input)
                    : ProcessChunksForValid(SegmentExpr::CanUseIndex());
            auto res_vec = std::make_shared<ColumnVector>(
                TargetBitmap(batch_size), std::move(valid));
            TargetBitmapView res(res_vec->GetRawData(), batch_size);
            TargetBitmapView valid_res(res_vec->GetValidRawData(), batch_size);
            switch (expr_->op_type_) {
                case proto::plan::GreaterThan:
                case proto::plan::GreaterEqual: {
                    if (milvus::query::lt_lb<T>(val)) {
                        res.set();
                        res &= valid_res;
                        return res_vec;
                    }
                    return res_vec;
                }
                case proto::plan::LessThan:
                case proto::plan::LessEqual: {
                    if (milvus::query::gt_ub<T>(val)) {
                        res.set();
                        res &= valid_res;
                        return res_vec;
                    }
                    return res_vec;
                }
                case proto::plan::Equal: {
                    res.reset();
                    return res_vec;
                }
                case proto::plan::NotEqual: {
                    res.set();
                    res &= valid_res;
                    return res_vec;
                }
                default: {
                    ThrowInfo(OpTypeInvalid,
                              "unsupported range node {}",
                              expr_->op_type_);
                }
            }
        }
    }
    return nullptr;
}

template <typename T>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(EvalCtx& context) {
    typedef std::conditional_t<std::is_same_v<T, std::string_view>,
                               std::string,
                               T>
        IndexInnerType;
    auto* input = context.get_offset_input();
    const auto& bitmap_input = context.get_bitmap_input();

    if (auto res = PreCheckOverflow<T>(input)) {
        return res;
    }

    auto real_batch_size =
        has_offset_input_ ? input->size() : GetNextBatchSize();
    if (real_batch_size == 0) {
        return nullptr;
    }

    if (!arg_inited_) {
        value_arg_.SetValue(expr_->val_);
        arg_inited_ = true;
    }
    IndexInnerType val = GetValueFromProto<IndexInnerType>(expr_->val_);
    auto res_vec =
        std::make_shared<ColumnVector>(TargetBitmap(real_batch_size, false),
                                       TargetBitmap(real_batch_size, true));
    TargetBitmapView res(res_vec->GetRawData(), real_batch_size);
    TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size);
    auto expr_type = expr_->op_type_;

    size_t processed_cursor = 0;
    auto execute_sub_batch =
        [expr_type, &processed_cursor, &bitmap_input]<FilterType filter_type>(
            const T* data,
            const bool* valid_data,
            const int32_t* offsets,
            const int size,
            TargetBitmapView res,
            TargetBitmapView valid_res,
            IndexInnerType val) {
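            // Callback contract (inferred from ProcessDataChunks /
            // ProcessDataByOffsets): invoked once per chunk with `size` rows;
            // `offsets` is non-null only on the random/offset path, and
            // processed_cursor accumulates rows handled by earlier chunks so
            // bitmap_input can be indexed globally.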
            // If data is nullptr, this chunk was skipped by SkipIndex.
            // We only need to update processed_cursor for bitmap_input indexing.
            if (data == nullptr) {
                processed_cursor += size;
                return;
            }
            switch (expr_type) {
                case proto::plan::GreaterThan: {
                    UnaryElementFunc<T, proto::plan::GreaterThan, filter_type>
                        func;
                    func(data, size, val, res, bitmap_input, processed_cursor,
                         offsets);
                    break;
                }
                case proto::plan::GreaterEqual: {
                    UnaryElementFunc<T, proto::plan::GreaterEqual, filter_type>
                        func;
                    func(data, size, val, res, bitmap_input, processed_cursor,
                         offsets);
                    break;
                }
                case proto::plan::LessThan: {
                    UnaryElementFunc<T, proto::plan::LessThan, filter_type>
                        func;
                    func(data, size, val, res, bitmap_input, processed_cursor,
                         offsets);
                    break;
                }
                case proto::plan::LessEqual: {
                    UnaryElementFunc<T, proto::plan::LessEqual, filter_type>
                        func;
                    func(data, size, val, res, bitmap_input, processed_cursor,
                         offsets);
                    break;
                }
                case proto::plan::Equal: {
                    UnaryElementFunc<T, proto::plan::Equal, filter_type> func;
                    func(data, size, val, res, bitmap_input, processed_cursor,
                         offsets);
                    break;
                }
                case proto::plan::NotEqual: {
                    UnaryElementFunc<T, proto::plan::NotEqual, filter_type>
                        func;
                    func(data, size, val, res, bitmap_input, processed_cursor,
                         offsets);
                    break;
                }
                case proto::plan::PrefixMatch: {
                    UnaryElementFunc<T, proto::plan::PrefixMatch, filter_type>
                        func;
                    func(data, size, val, res, bitmap_input, processed_cursor,
                         offsets);
                    break;
                }
                case proto::plan::PostfixMatch: {
                    UnaryElementFunc<T, proto::plan::PostfixMatch, filter_type>
                        func;
                    func(data, size, val, res, bitmap_input, processed_cursor,
                         offsets);
                    break;
                }
                case proto::plan::InnerMatch: {
                    UnaryElementFunc<T, proto::plan::InnerMatch, filter_type>
                        func;
                    func(data, size, val, res, bitmap_input, processed_cursor,
                         offsets);
                    break;
                }
                case proto::plan::Match: {
                    UnaryElementFunc<T, proto::plan::Match, filter_type> func;
                    func(data, size, val, res, bitmap_input, processed_cursor,
                         offsets);
                    break;
                }
                default:
                    ThrowInfo(
                        OpTypeInvalid,
                        fmt::format(
                            "unsupported operator type for unary expr: {}",
                            expr_type));
            }
            // The element funcs above evaluate the whole sub-batch in one
            // batch operation; we do not split the data by validity (which
            // could hurt performance when nulls are scattered) but instead
            // mask res with valid_data after the batch operation.
            if (valid_data != nullptr) {
                bool has_bitmap_input = !bitmap_input.empty();
                for (int i = 0; i < size; i++) {
                    if (has_bitmap_input &&
                        !bitmap_input[i + processed_cursor]) {
                        continue;
                    }
                    auto offset = i;
                    if constexpr (filter_type == FilterType::random) {
                        offset = (offsets) ? offsets[i] : i;
                    }
                    if (!valid_data[offset]) {
                        res[i] = valid_res[i] = false;
                    }
                }
            }
            processed_cursor += size;
        };

    auto skip_index_func = [expr_type, val](const SkipIndex& skip_index,
                                            FieldId field_id,
                                            int64_t chunk_id) {
        return skip_index.CanSkipUnaryRange<T>(
            field_id, chunk_id, expr_type, val);
    };

    int64_t processed_size;
    if (has_offset_input_) {
        if (expr_->column_.element_level_) {
            // For element-level filtering
            processed_size = ProcessElementLevelByOffsets<T>(
                execute_sub_batch,
                skip_index_func,
                input,
                res,
                valid_res,
                val);
        } else {
            processed_size = ProcessDataByOffsets<T>(
                execute_sub_batch,
                skip_index_func,
                input,
                res,
                valid_res,
                val);
        }
    } else {
        AssertInfo(!expr_->column_.element_level_,
                   "Element-level filtering is not supported without offsets");
        processed_size = ProcessDataChunks<T>(
            execute_sub_batch, skip_index_func, res, valid_res, val);
    }

    AssertInfo(processed_size == real_batch_size,
               "internal error: expr processed rows {} not equal "
               "expect batch size {}, related params[active_count:{}, "
               "current_data_chunk:{}, num_data_chunk:{}, current_data_pos:{}]",
               processed_size,
               real_batch_size,
               active_count_,
               current_data_chunk_,
               num_data_chunk_,
               current_data_chunk_pos_);
    return res_vec;
}

template <typename T>
bool
PhyUnaryRangeFilterExpr::CanUseIndex() {
    use_index_ = SegmentExpr::CanUseIndex() &&
                 SegmentExpr::CanUseIndexForOp(expr_->op_type_);
    return use_index_;
}

bool
PhyUnaryRangeFilterExpr::CanUseIndexForJson(DataType val_type) {
    if (!SegmentExpr::CanUseIndex()) {
        use_index_ = false;
        return false;
    }
    bool has_index = pinned_index_.size() > 0;
    switch (val_type) {
        case DataType::STRING:
        case DataType::VARCHAR:
            use_index_ = has_index &&
                         expr_->op_type_ != proto::plan::OpType::Match &&
                         expr_->op_type_ != proto::plan::OpType::PostfixMatch &&
                         expr_->op_type_ != proto::plan::OpType::InnerMatch;
            break;
        default:
            use_index_ = has_index;
    }
    return use_index_;
}

VectorPtr
PhyUnaryRangeFilterExpr::ExecTextMatch() {
    using Index = index::TextMatchIndex;
    if (!arg_inited_) {
        value_arg_.SetValue(expr_->val_);
        arg_inited_ = true;
    }
    auto query = value_arg_.GetValue<std::string>();

    int64_t slop = 0;
    if (expr_->op_type_ == proto::plan::PhraseMatch) {
        // It should be larger than 0 in normal cases. Check it in case of
        // receiving an old version proto.
        if (expr_->extra_values_.size() > 0) {
            slop = GetValueFromProto<int64_t>(expr_->extra_values_[0]);
        }
        if (slop < 0 || slop > std::numeric_limits<uint32_t>::max()) {
            throw SegcoreError(
                ErrorCode::InvalidParameter,
                fmt::format(
                    "Slop {} is invalid in phrase match query. Should be "
                    "within [0, UINT32_MAX].",
                    slop));
        }
    }
    auto op_type = expr_->op_type_;

    // Process-level LRU cache lookup by (segment_id, expr signature)
    if (cached_match_res_ == nullptr &&
        exec::ExprResCacheManager::IsEnabled() &&
        segment_->type() == SegmentType::Sealed) {
        exec::ExprResCacheManager::Key key{segment_->get_segment_id(),
                                           this->ToString()};
        exec::ExprResCacheManager::Value v;
        if (exec::ExprResCacheManager::Instance().Get(key, v)) {
            cached_match_res_ = v.result;
            cached_index_chunk_valid_res_ = v.valid_result;
            AssertInfo(cached_match_res_->size() == active_count_,
                       "internal error: expr res cache size {} not equal "
                       "expect active count {}",
                       cached_match_res_->size(),
                       active_count_);
        }
    }

    uint32_t min_should_match = 1;  // default value
    if (op_type == proto::plan::OpType::TextMatch &&
        expr_->extra_values_.size() > 0) {
        // min_should_match is stored in the first extra value
        min_should_match = static_cast<uint32_t>(
            GetValueFromProto<int64_t>(expr_->extra_values_[0]));
    }

    auto func = [op_type, slop, min_should_match](
                    Index* index, const std::string& query) -> TargetBitmap {
        if (op_type == proto::plan::OpType::TextMatch) {
            return index->MatchQuery(query, min_should_match);
        } else if (op_type == proto::plan::OpType::PhraseMatch) {
            return index->PhraseMatchQuery(query, slop);
        } else {
            ThrowInfo(OpTypeInvalid,
                      "unsupported operator type for match query: {}",
                      op_type);
        }
    };

    auto real_batch_size = GetNextBatchSize();
    if (real_batch_size == 0) {
        return nullptr;
    }

    if (cached_match_res_ == nullptr) {
        auto pw = segment_->GetTextIndex(op_ctx_, field_id_);
        auto index = pw.get();
        auto res = std::move(func(index, query));
        auto valid_res = index->IsNotNull();
        cached_match_res_ = std::make_shared<TargetBitmap>(std::move(res));
        cached_index_chunk_valid_res_ =
            std::make_shared<TargetBitmap>(std::move(valid_res));
        if (cached_match_res_->size() < active_count_) {
            // Some entities are not yet visible in the inverted index; this
            // only happens on growing segments.
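            // Pad both bitmaps up to active_count_; the appended bits default
            // to false, so rows not yet visible in the index count as
            // non-matching and not-valid for this query.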
            TargetBitmap tail(active_count_ - cached_match_res_->size());
            cached_match_res_->append(tail);
            cached_index_chunk_valid_res_->append(tail);
        }
        // Insert into process-level cache
        if (exec::ExprResCacheManager::IsEnabled() &&
            segment_->type() == SegmentType::Sealed) {
            exec::ExprResCacheManager::Key key{segment_->get_segment_id(),
                                               this->ToString()};
            exec::ExprResCacheManager::Value v;
            v.result = cached_match_res_;
            v.valid_result = cached_index_chunk_valid_res_;
            v.active_count = active_count_;
            exec::ExprResCacheManager::Instance().Put(key, v);
        }
    }

    TargetBitmap result;
    TargetBitmap valid_result;
    result.append(
        *cached_match_res_, current_data_global_pos_, real_batch_size);
    valid_result.append(*cached_index_chunk_valid_res_,
                        current_data_global_pos_,
                        real_batch_size);
    MoveCursor();
    return std::make_shared<ColumnVector>(std::move(result),
                                          std::move(valid_result));
}

bool
PhyUnaryRangeFilterExpr::CanUseNgramIndex() const {
    return pinned_ngram_index_.get() != nullptr && !has_offset_input_;
}

std::optional<VectorPtr>
PhyUnaryRangeFilterExpr::ExecNgramMatch() {
    if (!arg_inited_) {
        value_arg_.SetValue(expr_->val_);
        arg_inited_ = true;
    }
    auto literal = value_arg_.GetValue<std::string>();

    auto real_batch_size = GetNextBatchSize();
    if (real_batch_size == 0) {
        return std::nullopt;
    }

    if (cached_ngram_match_res_ == nullptr) {
        auto index = pinned_ngram_index_.get();
        AssertInfo(index != nullptr,
                   "ngram index should not be null, field_id: {}",
                   field_id_.get());
        auto res_opt = index->ExecuteQuery(literal, expr_->op_type_, this);
        if (!res_opt.has_value()) {
            return std::nullopt;
        }
        cached_ngram_match_res_ =
            std::make_shared<TargetBitmap>(std::move(res_opt.value()));
        cached_index_chunk_valid_res_ =
            std::make_shared<TargetBitmap>(std::move(index->IsNotNull()));
    }

    TargetBitmap result;
    TargetBitmap valid_result;
    result.append(
        *cached_ngram_match_res_, current_data_global_pos_, real_batch_size);
    valid_result.append(*cached_index_chunk_valid_res_,
                        current_data_global_pos_,
                        real_batch_size);
    MoveCursor();
    return std::make_shared<ColumnVector>(std::move(result),
                                          std::move(valid_result));
}

}  // namespace exec
}  // namespace milvus