From 494ac0727161c05a7ac8e16bde27d72c841cde7d Mon Sep 17 00:00:00 2001 From: sre-ci-robot <56469371+sre-ci-robot@users.noreply.github.com> Date: Fri, 2 Aug 2024 19:33:20 +0800 Subject: [PATCH] Revert "enhance: remove timestamp_filter after retrieve (#35207)" This reverts commit 16dd53e7cf7f925167045403e38213a9c95ca544. --- .../core/src/exec/expression/TermExpr.cpp | 11 +++-- .../src/query/generated/ExecPlanNodeVisitor.h | 36 ++++++++++++++- .../src/query/visitors/ExecExprVisitor.cpp | 1 + .../query/visitors/ExecPlanNodeVisitor.cpp | 44 +++++++++++++------ internal/core/src/segcore/InsertRecord.h | 38 +++++++++++----- .../core/src/segcore/SegmentGrowingImpl.h | 7 ++- internal/core/src/segcore/SegmentInterface.h | 4 +- internal/core/src/segcore/SegmentSealedImpl.h | 7 ++- .../unittest/test_offset_ordered_array.cpp | 19 ++++---- .../core/unittest/test_offset_ordered_map.cpp | 25 ++++++----- 10 files changed, 140 insertions(+), 52 deletions(-) diff --git a/internal/core/src/exec/expression/TermExpr.cpp b/internal/core/src/exec/expression/TermExpr.cpp index 4bf3ab7e42..54895a0f2c 100644 --- a/internal/core/src/exec/expression/TermExpr.cpp +++ b/internal/core/src/exec/expression/TermExpr.cpp @@ -139,6 +139,7 @@ PhyTermFilterExpr::CanSkipSegment() { if (segment_->type() == SegmentType::Sealed && skip_index.CanSkipBinaryRange(field_id_, 0, min, max, true, true)) { cached_bits_.resize(active_count_, false); + cached_offsets_ = std::make_shared(DataType::INT64, 0); cached_offsets_inited_ = true; return true; } @@ -177,9 +178,14 @@ PhyTermFilterExpr::InitPkCacheOffset() { auto [uids, seg_offsets] = segment_->search_ids(*id_array, query_timestamp_); cached_bits_.resize(active_count_, false); + cached_offsets_ = + std::make_shared(DataType::INT64, seg_offsets.size()); + int64_t* cached_offsets_ptr = (int64_t*)cached_offsets_->GetRawData(); + int i = 0; for (const auto& offset : seg_offsets) { auto _offset = (int64_t)offset.get(); cached_bits_[_offset] = true; + cached_offsets_ptr[i++] = _offset; } cached_offsets_inited_ = true; } @@ -208,10 +214,7 @@ PhyTermFilterExpr::ExecPkTermImpl() { } if (use_cache_offsets_) { - auto cache_bits_copy = cached_bits_.clone(); - std::vector vecs{ - res_vec, - std::make_shared(std::move(cache_bits_copy))}; + std::vector vecs{res_vec, cached_offsets_}; return std::make_shared(vecs); } else { return res_vec; diff --git a/internal/core/src/query/generated/ExecPlanNodeVisitor.h b/internal/core/src/query/generated/ExecPlanNodeVisitor.h index 96b5d9b2f9..d3b69a388d 100644 --- a/internal/core/src/query/generated/ExecPlanNodeVisitor.h +++ b/internal/core/src/query/generated/ExecPlanNodeVisitor.h @@ -78,6 +78,21 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor { return ret; } + void + SetExprCacheOffsets(std::vector&& offsets) { + expr_cached_pk_id_offsets_ = std::move(offsets); + } + + void + AddExprCacheOffset(int64_t offset) { + expr_cached_pk_id_offsets_.push_back(offset); + } + + const std::vector& + GetExprCacheOffsets() { + return expr_cached_pk_id_offsets_; + } + void SetExprUsePkIndex(bool use_pk_index) { expr_use_pk_index_ = use_pk_index; @@ -88,11 +103,29 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor { return expr_use_pk_index_; } + void + ExecuteExprNodeInternal( + const std::shared_ptr& plannode, + const milvus::segcore::SegmentInternalInterface* segment, + int64_t active_count, + BitsetType& result, + bool& cache_offset_getted, + std::vector& cache_offset); + void ExecuteExprNode(const std::shared_ptr& plannode, const milvus::segcore::SegmentInternalInterface* segment, int64_t active_count, - BitsetType& result); + BitsetType& result) { + bool get_cache_offset; + std::vector cache_offsets; + ExecuteExprNodeInternal(plannode, + segment, + active_count, + result, + get_cache_offset, + cache_offsets); + } private: template @@ -107,5 +140,6 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor { SearchResultOpt search_result_opt_; RetrieveResultOpt retrieve_result_opt_; bool expr_use_pk_index_ = false; + std::vector expr_cached_pk_id_offsets_; }; } // namespace milvus::query diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp index 7652e0e436..d0b59873ee 100644 --- a/internal/core/src/query/visitors/ExecExprVisitor.cpp +++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp @@ -2547,6 +2547,7 @@ ExecExprVisitor::ExecTermVisitorImpl(TermExpr& expr_raw) -> BitsetType { // If enable plan_visitor pk index cache, pass offsets_ to it if (plan_visitor_ != nullptr) { plan_visitor_->SetExprUsePkIndex(true); + plan_visitor_->SetExprCacheOffsets(std::move(cached_offsets)); } AssertInfo(bitset.size() == row_count_, "[ExecExprVisitor]Size of results not equal row count"); diff --git a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp index f3c01beb30..c5902fd63e 100644 --- a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp +++ b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp @@ -75,11 +75,13 @@ empty_search_result(int64_t num_queries, SearchInfo& search_info) { } void -ExecPlanNodeVisitor::ExecuteExprNode( +ExecPlanNodeVisitor::ExecuteExprNodeInternal( const std::shared_ptr& plannode, const milvus::segcore::SegmentInternalInterface* segment, int64_t active_count, - BitsetType& bitset_holder) { + BitsetType& bitset_holder, + bool& cache_offset_getted, + std::vector& cache_offset) { bitset_holder.clear(); LOG_DEBUG("plannode: {}, active_count: {}, timestamp: {}", plannode->ToString(), @@ -92,7 +94,6 @@ ExecPlanNodeVisitor::ExecuteExprNode( auto task = milvus::exec::Task::Create(DEFAULT_TASK_ID, plan, 0, query_context); - bool cache_offset_getted = false; for (;;) { auto result = task->Next(); if (!result) { @@ -114,17 +115,20 @@ ExecPlanNodeVisitor::ExecuteExprNode( if (!cache_offset_getted) { // offset cache only get once because not support iterator batch - auto cache_bits_vec = + auto cache_offset_vec = std::dynamic_pointer_cast(row->child(1)); - TargetBitmapView view(cache_bits_vec->GetRawData(), - cache_bits_vec->size()); - // If get empty cached bits. mean no record hits in this segment + // If get empty cached offsets. mean no record hits in this segment // no need to get next batch. - if (view.count() == 0) { + if (cache_offset_vec->size() == 0) { bitset_holder.resize(active_count); task->RequestCancel(); break; } + auto cache_offset_vec_ptr = + (int64_t*)(cache_offset_vec->GetRawData()); + for (size_t i = 0; i < cache_offset_vec->size(); ++i) { + cache_offset.push_back(cache_offset_vec_ptr[i]); + } cache_offset_getted = true; } } else { @@ -277,12 +281,17 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) { bitset_holder.resize(active_count); } + // This flag used to indicate whether to get offset from expr module that + // speeds up mvcc filter in the next interface: "timestamp_filter" + bool get_cache_offset = false; std::vector cache_offsets; if (node.filter_plannode_.has_value()) { - ExecuteExprNode(node.filter_plannode_.value(), - segment, - active_count, - bitset_holder); + ExecuteExprNodeInternal(node.filter_plannode_.value(), + segment, + active_count, + bitset_holder, + get_cache_offset, + cache_offsets); bitset_holder.flip(); } @@ -304,7 +313,16 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) { } retrieve_result.total_data_cnt_ = bitset_holder.size(); - auto results_pair = segment->find_first(node.limit_, bitset_holder); + bool false_filtered_out = false; + if (get_cache_offset) { + segment->timestamp_filter(bitset_holder, cache_offsets, timestamp_); + } else { + bitset_holder.flip(); + false_filtered_out = true; + segment->timestamp_filter(bitset_holder, timestamp_); + } + auto results_pair = + segment->find_first(node.limit_, bitset_holder, false_filtered_out); retrieve_result.result_offsets_ = std::move(results_pair.first); retrieve_result.has_more_result = results_pair.second; retrieve_result_opt_ = std::move(retrieve_result); diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index c010971e76..d590e3dfb7 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -63,7 +63,9 @@ class OffsetMap { using OffsetType = int64_t; // TODO: in fact, we can retrieve the pk here. Not sure which way is more efficient. virtual std::pair, bool> - find_first(int64_t limit, const BitsetType& bitset) const = 0; + find_first(int64_t limit, + const BitsetType& bitset, + bool false_filtered_out) const = 0; virtual void clear() = 0; @@ -167,7 +169,9 @@ class OffsetOrderedMap : public OffsetMap { } std::pair, bool> - find_first(int64_t limit, const BitsetType& bitset) const override { + find_first(int64_t limit, + const BitsetType& bitset, + bool false_filtered_out) const override { std::shared_lock lck(mtx_); if (limit == Unlimited || limit == NoLimit) { @@ -176,7 +180,7 @@ class OffsetOrderedMap : public OffsetMap { // TODO: we can't retrieve pk by offset very conveniently. // Selectivity should be done outside. - return find_first_by_index(limit, bitset); + return find_first_by_index(limit, bitset, false_filtered_out); } void @@ -187,10 +191,15 @@ class OffsetOrderedMap : public OffsetMap { private: std::pair, bool> - find_first_by_index(int64_t limit, const BitsetType& bitset) const { + find_first_by_index(int64_t limit, + const BitsetType& bitset, + bool false_filtered_out) const { int64_t hit_num = 0; // avoid counting the number everytime. + int64_t cnt = bitset.count(); auto size = bitset.size(); - int64_t cnt = size - bitset.count(); + if (!false_filtered_out) { + cnt = size - bitset.count(); + } limit = std::min(limit, cnt); std::vector seg_offsets; seg_offsets.reserve(limit); @@ -205,7 +214,7 @@ class OffsetOrderedMap : public OffsetMap { continue; } - if (!bitset[seg_offset]) { + if (!(bitset[seg_offset] ^ false_filtered_out)) { seg_offsets.push_back(seg_offset); hit_num++; // PK hit, no need to continue traversing offsets with the same PK. @@ -337,7 +346,9 @@ class OffsetOrderedArray : public OffsetMap { } std::pair, bool> - find_first(int64_t limit, const BitsetType& bitset) const override { + find_first(int64_t limit, + const BitsetType& bitset, + bool false_filtered_out) const override { check_search(); if (limit == Unlimited || limit == NoLimit) { @@ -346,7 +357,7 @@ class OffsetOrderedArray : public OffsetMap { // TODO: we can't retrieve pk by offset very conveniently. // Selectivity should be done outside. - return find_first_by_index(limit, bitset); + return find_first_by_index(limit, bitset, false_filtered_out); } void @@ -357,10 +368,15 @@ class OffsetOrderedArray : public OffsetMap { private: std::pair, bool> - find_first_by_index(int64_t limit, const BitsetType& bitset) const { + find_first_by_index(int64_t limit, + const BitsetType& bitset, + bool false_filtered_out) const { int64_t hit_num = 0; // avoid counting the number everytime. + int64_t cnt = bitset.count(); auto size = bitset.size(); - int64_t cnt = size - bitset.count(); + if (!false_filtered_out) { + cnt = size - bitset.count(); + } auto more_hit_than_limit = cnt > limit; limit = std::min(limit, cnt); std::vector seg_offsets; @@ -373,7 +389,7 @@ class OffsetOrderedArray : public OffsetMap { continue; } - if (!bitset[seg_offset]) { + if (!(bitset[seg_offset] ^ false_filtered_out)) { seg_offsets.push_back(seg_offset); hit_num++; } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 734ef83bc8..8715ca488f 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -307,8 +307,11 @@ class SegmentGrowingImpl : public SegmentGrowing { } std::pair, bool> - find_first(int64_t limit, const BitsetType& bitset) const override { - return insert_record_.pk2offset_->find_first(limit, bitset); + find_first(int64_t limit, + const BitsetType& bitset, + bool false_filtered_out) const override { + return insert_record_.pk2offset_->find_first( + limit, bitset, false_filtered_out); } bool diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 845dbade5c..7d2bbab6dd 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -340,7 +340,9 @@ class SegmentInternalInterface : public SegmentInterface { * @return All candidates offsets. */ virtual std::pair, bool> - find_first(int64_t limit, const BitsetType& bitset) const = 0; + find_first(int64_t limit, + const BitsetType& bitset, + bool false_filtered_out) const = 0; void FillTargetEntry( diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index d739d69d95..cf4340cc24 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -151,8 +151,11 @@ class SegmentSealedImpl : public SegmentSealed { const Timestamp* timestamps) override; std::pair, bool> - find_first(int64_t limit, const BitsetType& bitset) const override { - return insert_record_.pk2offset_->find_first(limit, bitset); + find_first(int64_t limit, + const BitsetType& bitset, + bool false_filtered_out) const override { + return insert_record_.pk2offset_->find_first( + limit, bitset, false_filtered_out); } // Calculate: output[i] = Vec[seg_offset[i]] diff --git a/internal/core/unittest/test_offset_ordered_array.cpp b/internal/core/unittest/test_offset_ordered_array.cpp index 356cf64073..fd817fbdd5 100644 --- a/internal/core/unittest/test_offset_ordered_array.cpp +++ b/internal/core/unittest/test_offset_ordered_array.cpp @@ -66,7 +66,7 @@ TYPED_TEST_SUITE_P(TypedOffsetOrderedArrayTest); TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) { // not sealed. - ASSERT_ANY_THROW(this->map_.find_first(Unlimited, {})); + ASSERT_ANY_THROW(this->map_.find_first(Unlimited, {}, true)); // insert 10 entities. int num = 10; @@ -81,8 +81,10 @@ TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) { // all is satisfied. { BitsetType all(num); + all.set(); { - auto [offsets, has_more_res] = this->map_.find_first(num / 2, all); + auto [offsets, has_more_res] = + this->map_.find_first(num / 2, all, true); ASSERT_EQ(num / 2, offsets.size()); ASSERT_TRUE(has_more_res); for (int i = 1; i < offsets.size(); i++) { @@ -91,7 +93,7 @@ TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) { } { auto [offsets, has_more_res] = - this->map_.find_first(Unlimited, all); + this->map_.find_first(Unlimited, all, true); ASSERT_EQ(num, offsets.size()); ASSERT_FALSE(has_more_res); for (int i = 1; i < offsets.size(); i++) { @@ -102,9 +104,10 @@ TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) { { // corner case, segment offset exceeds the size of bitset. BitsetType all_minus_1(num - 1); + all_minus_1.set(); { auto [offsets, has_more_res] = - this->map_.find_first(num / 2, all_minus_1); + this->map_.find_first(num / 2, all_minus_1, true); ASSERT_EQ(num / 2, offsets.size()); ASSERT_TRUE(has_more_res); for (int i = 1; i < offsets.size(); i++) { @@ -113,7 +116,7 @@ TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) { } { auto [offsets, has_more_res] = - this->map_.find_first(Unlimited, all_minus_1); + this->map_.find_first(Unlimited, all_minus_1, true); ASSERT_EQ(all_minus_1.size(), offsets.size()); ASSERT_FALSE(has_more_res); for (int i = 1; i < offsets.size(); i++) { @@ -124,11 +127,11 @@ TYPED_TEST_P(TypedOffsetOrderedArrayTest, find_first) { { // none is satisfied. BitsetType none(num); - none.set(); - auto result_pair = this->map_.find_first(num / 2, none); + none.reset(); + auto result_pair = this->map_.find_first(num / 2, none, true); ASSERT_EQ(0, result_pair.first.size()); ASSERT_FALSE(result_pair.second); - result_pair = this->map_.find_first(NoLimit, none); + result_pair = this->map_.find_first(NoLimit, none, true); ASSERT_EQ(0, result_pair.first.size()); ASSERT_FALSE(result_pair.second); } diff --git a/internal/core/unittest/test_offset_ordered_map.cpp b/internal/core/unittest/test_offset_ordered_map.cpp index 0bc1913074..36f4bafc83 100644 --- a/internal/core/unittest/test_offset_ordered_map.cpp +++ b/internal/core/unittest/test_offset_ordered_map.cpp @@ -62,7 +62,8 @@ TYPED_TEST_SUITE_P(TypedOffsetOrderedMapTest); TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) { // no data. { - auto [offsets, has_more_res] = this->map_.find_first(Unlimited, {}); + auto [offsets, has_more_res] = + this->map_.find_first(Unlimited, {}, true); ASSERT_EQ(0, offsets.size()); ASSERT_FALSE(has_more_res); } @@ -75,10 +76,11 @@ TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) { // all is satisfied. BitsetType all(num); - all.reset(); + all.set(); { - auto [offsets, has_more_res] = this->map_.find_first(num / 2, all); + auto [offsets, has_more_res] = + this->map_.find_first(num / 2, all, true); ASSERT_EQ(num / 2, offsets.size()); ASSERT_TRUE(has_more_res); for (int i = 1; i < offsets.size(); i++) { @@ -86,7 +88,8 @@ TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) { } } { - auto [offsets, has_more_res] = this->map_.find_first(Unlimited, all); + auto [offsets, has_more_res] = + this->map_.find_first(Unlimited, all, true); ASSERT_EQ(num, offsets.size()); ASSERT_FALSE(has_more_res); for (int i = 1; i < offsets.size(); i++) { @@ -96,10 +99,10 @@ TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) { // corner case, segment offset exceeds the size of bitset. BitsetType all_minus_1(num - 1); - all_minus_1.reset(); + all_minus_1.set(); { auto [offsets, has_more_res] = - this->map_.find_first(num / 2, all_minus_1); + this->map_.find_first(num / 2, all_minus_1, true); ASSERT_EQ(num / 2, offsets.size()); ASSERT_TRUE(has_more_res); for (int i = 1; i < offsets.size(); i++) { @@ -108,7 +111,7 @@ TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) { } { auto [offsets, has_more_res] = - this->map_.find_first(Unlimited, all_minus_1); + this->map_.find_first(Unlimited, all_minus_1, true); ASSERT_EQ(all_minus_1.size(), offsets.size()); ASSERT_FALSE(has_more_res); for (int i = 1; i < offsets.size(); i++) { @@ -118,14 +121,16 @@ TYPED_TEST_P(TypedOffsetOrderedMapTest, find_first) { // none is satisfied. BitsetType none(num); - none.set(); + none.reset(); { - auto [offsets, has_more_res] = this->map_.find_first(num / 2, none); + auto [offsets, has_more_res] = + this->map_.find_first(num / 2, none, true); ASSERT_TRUE(has_more_res); ASSERT_EQ(0, offsets.size()); } { - auto [offsets, has_more_res] = this->map_.find_first(NoLimit, none); + auto [offsets, has_more_res] = + this->map_.find_first(NoLimit, none, true); ASSERT_TRUE(has_more_res); ASSERT_EQ(0, offsets.size()); }