From b6199acb050d38b720d82bc70f2b69bd9860c3b5 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 7 Aug 2025 14:19:40 +0800 Subject: [PATCH] enhance: Utilize `search_batch_pks` for `search_ids` of PkTerm (#43751) Related to #43660 --------- Signed-off-by: Congqi Xia --- .../src/exec/expression/JsonContainsExpr.cpp | 8 ++- .../core/src/exec/expression/TermExpr.cpp | 6 +- .../src/segcore/ChunkedSegmentSealedImpl.cpp | 56 +++++++------------ .../src/segcore/ChunkedSegmentSealedImpl.h | 4 +- internal/core/src/segcore/DeletedRecord.h | 4 +- .../core/src/segcore/SegmentGrowingImpl.cpp | 21 +------ .../core/src/segcore/SegmentGrowingImpl.h | 2 +- internal/core/src/segcore/SegmentInterface.h | 9 ++- internal/core/unittest/test_delete_record.cpp | 15 +++-- internal/core/unittest/test_utils.cpp | 3 +- 10 files changed, 56 insertions(+), 72 deletions(-) diff --git a/internal/core/src/exec/expression/JsonContainsExpr.cpp b/internal/core/src/exec/expression/JsonContainsExpr.cpp index a0ebcf6fc9..7ee0d2ad29 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.cpp +++ b/internal/core/src/exec/expression/JsonContainsExpr.cpp @@ -34,9 +34,13 @@ PhyJsonContainsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { return; } if (expr_->op_ == proto::plan::JSONContainsExpr_JSONOp_ContainsAll) { - result = std::make_shared(TargetBitmap(real_batch_size, true), TargetBitmap(real_batch_size, true)); + result = std::make_shared( + TargetBitmap(real_batch_size, true), + TargetBitmap(real_batch_size, true)); } else { - result = std::make_shared(TargetBitmap(real_batch_size, false), TargetBitmap(real_batch_size, true)); + result = std::make_shared( + TargetBitmap(real_batch_size, false), + TargetBitmap(real_batch_size, true)); } MoveCursor(); return; diff --git a/internal/core/src/exec/expression/TermExpr.cpp b/internal/core/src/exec/expression/TermExpr.cpp index e473dbd9b6..f55e915dfe 100644 --- a/internal/core/src/exec/expression/TermExpr.cpp +++ b/internal/core/src/exec/expression/TermExpr.cpp @@ -191,8 +191,7 @@ PhyTermFilterExpr::InitPkCacheOffset() { } } - auto [uids, seg_offsets] = - segment_->search_ids(*id_array, query_timestamp_); + auto seg_offsets = segment_->search_ids(*id_array, query_timestamp_); cached_bits_.resize(active_count_, false); for (const auto& offset : seg_offsets) { auto _offset = (int64_t)offset.get(); @@ -540,8 +539,7 @@ PhyTermFilterExpr::ExecJsonInVariableByKeyIndex() { if (!arg_inited_) { arg_set_ = std::make_shared>(expr_->vals_); if constexpr (std::is_same_v) { - arg_set_float_ = - std::make_shared>(expr_->vals_); + arg_set_float_ = std::make_shared>(expr_->vals_); } arg_inited_ = true; } diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index b0d2935fa3..4d9b98c76c 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -916,17 +916,18 @@ ChunkedSegmentSealedImpl::search_pk(const PkType& pk, void ChunkedSegmentSealedImpl::search_batch_pks( const std::vector& pks, - const Timestamp* timestamps, + const std::function& get_timestamp, bool include_same_ts, const std::function& callback) const { // handle unsorted case if (!is_sorted_by_pk_) { for (size_t i = 0; i < pks.size(); i++) { - auto offsets = insert_record_.search_pk( - pks[i], timestamps[i], include_same_ts); + auto timestamp = get_timestamp(i); + auto offsets = + insert_record_.search_pk(pks[i], timestamp, include_same_ts); for (auto offset : offsets) { - callback(offset, timestamps[i]); + callback(offset, timestamp); } } return; @@ -956,7 +957,7 @@ ChunkedSegmentSealedImpl::search_batch_pks( for (size_t j = 0; j < pks.size(); j++) { // get int64 pks auto target = std::get(pks[j]); - auto timestamp = timestamps[j]; + auto timestamp = get_timestamp(j); auto it = std::lower_bound( src, src + chunk_row_num, @@ -988,7 +989,7 @@ ChunkedSegmentSealedImpl::search_batch_pks( for (size_t j = 0; j < pks.size(); ++j) { // get varchar pks auto& target = std::get(pks[j]); - auto timestamp = timestamps[j]; + auto timestamp = get_timestamp(j); auto offset = string_chunk->binary_search_string(target); for (; offset != -1 && offset < string_chunk->RowNums() && string_chunk->operator[](offset) == target; @@ -1142,7 +1143,11 @@ ChunkedSegmentSealedImpl::ChunkedSegmentSealedImpl( const Timestamp* timestamps, std::function callback) { - this->search_batch_pks(pks, timestamps, false, callback); + this->search_batch_pks( + pks, + [&](const size_t idx) { return timestamps[idx]; }, + false, + callback); }, segment_id) { auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager(); @@ -1752,7 +1757,7 @@ ChunkedSegmentSealedImpl::GetFieldDataType(milvus::FieldId field_id) const { return field_meta.get_data_type(); } -std::pair, std::vector> +std::vector ChunkedSegmentSealedImpl::search_ids(const IdArray& id_array, Timestamp timestamp) const { auto field_id = schema_->get_primary_field_id().value_or(FieldId(-1)); @@ -1763,37 +1768,16 @@ ChunkedSegmentSealedImpl::search_ids(const IdArray& id_array, std::vector pks(ids_size); ParsePksFromIDs(pks, data_type, id_array); - auto res_id_arr = std::make_unique(); std::vector res_offsets; res_offsets.reserve(pks.size()); - for (auto& pk : pks) { - std::vector pk_offsets; - if (!is_sorted_by_pk_) { - pk_offsets = insert_record_.search_pk(pk, timestamp); - } else { - pk_offsets = search_pk(pk, timestamp); - } - for (auto offset : pk_offsets) { - switch (data_type) { - case DataType::INT64: { - res_id_arr->mutable_int_id()->add_data( - std::get(pk)); - break; - } - case DataType::VARCHAR: { - res_id_arr->mutable_str_id()->add_data( - std::get(std::move(pk))); - break; - } - default: { - ThrowInfo(DataTypeInvalid, - fmt::format("unsupported type {}", data_type)); - } - } + this->search_batch_pks( + pks, + [=](const size_t idx) { return timestamp; }, + true, + [&](const SegOffset offset, const Timestamp ts) { res_offsets.push_back(offset); - } - } - return {std::move(res_id_arr), std::move(res_offsets)}; + }); + return std::move(res_offsets); } SegcoreError diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h index 0a75967c87..b4a63a2bcc 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h @@ -207,7 +207,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { void search_batch_pks( const std::vector& pks, - const Timestamp* timestamps, + const std::function& get_timestamp, bool include_same_ts, const std::function& callback) const; @@ -410,7 +410,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { return system_ready_count_ == 1; } - std::pair, std::vector> + std::vector search_ids(const IdArray& id_array, Timestamp timestamp) const override; void diff --git a/internal/core/src/segcore/DeletedRecord.h b/internal/core/src/segcore/DeletedRecord.h index 32f0890c77..260ff62858 100644 --- a/internal/core/src/segcore/DeletedRecord.h +++ b/internal/core/src/segcore/DeletedRecord.h @@ -118,7 +118,9 @@ class DeletedRecord { } } search_pk_func_( - pks, timestamps, [&](SegOffset offset, Timestamp delete_ts) { + pks, + timestamps, + [&](const SegOffset offset, const Timestamp delete_ts) { auto row_id = offset.get(); // if already deleted, no need to add new record if (deleted_mask_.size() > row_id && deleted_mask_[row_id]) { diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 977c0cb435..308c73e777 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -1106,7 +1106,7 @@ SegmentGrowingImpl::bulk_subscript(SystemFieldType system_type, } } -std::pair, std::vector> +std::vector SegmentGrowingImpl::search_ids(const IdArray& id_array, Timestamp timestamp) const { auto field_id = schema_->get_primary_field_id().value_or(FieldId(-1)); @@ -1117,32 +1117,15 @@ SegmentGrowingImpl::search_ids(const IdArray& id_array, std::vector pks(ids_size); ParsePksFromIDs(pks, data_type, id_array); - auto res_id_arr = std::make_unique(); std::vector res_offsets; res_offsets.reserve(pks.size()); for (auto& pk : pks) { auto segOffsets = insert_record_.search_pk(pk, timestamp); for (auto offset : segOffsets) { - switch (data_type) { - case DataType::INT64: { - res_id_arr->mutable_int_id()->add_data( - std::get(pk)); - break; - } - case DataType::VARCHAR: { - res_id_arr->mutable_str_id()->add_data( - std::get(std::move(pk))); - break; - } - default: { - ThrowInfo(DataTypeInvalid, - fmt::format("unsupported type {}", data_type)); - } - } res_offsets.push_back(offset); } } - return {std::move(res_id_arr), std::move(res_offsets)}; + return std::move(res_offsets); } std::string diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 00b7c9bd55..db7de56c38 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -340,7 +340,7 @@ class SegmentGrowingImpl : public SegmentGrowing { int64_t ins_barrier, Timestamp timestamp) const override; - std::pair, std::vector> + std::vector search_ids(const IdArray& id_array, Timestamp timestamp) const override; bool diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 0783aaefc8..6e628eaf56 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -440,7 +440,14 @@ class SegmentInternalInterface : public SegmentInterface { virtual int64_t get_active_count(Timestamp ts) const = 0; - virtual std::pair, std::vector> + /** + * search offset by possible pk values and mvcc timestamp + * + * @param id_array possible pk values + * @param timestamp mvcc timestamp + * @return all the hit entries in vector of offsets + */ + virtual std::vector search_ids(const IdArray& id_array, Timestamp timestamp) const = 0; /** diff --git a/internal/core/unittest/test_delete_record.cpp b/internal/core/unittest/test_delete_record.cpp index e297b426da..71c20466d0 100644 --- a/internal/core/unittest/test_delete_record.cpp +++ b/internal/core/unittest/test_delete_record.cpp @@ -45,7 +45,8 @@ TEST(DeleteMVCC, common_case) { [&insert_record]( const std::vector& pks, const Timestamp* timestamps, - std::function cb) { + std::function + cb) { for (size_t i = 0; i < pks.size(); ++i) { auto timestamp = timestamps[i]; auto offsets = insert_record.search_pk(pks[i], timestamp); @@ -170,7 +171,8 @@ TEST(DeleteMVCC, delete_exist_duplicate_pks) { [&insert_record]( const std::vector& pks, const Timestamp* timestamps, - std::function cb) { + std::function + cb) { for (size_t i = 0; i < pks.size(); ++i) { auto timestamp = timestamps[i]; auto offsets = insert_record.search_pk(pks[i], timestamp); @@ -294,7 +296,8 @@ TEST(DeleteMVCC, snapshot) { [&insert_record]( const std::vector& pks, const Timestamp* timestamps, - std::function cb) { + std::function + cb) { for (size_t i = 0; i < pks.size(); ++i) { auto timestamp = timestamps[i]; auto offsets = insert_record.search_pk(pks[i], timestamp); @@ -351,7 +354,8 @@ TEST(DeleteMVCC, insert_after_snapshot) { [&insert_record]( const std::vector& pks, const Timestamp* timestamps, - std::function cb) { + std::function + cb) { for (size_t i = 0; i < pks.size(); ++i) { auto timestamp = timestamps[i]; auto offsets = insert_record.search_pk(pks[i], timestamp); @@ -455,7 +459,8 @@ TEST(DeleteMVCC, perform) { [&insert_record]( const std::vector& pks, const Timestamp* timestamps, - std::function cb) { + std::function + cb) { for (size_t i = 0; i < pks.size(); ++i) { auto timestamp = timestamps[i]; auto offsets = insert_record.search_pk(pks[i], timestamp); diff --git a/internal/core/unittest/test_utils.cpp b/internal/core/unittest/test_utils.cpp index a00852674c..bed7e7291e 100644 --- a/internal/core/unittest/test_utils.cpp +++ b/internal/core/unittest/test_utils.cpp @@ -93,7 +93,8 @@ TEST(Util, GetDeleteBitmap) { [&insert_record]( const std::vector& pks, const Timestamp* timestamps, - std::function cb) { + std::function + cb) { for (size_t i = 0; i < pks.size(); ++i) { auto timestamp = timestamps[i]; auto offsets = insert_record.search_pk(pks[i], timestamp);