From 5f2f4eb3d6728538a9842eb7bc8d44b8d4fe2320 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 1 Aug 2025 10:15:36 +0800 Subject: [PATCH] enhance: Ignore entry with same ts when DeleteRecord search pks (#43669) Related to #43660 This patch reduces the unwanted offset&ts entries having same timestamp of delete record. Under large amount of upsert, this false hit could increase large amount of memory usage while applying delete. The next step could be passing a callback to `search_pk_func_` to handle hit entry streamingly. Signed-off-by: Congqi Xia --- .../src/segcore/ChunkedSegmentSealedImpl.cpp | 23 ++++++++++++++----- .../src/segcore/ChunkedSegmentSealedImpl.h | 3 ++- internal/core/src/segcore/InsertRecord.h | 12 ++++++++-- .../core/src/segcore/SegmentGrowingImpl.cpp | 6 +++-- .../core/src/segcore/SegmentGrowingImpl.h | 5 ++-- 5 files changed, 36 insertions(+), 13 deletions(-) diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index a1d46863aa..03cec48e5b 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -898,12 +898,14 @@ ChunkedSegmentSealedImpl::search_pk(const PkType& pk, std::vector> ChunkedSegmentSealedImpl::search_batch_pks(const std::vector& pks, - const Timestamp* timestamps) const { + const Timestamp* timestamps, + bool include_same_ts) const { std::vector> pk_offsets; // handle unsorted case if (!is_sorted_by_pk_) { for (size_t i = 0; i < pks.size(); i++) { - auto offsets = insert_record_.search_pk(pks[i], timestamps[i]); + auto offsets = insert_record_.search_pk( + pks[i], timestamps[i], include_same_ts); for (auto offset : offsets) { pk_offsets.emplace_back(offset, timestamps[i]); } @@ -917,6 +919,13 @@ ChunkedSegmentSealedImpl::search_batch_pks(const std::vector& pks, auto all_chunk_pins = pk_column->GetAllChunks(); + auto timestamp_hit = include_same_ts + ? [](const Timestamp& ts1, + const Timestamp& ts2) { return ts1 <= ts2; } + : [](const Timestamp& ts1, const Timestamp& ts2) { + return ts1 < ts2; + }; + switch (schema_->get_fields().at(pk_field_id).get_data_type()) { case DataType::INT64: { auto num_chunk = pk_column->num_chunks(); @@ -940,7 +949,8 @@ ChunkedSegmentSealedImpl::search_batch_pks(const std::vector& pks, pk_column->GetNumRowsUntilChunk(i); for (; it != src + chunk_row_num && *it == target; ++it) { auto offset = it - src + num_rows_until_chunk; - if (insert_record_.timestamps_[offset] <= timestamp) { + if (timestamp_hit(insert_record_.timestamps_[offset], + timestamp)) { pk_offsets.emplace_back(offset, timestamp); } } @@ -965,8 +975,9 @@ ChunkedSegmentSealedImpl::search_batch_pks(const std::vector& pks, string_chunk->operator[](offset) == target; ++offset) { auto segment_offset = offset + num_rows_until_chunk; - if (insert_record_.timestamps_[segment_offset] <= - timestamp) { + if (timestamp_hit( + insert_record_.timestamps_[segment_offset], + timestamp)) { pk_offsets.emplace_back(segment_offset, timestamp); } } @@ -1111,7 +1122,7 @@ ChunkedSegmentSealedImpl::ChunkedSegmentSealedImpl( deleted_record_( &insert_record_, [this](const std::vector& pks, const Timestamp* timestamps) { - return this->search_batch_pks(pks, timestamps); + return this->search_batch_pks(pks, timestamps, false); }, segment_id) { auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager(); diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h index fd44ffb2c1..f411839d02 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h @@ -206,7 +206,8 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { std::vector> search_batch_pks(const std::vector& pks, - const Timestamp* timestamps) const; + const Timestamp* timestamps, + bool include_same_ts) const; public: int64_t diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index f51ff3cf4b..5f7433d5f5 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -321,12 +321,20 @@ struct InsertRecord { } std::vector - search_pk(const PkType& pk, Timestamp timestamp) const { + search_pk(const PkType& pk, + Timestamp timestamp, + bool include_same_ts = true) const { std::shared_lock lck(shared_mutex_); std::vector res_offsets; auto offset_iter = pk2offset_->find(pk); + auto timestamp_hit = + include_same_ts ? [](const Timestamp& ts1, + const Timestamp& ts2) { return ts1 <= ts2; } + : [](const Timestamp& ts1, const Timestamp& ts2) { + return ts1 < ts2; + }; for (auto offset : offset_iter) { - if (timestamps_[offset] <= timestamp) { + if (timestamp_hit(timestamps_[offset], timestamp)) { res_offsets.emplace_back(offset); } } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 1b02a5730d..7e25b189f1 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -667,11 +667,13 @@ SegmentGrowingImpl::GetFieldDataType(milvus::FieldId field_id) const { std::vector> SegmentGrowingImpl::search_batch_pks(const std::vector& pks, - const Timestamp* timestamps) const { + const Timestamp* timestamps, + bool include_same_ts) const { std::vector> results; for (size_t i = 0; i < pks.size(); ++i) { auto timestamp = timestamps[i]; - auto offsets = insert_record_.search_pk(pks[i], timestamp); + auto offsets = + insert_record_.search_pk(pks[i], timestamp, include_same_ts); for (auto offset : offsets) { results.emplace_back(offset, timestamp); } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 6e3fbeb1b3..0fc2b1f475 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -177,7 +177,8 @@ class SegmentGrowingImpl : public SegmentGrowing { std::vector> search_batch_pks(const std::vector& pks, - const Timestamp* timestamps) const; + const Timestamp* timestamps, + bool include_same_ts) const; public: size_t @@ -297,7 +298,7 @@ class SegmentGrowingImpl : public SegmentGrowing { &insert_record_, [this](const std::vector& pks, const Timestamp* timestamps) { - return this->search_batch_pks(pks, timestamps); + return this->search_batch_pks(pks, timestamps, false); }, segment_id) { this->CreateTextIndexes();