fix: fix delete consumer bug for concurrent R-W (#43831)

#41570

Signed-off-by: luzhang <luzhang@zilliz.com>
Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
zhagnlu 2025-08-12 11:37:42 +08:00 committed by GitHub
parent 3c73b5f1a1
commit b7c7df9440
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 172 additions and 26 deletions

View File

@ -11,6 +11,7 @@
#pragma once
#include <limits>
#include <memory>
#include <mutex>
#include <shared_mutex>
@ -168,7 +169,7 @@ class DeletedRecord {
// find last matched snapshot
if (!snapshots_.empty()) {
int loc = snapshots_.size() - 1;
while (snapshots_[loc].first > query_timestamp && loc >= 0) {
while (loc >= 0 && snapshots_[loc].first > query_timestamp) {
loc--;
}
if (loc >= 0) {
@ -182,36 +183,15 @@ class DeletedRecord {
}
}
auto start_iter = hit_snapshot ? next_iter : accessor.begin();
auto end_iter =
accessor.lower_bound(std::make_pair(query_timestamp, 0));
auto it = hit_snapshot ? next_iter : accessor.begin();
auto it = start_iter;
// when end_iter points to the skiplist end, a concurrent delete may append new values
// after lower_bound() is called, so end_iter is not logically valid.
if (end_iter == accessor.end()) {
while (it != accessor.end() && it->first <= query_timestamp) {
if (it->second < insert_barrier) {
bitset.set(it->second);
}
it++;
}
return;
}
while (it != accessor.end() && it != end_iter) {
if (it->second < insert_barrier) {
bitset.set(it->second);
}
it++;
}
while (it != accessor.end() && it->first == query_timestamp) {
while (it != accessor.end() && it->first <= query_timestamp) {
if (it->second < insert_barrier) {
bitset.set(it->second);
}
it++;
}
}
size_t
@ -233,7 +213,7 @@ class DeletedRecord {
DumpSnapshot() {
SortedDeleteList::Accessor accessor(deleted_lists_);
int total_size = accessor.size();
int dumped_size = snapshots_.empty() ? 0 : GetSnapshotBitsSize();
int dumped_size = dumped_entry_count_.load();
while (total_size - dumped_size > DUMP_BATCH_SIZE) {
int32_t bitsize = 0;
@ -278,6 +258,7 @@ class DeletedRecord {
snap_next_iter_.push_back(it);
}
dumped_entry_count_.store(dumped_size + DUMP_BATCH_SIZE);
LOG_INFO(
"dump delete record snapshot at ts: {}, cursor: {}, "
"total size:{} "
@ -343,6 +324,8 @@ class DeletedRecord {
std::vector<std::pair<Timestamp, BitsetType>> snapshots_;
// next delete record iterator that follows every snapshot
std::vector<SortedDeleteList::iterator> snap_next_iter_;
// total number of delete entries that have been incorporated into snapshots
std::atomic<int64_t> dumped_entry_count_{0};
};
} // namespace milvus::segcore

View File

@ -514,3 +514,166 @@ TEST(DeleteMVCC, perform) {
.count()
<< std::endl;
}
// Regression test: a Query() at a timestamp below the first snapshot's
// timestamp must fall back to scanning the raw delete list and only apply
// deletions visible at that timestamp.
TEST(DeleteMVCC, QueryTimestampLowerThanFirstSnapshot) {
    using namespace milvus;
    using namespace milvus::query;
    using namespace milvus::segcore;

    // Schema: one float vector field plus an int64 primary key.
    auto schema = std::make_shared<Schema>();
    auto vec_fid = schema->AddDebugField(
        "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
    auto pk_fid = schema->AddDebugField("age", DataType::INT64);
    schema->set_primary_field_id(pk_fid);

    const int kRows = 50000;
    InsertRecord<false> insert_record(*schema, kRows);
    DeletedRecord<false> delete_record(
        &insert_record,
        [&insert_record](
            const std::vector<PkType>& pks,
            const Timestamp* timestamps,
            std::function<void(const SegOffset offset, const Timestamp ts)>
                cb) {
            // Resolve each pk to its segment offsets and forward them.
            for (size_t idx = 0; idx < pks.size(); ++idx) {
                const auto ts = timestamps[idx];
                for (auto offset : insert_record.search_pk(pks[idx], ts)) {
                    cb(offset, ts);
                }
            }
        },
        0);

    // Insert rows (pk=0,ts=0), (pk=1,ts=1), ..., (pk=kRows-1,ts=kRows-1).
    std::vector<int64_t> pk_data(kRows);
    std::vector<Timestamp> insert_ts(kRows);
    for (int i = 0; i < kRows; ++i) {
        pk_data[i] = i;
        insert_ts[i] = i;
        insert_record.insert_pk(pk_data[i], i);
    }
    auto base_offset = insert_record.reserved.fetch_add(kRows);
    insert_record.timestamps_.set_data_raw(
        base_offset, insert_ts.data(), kRows);
    insert_record.get_data_base(pk_fid)->set_data_raw(
        base_offset, pk_data.data(), kRows);
    insert_record.ack_responder_.AddSegment(base_offset, base_offset + kRows);

    // Delete the first kDeletes pks at ts = i + 1; enough volume to force
    // at least one snapshot dump.
    const int kDeletes = 40000;
    std::vector<PkType> del_pks(kDeletes);
    std::vector<Timestamp> del_ts(kDeletes);
    for (int i = 0; i < kDeletes; ++i) {
        del_pks[i] = pk_data[i];
        del_ts[i] = i + 1;  // 1..kDeletes
    }
    delete_record.StreamPush(del_pks, del_ts.data());

    auto snapshots = delete_record.get_snapshots();
    std::cout << "snapshots size: " << snapshots.size() << std::endl;
    ASSERT_GE(snapshots.size(), 1);

    // Query at ts=1, which precedes the first snapshot timestamp: only the
    // deletion applied at ts=1 (offset 0) may be visible.
    const Timestamp query_timestamp = 1;
    BitsetType bitsets(kRows);
    BitsetTypeView bitsets_view(bitsets);
    const int64_t insert_barrier = kRows;
    delete_record.Query(bitsets_view, insert_barrier, query_timestamp);
    for (int i = 0; i < kRows; ++i) {
        const bool expected = (i == 0);
        ASSERT_EQ(bitsets_view[i], expected) << i;
    }
}
// Verifies snapshot dump progress accounting: a snapshot is dumped only once
// the undumped delete count exceeds DUMP_BATCH_SIZE, and each snapshot covers
// exactly one batch of 10000 entries.
TEST(DeleteMVCC, SnapshotDumpProgress) {
    using namespace milvus;
    using namespace milvus::query;
    using namespace milvus::segcore;

    // Schema: one float vector field plus an int64 primary key.
    auto schema = std::make_shared<Schema>();
    auto vec_fid = schema->AddDebugField(
        "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
    auto pk_fid = schema->AddDebugField("age", DataType::INT64);
    schema->set_primary_field_id(pk_fid);

    const int kRows = 21000;
    InsertRecord<false> insert_record(*schema, kRows);
    DeletedRecord<false> delete_record(
        &insert_record,
        [&insert_record](
            const std::vector<PkType>& pks,
            const Timestamp* timestamps,
            std::function<void(const SegOffset offset, const Timestamp ts)>
                cb) {
            // Resolve each pk to its segment offsets and forward them.
            for (size_t idx = 0; idx < pks.size(); ++idx) {
                const auto ts = timestamps[idx];
                for (auto offset : insert_record.search_pk(pks[idx], ts)) {
                    cb(offset, ts);
                }
            }
        },
        0);

    // Insert pk=i at ts=i for every row.
    std::vector<int64_t> pk_data(kRows);
    std::vector<Timestamp> insert_ts(kRows);
    for (int i = 0; i < kRows; ++i) {
        pk_data[i] = i;
        insert_ts[i] = i;
        insert_record.insert_pk(pk_data[i], i);
    }
    auto base_offset = insert_record.reserved.fetch_add(kRows);
    insert_record.timestamps_.set_data_raw(
        base_offset, insert_ts.data(), kRows);
    insert_record.get_data_base(pk_fid)->set_data_raw(
        base_offset, pk_data.data(), kRows);
    insert_record.ack_responder_.AddSegment(base_offset, base_offset + kRows);

    const int kBatch = 10000;  // current DUMP_BATCH_SIZE

    // Push `count` deletions of pk_data[first..first+count) with timestamps
    // starting at first_ts and increasing by one.
    auto push_deletes = [&](int first, int count, Timestamp first_ts) {
        std::vector<PkType> pks(count);
        std::vector<Timestamp> tss(count);
        for (int i = 0; i < count; ++i) {
            pks[i] = pk_data[first + i];
            tss[i] = first_ts + i;
        }
        delete_record.StreamPush(pks, tss.data());
    };

    // 1) Exactly kBatch deletes -> no snapshot yet (threshold is > kBatch).
    push_deletes(0, kBatch, 1);
    auto snapshots = delete_record.get_snapshots();
    ASSERT_EQ(0, snapshots.size());

    // 2) One more delete -> first snapshot appears, covering first kBatch.
    push_deletes(kBatch, 1, kBatch + 1);  // ts = 10001
    snapshots = delete_record.get_snapshots();
    ASSERT_EQ(1, snapshots.size());
    ASSERT_EQ(10000, snapshots[0].second.count());

    // 3) 9500 more -> still one snapshot (undumped count not over threshold).
    const int kMore1 = 9500;
    push_deletes(kBatch + 1, kMore1, kBatch + 2);
    snapshots = delete_record.get_snapshots();
    ASSERT_EQ(1, snapshots.size());
    ASSERT_EQ(10000, snapshots[0].second.count());

    // 4) 500 more (10001 undumped since last dump) -> second snapshot.
    const int kMore2 = 500;
    push_deletes(kBatch + 1 + kMore1, kMore2, kBatch + 2 + kMore1);
    snapshots = delete_record.get_snapshots();
    ASSERT_EQ(2, snapshots.size());
    ASSERT_EQ(20000, snapshots[1].second.count());
}