From 54d17bc5dac9e17da872b6c3ee496a8cf00b41ca Mon Sep 17 00:00:00 2001 From: xige-16 Date: Wed, 13 Jul 2022 10:22:26 +0800 Subject: [PATCH] Fix query too slow when insert multi repeated pk data (#18231) Signed-off-by: xige-16 --- internal/core/src/common/Types.h | 5 ++- internal/core/src/segcore/DeletedRecord.h | 16 +++++++- internal/core/src/segcore/InsertRecord.h | 24 ++++++------ .../core/src/segcore/SegmentGrowingImpl.cpp | 9 ++--- .../core/src/segcore/SegmentGrowingImpl.h | 5 +++ internal/core/src/segcore/SegmentInterface.h | 3 ++ .../core/src/segcore/SegmentSealedImpl.cpp | 9 ++--- internal/core/src/segcore/SegmentSealedImpl.h | 5 +++ internal/core/src/segcore/Utils.cpp | 38 +++++++++++-------- internal/querynode/task_query.go | 2 +- 10 files changed, 76 insertions(+), 40 deletions(-) diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index 1a8c4e3cda..d6120663ab 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -70,7 +71,9 @@ using VectorArray = proto::schema::VectorField; using IdArray = proto::schema::IDs; using InsertData = proto::segcore::InsertRecord; using PkType = std::variant; -using Pk2OffsetType = tbb::concurrent_unordered_multimap>; +// tbb::concurrent_unordered_multimap equal_range too slow when multi repeated key +// using Pk2OffsetType = tbb::concurrent_unordered_multimap>; +using Pk2OffsetType = tbb::concurrent_unordered_map, std::hash>; inline bool IsPrimaryKeyDataType(DataType data_type) { diff --git a/internal/core/src/segcore/DeletedRecord.h b/internal/core/src/segcore/DeletedRecord.h index d0f9e26816..bfb64d119a 100644 --- a/internal/core/src/segcore/DeletedRecord.h +++ b/internal/core/src/segcore/DeletedRecord.h @@ -42,6 +42,21 @@ struct DeletedRecord { return lru_; } + std::shared_ptr + clone_lru_entry(int64_t insert_barrier, int64_t del_barrier, int64_t& old_del_barrier, bool& hit_cache) { + std::shared_lock lck(shared_mutex_); + auto res = lru_->clone(insert_barrier); + old_del_barrier = lru_->del_barrier; + + if (lru_->bitmap_ptr->size() == insert_barrier && lru_->del_barrier == del_barrier) { + hit_cache = true; + } else { + res->del_barrier = del_barrier; + } + + return res; + } + void insert_lru_entry(std::shared_ptr new_entry, bool force = false) { std::lock_guard lck(shared_mutex_); @@ -59,7 +74,6 @@ struct DeletedRecord { AckResponder ack_responder_; ConcurrentVector timestamps_; ConcurrentVector pks_; - int64_t record_size_ = 0; private: std::shared_ptr lru_; diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index a227833d77..2577b41ece 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -43,11 +43,12 @@ struct InsertRecord { std::vector search_pk(const PkType pk, Timestamp timestamp) const { std::vector res_offsets; - auto [iter_b, iter_e] = pk2offset_.equal_range(pk); - for (auto iter = iter_b; iter != iter_e; ++iter) { - auto offset = SegOffset(iter->second); - if (timestamps_[offset.get()] <= timestamp) { - res_offsets.push_back(offset); + auto offset_iter = pk2offset_.find(pk); + if (offset_iter != pk2offset_.end()) { + for (auto offset : offset_iter->second) { + if (timestamps_[offset] <= timestamp) { + res_offsets.push_back(SegOffset(offset)); + } } } @@ -57,11 +58,12 @@ struct InsertRecord { std::vector search_pk(const PkType pk, int64_t insert_barrier) const { std::vector res_offsets; - auto [iter_b, iter_e] = pk2offset_.equal_range(pk); - for (auto iter = iter_b; iter != iter_e; ++iter) { - auto offset = SegOffset(iter->second); - if (offset.get() < insert_barrier) { - res_offsets.push_back(offset); + auto offset_iter = pk2offset_.find(pk); + if (offset_iter != pk2offset_.end()) { + for (auto offset : offset_iter->second) { + if (offset < insert_barrier) { + res_offsets.push_back(SegOffset(offset)); + } } } @@ -70,7 +72,7 @@ struct InsertRecord { void insert_pk(const PkType pk, int64_t offset) { - pk2offset_.insert(std::make_pair(pk, offset)); + pk2offset_[pk].insert(offset); } bool diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 74888f7e13..f677fb56e4 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -157,11 +157,10 @@ SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) { auto timestamps = reinterpret_cast(info.timestamps); // step 2: fill pks and timestamps - deleted_record_.pks_.set_data_raw(0, pks.data(), size); - deleted_record_.timestamps_.set_data_raw(0, timestamps, size); - deleted_record_.ack_responder_.AddSegment(0, size); - deleted_record_.reserved.fetch_add(size); - deleted_record_.record_size_ = size; + auto reserved_begin = deleted_record_.reserved.fetch_add(size); + deleted_record_.pks_.set_data_raw(reserved_begin, pks.data(), size); + deleted_record_.timestamps_.set_data_raw(reserved_begin, timestamps, size); + deleted_record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size); } SpanBase diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 83d3486ce9..33550672b2 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -64,6 +64,11 @@ class SegmentGrowingImpl : public SegmentGrowing { std::string debug() const override; + int64_t + get_segment_id() const override { + return id_; + } + public: const InsertRecord& get_insert_record() const { diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index f17da8560d..00f8ad56d6 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -69,6 +69,9 @@ class SegmentInterface { virtual void LoadDeletedRecord(const LoadDeletedRecordInfo& info) = 0; + + virtual int64_t + get_segment_id() const = 0; }; // internal API for DSL calculation diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 2d09cb8c34..ea8a93bbd9 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -254,11 +254,10 @@ SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) { auto timestamps = reinterpret_cast(info.timestamps); // step 2: fill pks and timestamps - deleted_record_.pks_.set_data_raw(0, pks.data(), size); - deleted_record_.timestamps_.set_data_raw(0, timestamps, size); - deleted_record_.ack_responder_.AddSegment(0, size); - deleted_record_.reserved.fetch_add(size); - deleted_record_.record_size_ = size; + auto reserved_begin = deleted_record_.reserved.fetch_add(size); + deleted_record_.pks_.set_data_raw(reserved_begin, pks.data(), size); + deleted_record_.timestamps_.set_data_raw(reserved_begin, timestamps, size); + deleted_record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size); } // internal API: support scalar index only diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 438b415fe8..1fc4d0d59b 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -50,6 +50,11 @@ class SegmentSealedImpl : public SegmentSealed { bool HasFieldData(FieldId field_id) const override; + int64_t + get_segment_id() const override { + return id_; + } + public: int64_t GetMemoryUsageInBytes() const override; diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index 560233a8d8..d0ee78015a 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -380,37 +380,43 @@ get_deleted_bitmap(int64_t del_barrier, DeletedRecord& delete_record, const InsertRecord& insert_record, Timestamp query_timestamp) { - auto old = delete_record.get_lru_entry(); // if insert_barrier and del_barrier have not changed, use cache data directly - if (old->bitmap_ptr->size() == insert_barrier) { - if (old->del_barrier == del_barrier) { - return old; - } + bool hit_cache = false; + int64_t old_del_barrier = 0; + auto current = delete_record.clone_lru_entry(insert_barrier, del_barrier, old_del_barrier, hit_cache); + if (hit_cache) { + return current; } - auto current = old->clone(insert_barrier); - current->del_barrier = del_barrier; - auto bitmap = current->bitmap_ptr; int64_t start, end; - if (del_barrier < old->del_barrier) { + if (del_barrier < old_del_barrier) { // in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp // so these deletion records do not take effect in query/search // so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] wil be reset to 0 // for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0] start = del_barrier; - end = old->del_barrier; + end = old_del_barrier; } else { // the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier] // for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0] - start = old->del_barrier; + start = old_del_barrier; end = del_barrier; } + + // Avoid invalid calculations when there are a lot of repeated delete pks + std::unordered_map delete_timestamps; for (auto del_index = start; del_index < end; ++del_index) { - // get pk in delete logs auto pk = delete_record.pks_[del_index]; - // find insert data which has same pk + auto timestamp = delete_record.timestamps_[del_index]; + + delete_timestamps[pk] = timestamp > delete_timestamps[pk] ? timestamp : delete_timestamps[pk]; + } + + for (auto iter = delete_timestamps.begin(); iter != delete_timestamps.end(); iter++) { + auto pk = iter->first; + auto delete_timestamp = iter->second; auto segOffsets = insert_record.search_pk(pk, insert_barrier); for (auto offset : segOffsets) { int64_t insert_row_offset = offset.get(); @@ -419,22 +425,22 @@ get_deleted_bitmap(int64_t del_barrier, // insert after delete with same pk, delete will not task effect on this insert record // and reset bitmap to 0 - if (insert_record.timestamps_[insert_row_offset] > delete_record.timestamps_[del_index]) { + if (insert_record.timestamps_[insert_row_offset] > delete_timestamp) { bitmap->reset(insert_row_offset); continue; } // the deletion record do not take effect in search/query // and reset bitmap to 0 - if (delete_record.timestamps_[del_index] > query_timestamp) { + if (delete_timestamp > query_timestamp) { bitmap->reset(insert_row_offset); continue; } - // insert data corresponding to the insert_row_offset will be ignored in search/query bitmap->set(insert_row_offset); } } + delete_record.insert_lru_entry(current); return current; } diff --git a/internal/querynode/task_query.go b/internal/querynode/task_query.go index 22f9c77577..07ae85f73d 100644 --- a/internal/querynode/task_query.go +++ b/internal/querynode/task_query.go @@ -54,7 +54,7 @@ func (q *queryTask) PreExecute(ctx context.Context) error { func (q *queryTask) queryOnStreaming() error { // check ctx timeout if !funcutil.CheckCtxValid(q.Ctx()) { - return errors.New("search context timeout") + return errors.New("query context timeout") } // check if collection has been released, check streaming since it's released first