// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <fmt/core.h>

#include <memory>
#include <optional>
#include <string>
#include <vector>

#include "common/FieldDataInterface.h"
#include "common/Json.h"
#include "common/Types.h"
#include "exec/expression/EvalCtx.h"
#include "exec/expression/VectorFunction.h"
#include "exec/expression/Utils.h"
#include "exec/QueryContext.h"
#include "expr/ITypeExpr.h"
#include "log/Log.h"
#include "query/PlanProto.h"
#include "segcore/SegmentSealedImpl.h"
#include "segcore/SegmentInterface.h"
#include "segcore/SegmentGrowingImpl.h"

namespace milvus {
namespace exec {

enum class FilterType { sequential = 0, random = 1 };

class Expr {
 public:
    Expr(DataType type,
         const std::vector<std::shared_ptr<Expr>>&& inputs,
         const std::string& name)
        : type_(type),
          inputs_(std::move(inputs)),
          name_(name),
          vector_func_(nullptr) {
    }

    Expr(DataType type,
         const std::vector<std::shared_ptr<Expr>>&& inputs,
         std::shared_ptr<VectorFunction> vec_func,
         const std::string& name)
        : type_(type),
          inputs_(std::move(inputs)),
          name_(name),
          vector_func_(vec_func) {
    }

    virtual ~Expr() = default;

    const DataType&
    type() const {
        return type_;
    }

    std::string
    name() {
        return name_;
    }

    virtual void
    Eval(EvalCtx& context, VectorPtr& result) {
    }

    // Only move the cursor to the next batch,
    // without doing the real evaluation (used as an optimization).
    virtual void
    MoveCursor() {
    }

    void
    SetHasOffsetInput(bool has_offset_input) {
        has_offset_input_ = has_offset_input;
    }

    virtual bool
    SupportOffsetInput() {
        return true;
    }

    virtual std::string
    ToString() const {
        PanicInfo(ErrorCode::NotImplemented, "not implemented");
    }

    virtual bool
    IsSource() const {
        return false;
    }

    virtual std::optional<expr::ColumnInfo>
    GetColumnInfo() const {
        PanicInfo(ErrorCode::NotImplemented, "not implemented");
    }

    std::vector<std::shared_ptr<Expr>>&
    GetInputsRef() {
        return inputs_;
    }

 protected:
    DataType type_;
    std::vector<std::shared_ptr<Expr>> inputs_;
    std::string name_;
    // NOTE: unused
    std::shared_ptr<VectorFunction> vector_func_;

    // Whether an offset input is provided so that expr filtering runs only on
    // those rows. Defaults to false, which means expr filtering runs over the
    // whole segment's data.
    bool has_offset_input_ = false;
};

using ExprPtr = std::shared_ptr<Expr>;

using SkipFunc = bool (*)(const milvus::SkipIndex&, FieldId, int);

/*
 * The expr has only one column.
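 *
 * Batching model (summary of the state kept below): evaluation proceeds at
 * most batch_size_ rows per call. current_data_chunk_/current_data_chunk_pos_
 * track the raw-data cursor and current_index_chunk_/current_index_chunk_pos_
 * track the index cursor, so consecutive Eval calls resume where the previous
 * batch stopped. When an offset input is supplied (has_offset_input_), the
 * expr processes only the given offsets and the cursors are not moved.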
 */
class SegmentExpr : public Expr {
 public:
    SegmentExpr(const std::vector<ExprPtr>&& input,
                const std::string& name,
                const segcore::SegmentInternalInterface* segment,
                const FieldId field_id,
                const std::vector<std::string> nested_path,
                const DataType value_type,
                int64_t active_count,
                int64_t batch_size,
                int32_t consistency_level,
                bool allow_any_json_cast_type = false,
                bool is_json_contains = false)
        : Expr(DataType::BOOL, std::move(input), name),
          segment_(segment),
          field_id_(field_id),
          nested_path_(nested_path),
          value_type_(value_type),
          allow_any_json_cast_type_(allow_any_json_cast_type),
          active_count_(active_count),
          batch_size_(batch_size),
          consistency_level_(consistency_level),
          is_json_contains_(is_json_contains) {
        size_per_chunk_ = segment_->size_per_chunk();
        AssertInfo(
            batch_size_ > 0,
            fmt::format("expr batch size should be greater than zero, "
                        "but now: {}",
                        batch_size_));
        InitSegmentExpr();
    }

    void
    InitSegmentExpr() {
        auto& schema = segment_->get_schema();
        auto& field_meta = schema[field_id_];
        field_type_ = field_meta.get_data_type();

        if (schema.get_primary_field_id().has_value() &&
            schema.get_primary_field_id().value() == field_id_ &&
            IsPrimaryKeyDataType(field_meta.get_data_type())) {
            is_pk_field_ = true;
            pk_type_ = field_meta.get_data_type();
        }

        if (field_meta.get_data_type() == DataType::JSON) {
            auto pointer = milvus::Json::pointer(nested_path_);
            if (is_index_mode_ = segment_->HasIndex(field_id_,
                                                    pointer,
                                                    value_type_,
                                                    allow_any_json_cast_type_,
                                                    is_json_contains_)) {
                num_index_chunk_ = 1;
            }
        } else if (field_meta.get_data_type() == DataType::GEOMETRY) {
            is_index_mode_ = segment_->HasIndex(field_id_);
            if (is_index_mode_) {
                num_index_chunk_ = 1;
            }
        } else {
            is_index_mode_ = segment_->HasIndex(field_id_);
            if (is_index_mode_) {
                num_index_chunk_ = segment_->num_chunk_index(field_id_);
            }
        }

        // If the index does not include raw data, the raw data also needs to
        // be loaded.
        if (segment_->HasFieldData(field_id_)) {
            if (segment_->is_chunked()) {
                num_data_chunk_ = segment_->num_chunk_data(field_id_);
            } else {
                num_data_chunk_ = upper_div(active_count_, size_per_chunk_);
            }
        }
    }

    virtual bool
    IsSource() const override {
        return true;
    }

    void
    MoveCursorForDataMultipleChunk() {
        int64_t processed_size = 0;
        for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) {
            auto data_pos =
                (i == current_data_chunk_) ? current_data_chunk_pos_ : 0;
            // If the segment is chunked, its type won't be growing.
            int64_t size = segment_->chunk_size(field_id_, i) - data_pos;
            size = std::min(size, batch_size_ - processed_size);

            processed_size += size;
            if (processed_size >= batch_size_) {
                current_data_chunk_ = i;
                current_data_chunk_pos_ = data_pos + size;
                current_data_global_pos_ =
                    current_data_global_pos_ + processed_size;
                break;
            }
        }
    }

    void
    MoveCursorForDataSingleChunk() {
        if (segment_->type() == SegmentType::Sealed) {
            auto size =
                std::min(active_count_ - current_data_chunk_pos_, batch_size_);
            current_data_chunk_pos_ += size;
            current_data_global_pos_ += size;
        } else {
            int64_t processed_size = 0;
            for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) {
                auto data_pos =
                    (i == current_data_chunk_) ? current_data_chunk_pos_ : 0;
                auto size = (i == (num_data_chunk_ - 1) &&
                             active_count_ % size_per_chunk_ != 0)
                                ? active_count_ % size_per_chunk_ - data_pos
                                : size_per_chunk_ - data_pos;

                size = std::min(size, batch_size_ - processed_size);

                processed_size += size;
                if (processed_size >= batch_size_) {
                    current_data_chunk_ = i;
                    current_data_chunk_pos_ = data_pos + size;
                    current_data_global_pos_ =
                        current_data_global_pos_ + processed_size;
                    break;
                }
            }
        }
    }

    void
    MoveCursorForData() {
        if (segment_->is_chunked()) {
            MoveCursorForDataMultipleChunk();
        } else {
            MoveCursorForDataSingleChunk();
        }
    }

    void
    MoveCursorForIndex() {
        AssertInfo(segment_->type() == SegmentType::Sealed,
                   "index mode only for sealed segment");
        auto size =
            std::min(active_count_ - current_index_chunk_pos_, batch_size_);

        current_index_chunk_pos_ += size;
    }

    void
    MoveCursor() override {
        // When an offset input is specified, do not maintain cursor states.
        if (!has_offset_input_) {
            if (is_index_mode_) {
                MoveCursorForIndex();
                if (segment_->HasFieldData(field_id_)) {
                    MoveCursorForData();
                }
            } else {
                MoveCursorForData();
            }
        }
    }

    void
    ApplyValidData(const bool* valid_data,
                   TargetBitmapView res,
                   TargetBitmapView valid_res,
                   const int size) {
        if (valid_data != nullptr) {
            for (int i = 0; i < size; i++) {
                if (!valid_data[i]) {
                    res[i] = valid_res[i] = false;
                }
            }
        }
    }

    int64_t
    GetNextBatchSize() {
        auto use_sealed_index = is_index_mode_ && use_index_ &&
                                segment_->type() == SegmentType::Sealed;
        auto current_chunk =
            use_sealed_index ? current_index_chunk_ : current_data_chunk_;
        auto current_chunk_pos = use_sealed_index ? current_index_chunk_pos_
                                                  : current_data_chunk_pos_;
        auto current_rows = 0;
        if (segment_->is_chunked()) {
            current_rows =
                use_sealed_index
                    ? current_chunk_pos
                    : segment_->num_rows_until_chunk(field_id_,
                                                     current_chunk) +
                          current_chunk_pos;
        } else {
            current_rows = current_chunk * size_per_chunk_ + current_chunk_pos;
        }
        return current_rows + batch_size_ >= active_count_
                   ? active_count_ - current_rows
                   : batch_size_;
    }

    // Used for processing raw-data exprs on sealed segments.
    // Now only used for std::string_view && json.
    // TODO: support more types
    template <typename T,
              bool NeedSegmentOffsets = false,
              typename FUNC,
              typename... ValTypes>
    int64_t
    ProcessChunkForSealedSeg(
        FUNC func,
        std::function<bool(const milvus::SkipIndex&, FieldId, int)> skip_func,
        TargetBitmapView res,
        TargetBitmapView valid_res,
        ValTypes... values) {
        // For sealed segment, only single chunk
        Assert(num_data_chunk_ == 1);
        auto need_size =
            std::min(active_count_ - current_data_chunk_pos_, batch_size_);
        if (need_size == 0)
            return 0;  // do not go empty-loop at the bound of the chunk

        auto& skip_index = segment_->GetSkipIndex();
        auto views_info = segment_->get_batch_views<T>(
            field_id_, 0, current_data_chunk_pos_, need_size);
        if (!skip_func || !skip_func(skip_index, field_id_, 0)) {
            // first is the raw data, second is valid_data;
            // use valid_data to see if the raw data is null
            if constexpr (NeedSegmentOffsets) {
                // For GIS functions: construct the segment offsets array.
                std::vector<int32_t> segment_offsets_array(need_size);
                for (int64_t j = 0; j < need_size; ++j) {
                    segment_offsets_array[j] =
                        static_cast<int32_t>(current_data_chunk_pos_ + j);
                }
                func(views_info.first.data(),
                     views_info.second.data(),
                     nullptr,
                     segment_offsets_array.data(),
                     need_size,
                     res,
                     valid_res,
                     values...);
            } else {
                func(views_info.first.data(),
                     views_info.second.data(),
                     nullptr,
                     need_size,
                     res,
                     valid_res,
                     values...);
            }
        } else {
            ApplyValidData(views_info.second.data(), res, valid_res, need_size);
        }

        current_data_chunk_pos_ += need_size;
        return need_size;
    }

    // accept offsets array and process on the scalar data by offsets
    // stateless! Just check and set bitset as result, does not need to move cursor
    // used for processing raw data expr for sealed segments.
// now only used for std::string_view && json // TODO: support more types template int64_t ProcessDataByOffsetsForSealedSeg( FUNC func, std::function skip_func, OffsetVector* input, TargetBitmapView res, TargetBitmapView valid_res, ValTypes... values) { // For non_chunked sealed segment, only single chunk Assert(num_data_chunk_ == 1); auto& skip_index = segment_->GetSkipIndex(); auto [data_vec, valid_data] = segment_->get_views_by_offsets(field_id_, 0, *input); if (!skip_func || !skip_func(skip_index, field_id_, 0)) { func(data_vec.data(), valid_data.data(), nullptr, input->size(), res, valid_res, values...); } else { ApplyValidData(valid_data.data(), res, valid_res, input->size()); } return input->size(); } template VectorPtr ProcessIndexChunksByOffsets(FUNC func, OffsetVector* input, ValTypes... values) { AssertInfo(num_index_chunk_ == 1, "scalar index chunk num must be 1"); typedef std:: conditional_t, std::string, T> IndexInnerType; using Index = index::ScalarIndex; TargetBitmap valid_res(input->size()); const Index& index = segment_->chunk_scalar_index(field_id_, 0); auto* index_ptr = const_cast(&index); auto valid_result = index_ptr->IsNotNull(); for (auto i = 0; i < input->size(); ++i) { valid_res[i] = valid_result[(*input)[i]]; } auto result = std::move(func.template operator()( index_ptr, values..., input->data())); return std::make_shared(std::move(result), std::move(valid_res)); } // when we have scalar index and index contains raw data, could go with index chunk by offsets template int64_t ProcessIndexLookupByOffsets( FUNC func, std::function skip_func, OffsetVector* input, TargetBitmapView res, TargetBitmapView valid_res, ValTypes... values) { AssertInfo(num_index_chunk_ == 1, "scalar index chunk num must be 1"); auto& skip_index = segment_->GetSkipIndex(); typedef std:: conditional_t, std::string, T> IndexInnerType; using Index = index::ScalarIndex; const Index& index = segment_->chunk_scalar_index(field_id_, 0); auto* index_ptr = const_cast(&index); auto valid_result = index_ptr->IsNotNull(); auto batch_size = input->size(); if (!skip_func || !skip_func(skip_index, field_id_, 0)) { for (auto i = 0; i < batch_size; ++i) { auto offset = (*input)[i]; auto raw = index_ptr->Reverse_Lookup(offset); if (!raw.has_value()) { res[i] = false; continue; } T raw_data = raw.value(); bool valid_data = valid_result[offset]; func.template operator()(&raw_data, &valid_data, nullptr, 1, res + i, valid_res + i, values...); } } else { for (auto i = 0; i < batch_size; ++i) { auto offset = (*input)[i]; res[i] = valid_res[i] = valid_result[offset]; } } return batch_size; } // accept offsets array and process on the scalar data by offsets // stateless! Just check and set bitset as result, does not need to move cursor template int64_t ProcessDataByOffsets( FUNC func, std::function skip_func, OffsetVector* input, TargetBitmapView res, TargetBitmapView valid_res, ValTypes... 
values) { int64_t processed_size = 0; // index reverse lookup if (is_index_mode_ && num_data_chunk_ == 0) { return ProcessIndexLookupByOffsets( func, skip_func, input, res, valid_res, values...); } auto& skip_index = segment_->GetSkipIndex(); // raw data scan // sealed segment if (segment_->type() == SegmentType::Sealed) { if (segment_->is_chunked()) { if constexpr (std::is_same_v || std::is_same_v || std::is_same_v) { for (size_t i = 0; i < input->size(); ++i) { int64_t offset = (*input)[i]; auto [chunk_id, chunk_offset] = segment_->get_chunk_by_offset(field_id_, offset); auto [data_vec, valid_data] = segment_->get_views_by_offsets( field_id_, chunk_id, {int32_t(chunk_offset)}); if (!skip_func || !skip_func(skip_index, field_id_, chunk_id)) { func.template operator()( data_vec.data(), valid_data.data(), nullptr, 1, res + processed_size, valid_res + processed_size, values...); } else { res[processed_size] = valid_res[processed_size] = (valid_data[0]); } processed_size++; } return input->size(); } for (size_t i = 0; i < input->size(); ++i) { int64_t offset = (*input)[i]; auto [chunk_id, chunk_offset] = segment_->get_chunk_by_offset(field_id_, offset); auto chunk = segment_->chunk_data(field_id_, chunk_id); const T* data = chunk.data() + chunk_offset; const bool* valid_data = chunk.valid_data(); if (valid_data != nullptr) { valid_data += chunk_offset; } if (!skip_func || !skip_func(skip_index, field_id_, chunk_id)) { func.template operator()( data, valid_data, nullptr, 1, res + processed_size, valid_res + processed_size, values...); } else { ApplyValidData(valid_data, res + processed_size, valid_res + processed_size, 1); } processed_size++; } return input->size(); } else { if constexpr (std::is_same_v || std::is_same_v || std::is_same_v) { return ProcessDataByOffsetsForSealedSeg( func, skip_func, input, res, valid_res, values...); } auto chunk = segment_->chunk_data(field_id_, 0); const T* data = chunk.data(); const bool* valid_data = chunk.valid_data(); if (!skip_func || !skip_func(skip_index, field_id_, 0)) { func.template operator()(data, valid_data, input->data(), input->size(), res, valid_res, values...); } else { ApplyValidData(valid_data, res, valid_res, input->size()); } return input->size(); } } else { // growing segment for (size_t i = 0; i < input->size(); ++i) { int64_t offset = (*input)[i]; auto chunk_id = offset / size_per_chunk_; auto chunk_offset = offset % size_per_chunk_; auto chunk = segment_->chunk_data(field_id_, chunk_id); const T* data = chunk.data() + chunk_offset; const bool* valid_data = chunk.valid_data(); if (valid_data != nullptr) { valid_data += chunk_offset; } if (!skip_func || !skip_func(skip_index, field_id_, chunk_id)) { func.template operator()( data, valid_data, nullptr, 1, res + processed_size, valid_res + processed_size, values...); } else { ApplyValidData(valid_data, res + processed_size, valid_res + processed_size, 1); } processed_size++; } } return input->size(); } // Template parameter to control whether segment offsets are needed (for GIS functions) template int64_t ProcessDataChunksForSingleChunk( FUNC func, std::function skip_func, TargetBitmapView res, TargetBitmapView valid_res, ValTypes... values) { int64_t processed_size = 0; if constexpr (std::is_same_v || std::is_same_v) { if (segment_->type() == SegmentType::Sealed) { return ProcessChunkForSealedSeg( func, skip_func, res, valid_res, values...); } } for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) { auto data_pos = (i == current_data_chunk_) ? 
current_data_chunk_pos_ : 0; auto size = (i == (num_data_chunk_ - 1)) ? (segment_->type() == SegmentType::Growing ? (active_count_ % size_per_chunk_ == 0 ? size_per_chunk_ - data_pos : active_count_ % size_per_chunk_ - data_pos) : active_count_ - data_pos) : size_per_chunk_ - data_pos; size = std::min(size, batch_size_ - processed_size); if (size == 0) continue; //do not go empty-loop at the bound of the chunk auto& skip_index = segment_->GetSkipIndex(); auto chunk = segment_->chunk_data(field_id_, i); const bool* valid_data = chunk.valid_data(); if (valid_data != nullptr) { valid_data += data_pos; } if (!skip_func || !skip_func(skip_index, field_id_, i)) { const T* data = chunk.data() + data_pos; if constexpr (NeedSegmentOffsets) { // For GIS functions: construct segment offsets array std::vector segment_offsets_array(size); for (int64_t j = 0; j < size; ++j) { segment_offsets_array[j] = static_cast( size_per_chunk_ * i + data_pos + j); } func(data, valid_data, nullptr, segment_offsets_array.data(), size, res + processed_size, valid_res + processed_size, values...); } else { func(data, valid_data, nullptr, size, res + processed_size, valid_res + processed_size, values...); } } else { // Chunk is skipped by SkipIndex. // We still need to: // 1. Apply valid_data to handle nullable fields // 2. Call func with nullptr to update internal cursors // (e.g., processed_cursor for bitmap_input indexing) ApplyValidData(valid_data, res + processed_size, valid_res + processed_size, size); // Call func with nullptr to update internal cursors if constexpr (NeedSegmentOffsets) { std::vector segment_offsets_array(size); for (int64_t j = 0; j < size; ++j) { segment_offsets_array[j] = static_cast( size_per_chunk_ * i + data_pos + j); } func(nullptr, nullptr, nullptr, segment_offsets_array.data(), size, res + processed_size, valid_res + processed_size, values...); } else { func(nullptr, nullptr, nullptr, size, res + processed_size, valid_res + processed_size, values...); } } processed_size += size; if (processed_size >= batch_size_) { current_data_chunk_ = i; current_data_chunk_pos_ = data_pos + size; break; } } return processed_size; } template int64_t ProcessDataChunksForMultipleChunk( FUNC func, std::function skip_func, TargetBitmapView res, TargetBitmapView valid_res, ValTypes... values) { int64_t processed_size = 0; // if constexpr (std::is_same_v || // std::is_same_v) { // if (segment_->type() == SegmentType::Sealed) { // return ProcessChunkForSealedSeg( // func, skip_func, res, values...); // } // } for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) { auto data_pos = (i == current_data_chunk_) ? 
current_data_chunk_pos_ : 0; // if segment is chunked, type won't be growing int64_t size = segment_->chunk_size(field_id_, i) - data_pos; size = std::min(size, batch_size_ - processed_size); if (size == 0) continue; //do not go empty-loop at the bound of the chunk std::vector segment_offsets_array(size); auto start_offset = segment_->num_rows_until_chunk(field_id_, i) + data_pos; for (int64_t j = 0; j < size; ++j) { int64_t offset = start_offset + j; segment_offsets_array[j] = static_cast(offset); } auto& skip_index = segment_->GetSkipIndex(); if (!skip_func || !skip_func(skip_index, field_id_, i)) { bool is_seal = false; if constexpr (std::is_same_v || std::is_same_v || std::is_same_v) { if (segment_->type() == SegmentType::Sealed) { // first is the raw data, second is valid_data // use valid_data to see if raw data is null auto [data_vec, valid_data] = segment_->get_batch_views( field_id_, i, data_pos, size); if constexpr (NeedSegmentOffsets) { // For GIS functions: construct segment offsets array func(data_vec.data(), valid_data.data(), nullptr, segment_offsets_array.data(), size, res + processed_size, valid_res + processed_size, values...); } else { // For regular functions: no segment offsets func(data_vec.data(), valid_data.data(), nullptr, size, res + processed_size, valid_res + processed_size, values...); } is_seal = true; } } if (!is_seal) { auto chunk = segment_->chunk_data(field_id_, i); const T* data = chunk.data() + data_pos; const bool* valid_data = chunk.valid_data(); if (valid_data != nullptr) { valid_data += data_pos; } if constexpr (NeedSegmentOffsets) { // For GIS functions: construct segment offsets array func(data, valid_data, nullptr, segment_offsets_array.data(), size, res + processed_size, valid_res + processed_size, values...); } else { func(data, valid_data, nullptr, size, res + processed_size, valid_res + processed_size, values...); } } } else { // Chunk is skipped by SkipIndex. // We still need to: // 1. Apply valid_data to handle nullable fields // 2. Call func with nullptr to update internal cursors // (e.g., processed_cursor for bitmap_input indexing) const bool* valid_data; if constexpr (std::is_same_v || std::is_same_v || std::is_same_v) { if (segment_->type() == SegmentType::Sealed) { auto batch_views = segment_->get_batch_views( field_id_, i, data_pos, size); valid_data = batch_views.second.data(); ApplyValidData(valid_data, res + processed_size, valid_res + processed_size, size); } } else { auto chunk = segment_->chunk_data(field_id_, i); valid_data = chunk.valid_data(); if (valid_data != nullptr) { valid_data += data_pos; } ApplyValidData(valid_data, res + processed_size, valid_res + processed_size, size); } // Call func with nullptr to update internal cursors if constexpr (NeedSegmentOffsets) { func(nullptr, nullptr, nullptr, segment_offsets_array.data(), size, res + processed_size, valid_res + processed_size, values...); } else { func(nullptr, nullptr, nullptr, size, res + processed_size, valid_res + processed_size, values...); } } processed_size += size; if (processed_size >= batch_size_) { current_data_chunk_ = i; current_data_chunk_pos_ = data_pos + size; break; } } return processed_size; } template int64_t ProcessDataChunks( FUNC func, std::function skip_func, TargetBitmapView res, TargetBitmapView valid_res, ValTypes... 
values) { if (segment_->is_chunked()) { return ProcessDataChunksForMultipleChunk( func, skip_func, res, valid_res, values...); } else { return ProcessDataChunksForSingleChunk( func, skip_func, res, valid_res, values...); } } int ProcessIndexOneChunk(TargetBitmap& result, TargetBitmap& valid_result, size_t chunk_id, const TargetBitmap& chunk_res, const TargetBitmap& chunk_valid_res, int processed_rows) { auto data_pos = chunk_id == current_index_chunk_ ? current_index_chunk_pos_ : 0; auto size = std::min( std::min(size_per_chunk_ - data_pos, batch_size_ - processed_rows), int64_t(chunk_res.size())); // result.insert(result.end(), // chunk_res.begin() + data_pos, // chunk_res.begin() + data_pos + size); result.append(chunk_res, data_pos, size); valid_result.append(chunk_valid_res, data_pos, size); return size; } template VectorPtr ProcessIndexChunks(FUNC func, const ValTypes&... values) { typedef std:: conditional_t, std::string, T> IndexInnerType; using Index = index::ScalarIndex; TargetBitmap result; TargetBitmap valid_result; int processed_rows = 0; for (size_t i = current_index_chunk_; i < num_index_chunk_; i++) { // This cache result help getting result for every batch loop. // It avoids indexing execute for every batch because indexing // executing costs quite much time. if (cached_index_chunk_id_ != i) { Index* index_ptr = nullptr; if (field_type_ == DataType::JSON) { auto pointer = milvus::Json::pointer(nested_path_); const Index& index = segment_->chunk_scalar_index( field_id_, pointer, i); index_ptr = const_cast(&index); } else { const Index& index = segment_->chunk_scalar_index(field_id_, i); index_ptr = const_cast(&index); } cached_index_chunk_res_ = std::make_shared( std::move(func(index_ptr, values...))); auto valid_result = index_ptr->IsNotNull(); cached_index_chunk_valid_res_ = std::make_shared(std::move(valid_result)); cached_index_chunk_id_ = i; } auto size = ProcessIndexOneChunk(result, valid_result, i, *cached_index_chunk_res_, *cached_index_chunk_valid_res_, processed_rows); if (processed_rows + size >= batch_size_) { current_index_chunk_ = i; current_index_chunk_pos_ = i == current_index_chunk_ ? 
current_index_chunk_pos_ + size : size; break; } processed_rows += size; } return std::make_shared(std::move(result), std::move(valid_result)); } template TargetBitmap ProcessChunksForValid(bool use_index) { if (use_index) { // when T is ArrayView, the ScalarIndex shall be ScalarIndex // NOT ScalarIndex if (std::is_same_v) { auto element_type = segment_->get_schema()[field_id_].get_element_type(); switch (element_type) { case DataType::BOOL: { return ProcessIndexChunksForValid(); } case DataType::INT8: { return ProcessIndexChunksForValid(); } case DataType::INT16: { return ProcessIndexChunksForValid(); } case DataType::INT32: { return ProcessIndexChunksForValid(); } case DataType::INT64: { return ProcessIndexChunksForValid(); } case DataType::FLOAT: { return ProcessIndexChunksForValid(); } case DataType::DOUBLE: { return ProcessIndexChunksForValid(); } case DataType::STRING: case DataType::VARCHAR: { return ProcessIndexChunksForValid(); } case DataType::GEOMETRY: { return ProcessIndexChunksForValid(); } default: PanicInfo(DataTypeInvalid, "unsupported element type: {}", element_type); } } return ProcessIndexChunksForValid(); } else { return ProcessDataChunksForValid(); } } template TargetBitmap ProcessChunksForValidByOffsets(bool use_index, const OffsetVector& input) { typedef std:: conditional_t, std::string, T> IndexInnerType; using Index = index::ScalarIndex; auto batch_size = input.size(); TargetBitmap valid_result(batch_size); valid_result.set(); if (use_index) { // when T is ArrayView, the ScalarIndex shall be ScalarIndex // NOT ScalarIndex if (std::is_same_v) { auto element_type = segment_->get_schema()[field_id_].get_element_type(); switch (element_type) { case DataType::BOOL: { return ProcessChunksForValidByOffsets(use_index, input); } case DataType::INT8: { return ProcessChunksForValidByOffsets(use_index, input); } case DataType::INT16: { return ProcessChunksForValidByOffsets( use_index, input); } case DataType::INT32: { return ProcessChunksForValidByOffsets( use_index, input); } case DataType::INT64: { return ProcessChunksForValidByOffsets( use_index, input); } case DataType::FLOAT: { return ProcessChunksForValidByOffsets(use_index, input); } case DataType::DOUBLE: { return ProcessChunksForValidByOffsets(use_index, input); } case DataType::STRING: case DataType::VARCHAR: { return ProcessChunksForValidByOffsets( use_index, input); } default: PanicInfo(DataTypeInvalid, "unsupported element type: {}", element_type); } } const Index& index = segment_->chunk_scalar_index(field_id_, 0); auto* index_ptr = const_cast(&index); const auto& res = index_ptr->IsNotNull(); for (auto i = 0; i < batch_size; ++i) { valid_result[i] = res[input[i]]; } } else { for (auto i = 0; i < batch_size; ++i) { auto offset = input[i]; auto [chunk_id, chunk_offset] = [&]() -> std::pair { if (segment_->type() == SegmentType::Growing) { return {offset / size_per_chunk_, offset % size_per_chunk_}; } else if (segment_->is_chunked()) { return segment_->get_chunk_by_offset(field_id_, offset); } else { return {0, offset}; } }(); auto chunk = segment_->chunk_data(field_id_, chunk_id); const bool* valid_data = chunk.valid_data(); if (valid_data != nullptr) { valid_result[i] = valid_data[chunk_offset]; } else { break; } } } return valid_result; } template TargetBitmap ProcessDataChunksForValid() { TargetBitmap valid_result(GetNextBatchSize()); valid_result.set(); int64_t processed_size = 0; for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) { auto data_pos = (i == current_data_chunk_) ? 
current_data_chunk_pos_ : 0; int64_t size = 0; if (segment_->is_chunked()) { size = segment_->chunk_size(field_id_, i) - data_pos; } else { size = (i == (num_data_chunk_ - 1)) ? (segment_->type() == SegmentType::Growing ? (active_count_ % size_per_chunk_ == 0 ? size_per_chunk_ - data_pos : active_count_ % size_per_chunk_ - data_pos) : active_count_ - data_pos) : size_per_chunk_ - data_pos; } size = std::min(size, batch_size_ - processed_size); if (size == 0) continue; //do not go empty-loop at the bound of the chunk bool access_sealed_variable_column = false; if constexpr (std::is_same_v || std::is_same_v || std::is_same_v) { if (segment_->type() == SegmentType::Sealed) { auto [data_vec, valid_data] = segment_->get_batch_views( field_id_, i, data_pos, size); ApplyValidData(valid_data.data(), valid_result + processed_size, valid_result + processed_size, size); access_sealed_variable_column = true; } } if (!access_sealed_variable_column) { auto chunk = segment_->chunk_data(field_id_, i); const bool* valid_data = chunk.valid_data(); if (valid_data == nullptr) { return valid_result; } valid_data += data_pos; ApplyValidData(valid_data, valid_result + processed_size, valid_result + processed_size, size); } processed_size += size; if (processed_size >= batch_size_) { current_data_chunk_ = i; current_data_chunk_pos_ = data_pos + size; break; } } return valid_result; } int ProcessIndexOneChunkForValid(TargetBitmap& valid_result, size_t chunk_id, const TargetBitmap& chunk_valid_res, int processed_rows) { auto data_pos = chunk_id == current_index_chunk_ ? current_index_chunk_pos_ : 0; auto size = std::min( std::min(size_per_chunk_ - data_pos, batch_size_ - processed_rows), int64_t(chunk_valid_res.size())); if (field_type_ == DataType::GEOMETRY && segment_->type() == Growing) { size = std::min(batch_size_ - processed_rows, int64_t(chunk_valid_res.size()) - data_pos); } valid_result.append(chunk_valid_res, data_pos, size); return size; } template TargetBitmap ProcessIndexChunksForValid() { typedef std:: conditional_t, std::string, T> IndexInnerType; using Index = index::ScalarIndex; int processed_rows = 0; TargetBitmap valid_result; valid_result.set(); for (size_t i = current_index_chunk_; i < num_index_chunk_; i++) { // This cache result help getting result for every batch loop. // It avoids indexing execute for every batch because indexing // executing costs quite much time. if (cached_index_chunk_id_ != i) { const Index& index = segment_->chunk_scalar_index(field_id_, i); auto* index_ptr = const_cast(&index); auto execute_sub_batch = [](Index* index_ptr) { TargetBitmap res = index_ptr->IsNotNull(); return res; }; cached_index_chunk_valid_res_ = std::make_shared( std::move(execute_sub_batch(index_ptr))); cached_index_chunk_id_ = i; } auto size = ProcessIndexOneChunkForValid(valid_result, i, *cached_index_chunk_valid_res_, processed_rows); if (processed_rows + size >= batch_size_) { current_index_chunk_ = i; current_index_chunk_pos_ = i == current_index_chunk_ ? current_index_chunk_pos_ + size : size; break; } processed_rows += size; } return valid_result; } template void ProcessIndexChunksV2(FUNC func, ValTypes... 
                          values) {
        typedef std::
            conditional_t<std::is_same_v<T, std::string_view>, std::string, T>
                IndexInnerType;
        using Index = index::ScalarIndex<IndexInnerType>;
        for (size_t i = current_index_chunk_; i < num_index_chunk_; i++) {
            const Index& index =
                segment_->chunk_scalar_index<IndexInnerType>(field_id_, i);
            auto* index_ptr = const_cast<Index*>(&index);
            func(index_ptr, values...);
        }
    }

    template <typename T>
    bool
    CanUseIndex(OpType op) const {
        typedef std::
            conditional_t<std::is_same_v<T, std::string_view>, std::string, T>
                IndexInnerType;
        if constexpr (!std::is_same_v<IndexInnerType, std::string>) {
            return true;
        }
        using Index = index::ScalarIndex<IndexInnerType>;
        if (op == OpType::Match || op == OpType::InnerMatch ||
            op == OpType::PostfixMatch) {
            const Index& index = segment_->chunk_scalar_index<IndexInnerType>(
                field_id_, current_index_chunk_);
            // 1. The index supports regex queries and tries to use them, so
            //    the index handles the query;
            // 2. The index has raw data, so index.Reverse_Lookup can handle
            //    the query.
            return (index.TryUseRegexQuery() && index.SupportRegexQuery()) ||
                   index.HasRawData();
        }
        return true;
    }

    template <typename T>
    bool
    IndexHasRawData() const {
        typedef std::
            conditional_t<std::is_same_v<T, std::string_view>, std::string, T>
                IndexInnerType;
        using Index = index::ScalarIndex<IndexInnerType>;
        for (size_t i = current_index_chunk_; i < num_index_chunk_; i++) {
            const Index& index =
                segment_->chunk_scalar_index<IndexInnerType>(field_id_, i);
            if (!index.HasRawData()) {
                return false;
            }
        }
        return true;
    }

    void
    SetNotUseIndex() {
        use_index_ = false;
    }

    bool
    CanUseJsonKeyIndex(FieldId field_id) const {
        if (segment_->type() == SegmentType::Sealed) {
            auto sealed_seg =
                dynamic_cast<const segcore::SegmentSealedImpl*>(segment_);
            Assert(sealed_seg != nullptr);
            if (sealed_seg->GetJsonKeyIndex(field_id) != nullptr) {
                return true;
            }
        } else if (segment_->type() == SegmentType::Growing) {
            if (segment_->GetJsonKeyIndex(field_id) != nullptr) {
                return true;
            }
        }
        return false;
    }

 protected:
    const segcore::SegmentInternalInterface* segment_;
    const FieldId field_id_;
    bool is_pk_field_{false};
    DataType pk_type_;
    int64_t batch_size_;
    std::vector<std::string> nested_path_;
    DataType field_type_;
    DataType value_type_;
    bool allow_any_json_cast_type_{false};
    bool is_json_contains_{false};
    bool is_index_mode_{false};
    bool is_data_mode_{false};
    // Sometimes the index needs to be skipped and raw data used instead.
    // Defaults to true, which means the index is used as much as possible.
    bool use_index_{true};

    int64_t active_count_{0};
    int64_t num_data_chunk_{0};
    int64_t num_index_chunk_{0};
    // State indicating the position the expr is computing at,
    // because the expr may be called once per batch.
    int64_t current_data_chunk_{0};
    int64_t current_data_chunk_pos_{0};
    int64_t current_data_global_pos_{0};
    int64_t current_index_chunk_{0};
    int64_t current_index_chunk_pos_{0};
    int64_t size_per_chunk_{0};

    // Cache for index scan to avoid searching the index for every batch.
    int64_t cached_index_chunk_id_{-1};
    std::shared_ptr<TargetBitmap> cached_index_chunk_res_{nullptr};
    // Cache for chunk valid res.
    std::shared_ptr<TargetBitmap> cached_index_chunk_valid_res_{nullptr};
    // Cache for text match.
    std::shared_ptr<TargetBitmap> cached_match_res_{nullptr};
    int32_t consistency_level_{0};
};

bool
IsLikeExpr(std::shared_ptr<Expr> expr);

void
OptimizeCompiledExprs(ExecContext* context, const std::vector<ExprPtr>& exprs);

std::vector<ExprPtr>
CompileExpressions(const std::vector<expr::TypedExprPtr>& logical_exprs,
                   ExecContext* context,
                   const std::unordered_set<std::string>& flatten_candidates =
                       std::unordered_set<std::string>(),
                   bool enable_constant_folding = false);

std::vector<ExprPtr>
CompileInputs(const expr::TypedExprPtr& expr,
              QueryContext* config,
              const std::unordered_set<std::string>& flatten_candidates);

ExprPtr
CompileExpression(const expr::TypedExprPtr& expr,
                  QueryContext* context,
                  const std::unordered_set<std::string>& flatten_candidates,
                  bool enable_constant_folding);

class ExprSet {
 public:
    explicit ExprSet(const std::vector<expr::TypedExprPtr>& logical_exprs,
                     ExecContext* exec_ctx)
        : exec_ctx_(exec_ctx) {
        exprs_ = CompileExpressions(logical_exprs, exec_ctx);
    }

    virtual ~ExprSet() = default;

    void
    Eval(EvalCtx& ctx, std::vector<VectorPtr>& results) {
        Eval(0, exprs_.size(), true, ctx, results);
    }

    virtual void
    Eval(int32_t begin,
         int32_t end,
         bool initialize,
         EvalCtx& ctx,
         std::vector<VectorPtr>& result);

    void
    Clear() {
        exprs_.clear();
    }

    ExecContext*
    get_exec_context() const {
        return exec_ctx_;
    }

    size_t
    size() const {
        return exprs_.size();
    }

    const std::vector<std::shared_ptr<Expr>>&
    exprs() const {
        return exprs_;
    }

    const std::shared_ptr<Expr>&
    expr(int32_t index) const {
        return exprs_[index];
    }

 private:
    std::vector<std::shared_ptr<Expr>> exprs_;
    ExecContext* exec_ctx_;
};

}  // namespace exec
}  // namespace milvus
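
// Illustrative sketch (comments only, not compiled): one way a caller can
// drive the classes declared above. Only CompileExpressions, ExprSet,
// ExprSet::Eval, EvalCtx, and VectorPtr come from this header; the variable
// names and the ExecContext/EvalCtx constructor forms shown are assumptions
// for illustration, not a prescribed API.
//
//   std::vector<milvus::expr::TypedExprPtr> logical_exprs = /* from the plan */;
//   milvus::exec::ExecContext exec_ctx(query_context);        // assumed ctor
//   milvus::exec::ExprSet expr_set(logical_exprs, &exec_ctx); // compiles exprs
//   milvus::exec::EvalCtx eval_ctx(&exec_ctx, &expr_set);     // assumed ctor
//   std::vector<milvus::VectorPtr> results(expr_set.size());
//   // Each Eval call produces one batch of filter results; SegmentExpr
//   // sources advance their internal cursors between calls.
//   expr_set.Eval(eval_ctx, results);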