From b7c7df9440cfc03d7c38f525539efddc360393fe Mon Sep 17 00:00:00 2001 From: zhagnlu <1542303831@qq.com> Date: Tue, 12 Aug 2025 11:37:42 +0800 Subject: [PATCH] fix: fix delete consumer bug for concurrency R-W (#43831) #41570 Signed-off-by: luzhang Co-authored-by: luzhang --- internal/core/src/segcore/DeletedRecord.h | 35 +--- internal/core/unittest/test_delete_record.cpp | 163 ++++++++++++++++++ 2 files changed, 172 insertions(+), 26 deletions(-) diff --git a/internal/core/src/segcore/DeletedRecord.h b/internal/core/src/segcore/DeletedRecord.h index 260ff62858..aeee4b3b59 100644 --- a/internal/core/src/segcore/DeletedRecord.h +++ b/internal/core/src/segcore/DeletedRecord.h @@ -11,6 +11,7 @@ #pragma once +#include #include #include #include @@ -168,7 +169,7 @@ class DeletedRecord { // find last meeted snapshot if (!snapshots_.empty()) { int loc = snapshots_.size() - 1; - while (snapshots_[loc].first > query_timestamp && loc >= 0) { + while (loc >= 0 && snapshots_[loc].first > query_timestamp) { loc--; } if (loc >= 0) { @@ -182,36 +183,15 @@ class DeletedRecord { } } - auto start_iter = hit_snapshot ? next_iter : accessor.begin(); - auto end_iter = - accessor.lower_bound(std::make_pair(query_timestamp, 0)); + auto it = hit_snapshot ? next_iter : accessor.begin(); - auto it = start_iter; - - // when end_iter point to skiplist end, concurrent delete may append new value - // after lower_bound() called, so end_iter is not logical valid. 
- if (end_iter == accessor.end()) { - while (it != accessor.end() && it->first <= query_timestamp) { - if (it->second < insert_barrier) { - bitset.set(it->second); - } - it++; - } - return; - } - - while (it != accessor.end() && it != end_iter) { - if (it->second < insert_barrier) { - bitset.set(it->second); - } - it++; - } - while (it != accessor.end() && it->first == query_timestamp) { + while (it != accessor.end() && it->first <= query_timestamp) { if (it->second < insert_barrier) { bitset.set(it->second); } it++; } + } size_t @@ -233,7 +213,7 @@ class DeletedRecord { DumpSnapshot() { SortedDeleteList::Accessor accessor(deleted_lists_); int total_size = accessor.size(); - int dumped_size = snapshots_.empty() ? 0 : GetSnapshotBitsSize(); + int dumped_size = dumped_entry_count_.load(); while (total_size - dumped_size > DUMP_BATCH_SIZE) { int32_t bitsize = 0; @@ -278,6 +258,7 @@ class DeletedRecord { snap_next_iter_.push_back(it); } + dumped_entry_count_.store(dumped_size + DUMP_BATCH_SIZE); LOG_INFO( "dump delete record snapshot at ts: {}, cursor: {}, " "total size:{} " @@ -343,6 +324,8 @@ class DeletedRecord { std::vector> snapshots_; // next delete record iterator that follows every snapshot std::vector snap_next_iter_; + // total number of delete entries that have been incorporated into snapshots + std::atomic dumped_entry_count_{0}; }; } // namespace milvus::segcore diff --git a/internal/core/unittest/test_delete_record.cpp b/internal/core/unittest/test_delete_record.cpp index 71c20466d0..0b5b08a360 100644 --- a/internal/core/unittest/test_delete_record.cpp +++ b/internal/core/unittest/test_delete_record.cpp @@ -514,3 +514,166 @@ TEST(DeleteMVCC, perform) { .count() << std::endl; } + +TEST(DeleteMVCC, QueryTimestampLowerThanFirstSnapshot) { + using namespace milvus; + using namespace milvus::query; + using namespace milvus::segcore; + + auto schema = std::make_shared(); + auto vec_fid = schema->AddDebugField( + "fakevec", DataType::VECTOR_FLOAT, 16, 
knowhere::metric::L2); + auto i64_fid = schema->AddDebugField("age", DataType::INT64); + schema->set_primary_field_id(i64_fid); + auto N = 50000; + InsertRecord insert_record(*schema, N); + DeletedRecord delete_record( + &insert_record, + [&insert_record]( + const std::vector& pks, + const Timestamp* timestamps, + std::function + cb) { + for (size_t i = 0; i < pks.size(); ++i) { + auto timestamp = timestamps[i]; + auto offsets = insert_record.search_pk(pks[i], timestamp); + for (auto offset : offsets) { + cb(offset, timestamp); + } + } + }, + 0); + + // insert (0,0), (1,1), ..., (N-1,N-1) + std::vector age_data(N); + std::vector tss(N); + for (int i = 0; i < N; ++i) { + age_data[i] = i; + tss[i] = i; + insert_record.insert_pk(age_data[i], i); + } + auto insert_offset = insert_record.reserved.fetch_add(N); + insert_record.timestamps_.set_data_raw(insert_offset, tss.data(), N); + auto field_data = insert_record.get_data_base(i64_fid); + field_data->set_data_raw(insert_offset, age_data.data(), N); + insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + N); + + // delete first DN pks with ts = i+1, ensure snapshots are created + auto DN = 40000; + std::vector delete_ts(DN); + std::vector delete_pk(DN); + for (int i = 0; i < DN; ++i) { + delete_pk[i] = age_data[i]; + delete_ts[i] = i + 1; // 1..DN + } + delete_record.StreamPush(delete_pk, delete_ts.data()); + + auto snapshots = delete_record.get_snapshots(); + std::cout << "snapshots size: " << snapshots.size() << std::endl; + ASSERT_GE(snapshots.size(), 1); + + // Query at ts smaller than first snapshot ts, expect only first deletion visible + Timestamp query_timestamp = 1; + BitsetType bitsets(N); + BitsetTypeView bitsets_view(bitsets); + int64_t insert_barrier = N; + delete_record.Query(bitsets_view, insert_barrier, query_timestamp); + + for (int i = 0; i < N; i++) { + bool expected = (i == 0); + ASSERT_EQ(bitsets_view[i], expected) << i; + } +} + +TEST(DeleteMVCC, SnapshotDumpProgress) { + using 
namespace milvus; + using namespace milvus::query; + using namespace milvus::segcore; + + auto schema = std::make_shared(); + auto vec_fid = schema->AddDebugField( + "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2); + auto i64_fid = schema->AddDebugField("age", DataType::INT64); + schema->set_primary_field_id(i64_fid); + + const int N = 21000; + InsertRecord insert_record(*schema, N); + DeletedRecord delete_record( + &insert_record, + [&insert_record]( + const std::vector& pks, + const Timestamp* timestamps, + std::function + cb) { + for (size_t i = 0; i < pks.size(); ++i) { + auto timestamp = timestamps[i]; + auto offsets = insert_record.search_pk(pks[i], timestamp); + for (auto offset : offsets) { + cb(offset, timestamp); + } + } + }, + 0); + + // insert pk=i at ts=i + std::vector age_data(N); + std::vector tss(N); + for (int i = 0; i < N; ++i) { + age_data[i] = i; + tss[i] = i; + insert_record.insert_pk(age_data[i], i); + } + auto insert_offset = insert_record.reserved.fetch_add(N); + insert_record.timestamps_.set_data_raw(insert_offset, tss.data(), N); + auto field_data = insert_record.get_data_base(i64_fid); + field_data->set_data_raw(insert_offset, age_data.data(), N); + insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + N); + + // 1) Push exactly 10000 deletes -> no snapshot expected (threshold is > 10000) + const int B = 10000; // current DUMP_BATCH_SIZE + std::vector delete_ts1(B); + std::vector delete_pk1(B); + for (int i = 0; i < B; ++i) { + delete_pk1[i] = age_data[i]; + delete_ts1[i] = i + 1; + } + delete_record.StreamPush(delete_pk1, delete_ts1.data()); + auto snapshots = delete_record.get_snapshots(); + ASSERT_EQ(0, snapshots.size()); + + // 2) Push one more delete -> one snapshot expected, covering first 10000 + std::vector delete_ts2(1); + std::vector delete_pk2(1); + delete_pk2[0] = age_data[B]; + delete_ts2[0] = B + 1; // 10001 + delete_record.StreamPush(delete_pk2, delete_ts2.data()); + snapshots = 
delete_record.get_snapshots(); + ASSERT_EQ(1, snapshots.size()); + ASSERT_EQ(10000, snapshots[0].second.count()); + + // 3) Push 9500 more -> still one snapshot + const int more1 = 9500; + std::vector delete_ts3(more1); + std::vector delete_pk3(more1); + for (int i = 0; i < more1; ++i) { + delete_pk3[i] = age_data[B + 1 + i]; + delete_ts3[i] = B + 2 + i; + } + delete_record.StreamPush(delete_pk3, delete_ts3.data()); + snapshots = delete_record.get_snapshots(); + ASSERT_EQ(1, snapshots.size()); + ASSERT_EQ(10000, snapshots[0].second.count()); + + // 4) Push 500 more (total 10001 after previous 10000 dumped) -> second snapshot appears + const int more2 = 500; + std::vector delete_ts4(more2); + std::vector delete_pk4(more2); + for (int i = 0; i < more2; ++i) { + delete_pk4[i] = age_data[B + 1 + more1 + i]; + delete_ts4[i] = B + 2 + more1 + i; + } + delete_record.StreamPush(delete_pk4, delete_ts4.data()); + snapshots = delete_record.get_snapshots(); + ASSERT_EQ(2, snapshots.size()); + ASSERT_EQ(20000, snapshots[1].second.count()); +}