diff --git a/internal/core/src/exec/expression/TermExpr.cpp b/internal/core/src/exec/expression/TermExpr.cpp index 0ddf699685..8cef0b9df2 100644 --- a/internal/core/src/exec/expression/TermExpr.cpp +++ b/internal/core/src/exec/expression/TermExpr.cpp @@ -196,12 +196,8 @@ PhyTermFilterExpr::InitPkCacheOffset() { } } - auto seg_offsets = segment_->search_ids(*id_array, query_timestamp_); cached_bits_.resize(active_count_, false); - for (const auto& offset : seg_offsets) { - auto _offset = (int64_t)offset.get(); - cached_bits_[_offset] = true; - } + segment_->search_ids(cached_bits_, *id_array); cached_bits_inited_ = true; } diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 05206524dd..9d2a35ef9a 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -1038,6 +1038,84 @@ ChunkedSegmentSealedImpl::search_pk(milvus::OpContext* op_ctx, }); } +void +ChunkedSegmentSealedImpl::search_pks(BitsetType& bitset, + const std::vector& pks) const { + BitsetTypeView bitset_view(bitset); + if (!is_sorted_by_pk_) { + for (auto& pk : pks) { + insert_record_.search_pk_range( + pk, proto::plan::OpType::Equal, bitset_view); + } + return; + } + + auto pk_field_id = schema_->get_primary_field_id().value_or(FieldId(-1)); + AssertInfo(pk_field_id.get() != -1, "Primary key is -1"); + auto pk_column = get_column(pk_field_id); + AssertInfo(pk_column != nullptr, "primary key column not loaded"); + + auto all_chunk_pins = pk_column->GetAllChunks(nullptr); + switch (schema_->get_fields().at(pk_field_id).get_data_type()) { + case DataType::INT64: { + auto num_chunk = pk_column->num_chunks(); + for (int i = 0; i < num_chunk; ++i) { + auto pw = all_chunk_pins[i]; + auto src = + reinterpret_cast(pw.get()->RawData()); + auto chunk_row_num = pk_column->chunk_row_nums(i); + for (size_t j = 0; j < pks.size(); j++) { + // get int64 pks + auto target = 
std::get(pks[j]); + auto it = std::lower_bound( + src, + src + chunk_row_num, + target, + [](const int64_t& elem, const int64_t& value) { + return elem < value; + }); + auto num_rows_until_chunk = + pk_column->GetNumRowsUntilChunk(i); + for (; it != src + chunk_row_num && *it == target; ++it) { + auto offset = it - src + num_rows_until_chunk; + bitset[offset] = true; + } + } + } + + break; + } + case DataType::VARCHAR: { + auto num_chunk = pk_column->num_chunks(); + for (int i = 0; i < num_chunk; ++i) { + // TODO @xiaocai2333, @sunby: chunk need to record the min/max. + auto num_rows_until_chunk = pk_column->GetNumRowsUntilChunk(i); + auto pw = all_chunk_pins[i]; + auto string_chunk = static_cast(pw.get()); + for (size_t j = 0; j < pks.size(); ++j) { + // get varchar pks + auto& target = std::get(pks[j]); + auto offset = string_chunk->binary_search_string(target); + for (; offset != -1 && offset < string_chunk->RowNums() && + string_chunk->operator[](offset) == target; + ++offset) { + auto segment_offset = offset + num_rows_until_chunk; + bitset[segment_offset] = true; + } + } + } + break; + } + default: { + ThrowInfo( + DataTypeInvalid, + fmt::format( + "unsupported type {}", + schema_->get_fields().at(pk_field_id).get_data_type())); + } + } +} + void ChunkedSegmentSealedImpl::search_batch_pks( const std::vector& pks, @@ -2221,9 +2299,9 @@ ChunkedSegmentSealedImpl::GetFieldDataType(milvus::FieldId field_id) const { return field_meta.get_data_type(); } -std::vector -ChunkedSegmentSealedImpl::search_ids(const IdArray& id_array, - Timestamp timestamp) const { +void +ChunkedSegmentSealedImpl::search_ids(BitsetType& bitset, + const IdArray& id_array) const { auto field_id = schema_->get_primary_field_id().value_or(FieldId(-1)); AssertInfo(field_id.get() != -1, "Primary key is -1"); auto& field_meta = schema_->operator[](field_id); @@ -2232,16 +2310,7 @@ ChunkedSegmentSealedImpl::search_ids(const IdArray& id_array, std::vector pks(ids_size); ParsePksFromIDs(pks, 
data_type, id_array); - std::vector res_offsets; - res_offsets.reserve(pks.size()); - this->search_batch_pks( - pks, - [=](const size_t idx) { return timestamp; }, - true, - [&](const SegOffset offset, const Timestamp ts) { - res_offsets.push_back(offset); - }); - return std::move(res_offsets); + this->search_pks(bitset, pks); } SegcoreError diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h index 8b07dea4ff..4f1a05c1f9 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h @@ -248,6 +248,9 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { return true; } + void + search_pks(BitsetType& bitset, const std::vector& pks) const; + void search_batch_pks( const std::vector& pks, @@ -478,8 +481,8 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { return system_ready_count_ == 1; } - std::vector - search_ids(const IdArray& id_array, Timestamp timestamp) const override; + void + search_ids(BitsetType& bitset, const IdArray& id_array) const override; void LoadVecIndex(const LoadIndexInfo& info); diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 6fabd7582b..f3a40aae15 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -1164,9 +1164,9 @@ SegmentGrowingImpl::bulk_subscript(milvus::OpContext* op_ctx, } } -std::vector -SegmentGrowingImpl::search_ids(const IdArray& id_array, - Timestamp timestamp) const { +void +SegmentGrowingImpl::search_ids(BitsetType& bitset, + const IdArray& id_array) const { auto field_id = schema_->get_primary_field_id().value_or(FieldId(-1)); AssertInfo(field_id.get() != -1, "Primary key is -1"); auto& field_meta = schema_->operator[](field_id); @@ -1175,15 +1175,11 @@ SegmentGrowingImpl::search_ids(const IdArray& id_array, std::vector pks(ids_size); 
ParsePksFromIDs(pks, data_type, id_array); - std::vector res_offsets; - res_offsets.reserve(pks.size()); + BitsetTypeView bitset_view(bitset); for (auto& pk : pks) { - auto segOffsets = insert_record_.search_pk(pk, timestamp); - for (auto offset : segOffsets) { - res_offsets.push_back(offset); - } + insert_record_.search_pk_range( + pk, proto::plan::OpType::Equal, bitset_view); } - return std::move(res_offsets); } std::string diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index d3a018c671..b7ee3dbd37 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -375,8 +375,8 @@ class SegmentGrowingImpl : public SegmentGrowing { int64_t ins_barrier, Timestamp timestamp) const override; - std::vector - search_ids(const IdArray& id_array, Timestamp timestamp) const override; + void + search_ids(BitsetType& bitset, const IdArray& id_array) const override; bool HasIndex(FieldId field_id) const { diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index dacbfebe7f..7121090060 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -434,12 +434,14 @@ class SegmentInternalInterface : public SegmentInterface { /** * search offset by possible pk values and mvcc timestamp * + * @param bitset The final bitset after id array filtering, + * `false` means that the entity will be filtered out. * @param id_array possible pk values - * @param timestamp mvcc timestamp - * @return all the hit entries in vector of offsets + * @note This interface is used for internal expression evaluation, + * so it takes no timestamp parameter; the MVCC node guarantees that timestamp filtering has already been applied.
*/ - virtual std::vector - search_ids(const IdArray& id_array, Timestamp timestamp) const = 0; + virtual void + search_ids(BitsetType& bitset, const IdArray& id_array) const = 0; /** * Apply timestamp filtering on bitset, the query can't see an entity whose