enhance: refactor delete mvcc function (#38066)

#37413

Signed-off-by: luzhang <luzhang@zilliz.com>
Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
zhagnlu 2024-12-15 18:02:43 +08:00 committed by GitHub
parent 6ea15265e1
commit 01de0afc4e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 802 additions and 409 deletions

View File

@ -209,6 +209,15 @@ Join(const std::vector<T>& items, const std::string& delimiter) {
return ss.str();
}
inline std::string
PrintBitsetTypeView(const BitsetTypeView& view) {
std::stringstream ss;
for (auto i = 0; i < view.size(); ++i) {
ss << int(view[i]);
}
return ss.str();
}
inline std::string
GetCommonPrefix(const std::string& str1, const std::string& str2) {
size_t len = std::min(str1.length(), str2.length());

View File

@ -669,38 +669,8 @@ ChunkedSegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys);
auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);
std::vector<std::tuple<Timestamp, PkType>> ordering(size);
for (int i = 0; i < size; i++) {
ordering[i] = std::make_tuple(timestamps[i], pks[i]);
}
if (!insert_record_.empty_pks()) {
auto end = std::remove_if(
ordering.begin(),
ordering.end(),
[&](const std::tuple<Timestamp, PkType>& record) {
return !insert_record_.contain(std::get<1>(record));
});
size = end - ordering.begin();
ordering.resize(size);
}
// all record filtered
if (size == 0) {
return;
}
std::sort(ordering.begin(), ordering.end());
std::vector<PkType> sort_pks(size);
std::vector<Timestamp> sort_timestamps(size);
for (int i = 0; i < size; i++) {
auto [t, pk] = ordering[i];
sort_timestamps[i] = t;
sort_pks[i] = pk;
}
deleted_record_.push(sort_pks, sort_timestamps.data());
// step 2: push delete info to delete_record
deleted_record_.LoadPush(pks, timestamps);
}
void
@ -876,35 +846,7 @@ void
ChunkedSegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset,
int64_t ins_barrier,
Timestamp timestamp) const {
auto del_barrier = get_barrier(get_deleted_record(), timestamp);
if (del_barrier == 0) {
return;
}
auto bitmap_holder = std::shared_ptr<DeletedRecord::TmpBitmap>();
auto search_fn = [this](const PkType& pk, int64_t barrier) {
return this->search_pk(pk, barrier);
};
bitmap_holder = get_deleted_bitmap(del_barrier,
ins_barrier,
deleted_record_,
insert_record_,
timestamp,
is_sorted_by_pk_,
search_fn);
if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
auto& delete_bitset = *bitmap_holder->bitmap_ptr;
AssertInfo(
delete_bitset.size() == bitset.size(),
fmt::format(
"Deleted bitmap size:{} not equal to filtered bitmap size:{}",
delete_bitset.size(),
bitset.size()));
bitset |= delete_bitset;
deleted_record_.Query(bitset, ins_barrier, timestamp);
}
void
@ -1355,7 +1297,8 @@ ChunkedSegmentSealedImpl::ChunkedSegmentSealedImpl(
id_(segment_id),
col_index_meta_(index_meta),
TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve),
is_sorted_by_pk_(is_sorted_by_pk) {
is_sorted_by_pk_(is_sorted_by_pk),
deleted_record_(&insert_record_, this) {
mmap_descriptor_ = std::shared_ptr<storage::MmapChunkDescriptor>(
new storage::MmapChunkDescriptor({segment_id, SegmentType::Sealed}));
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
@ -1992,7 +1935,7 @@ ChunkedSegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated
sort_pks[i] = pk;
}
deleted_record_.push(sort_pks, sort_timestamps.data());
deleted_record_.StreamPush(sort_pks, sort_timestamps.data());
return SegcoreError::success();
}

View File

@ -104,6 +104,11 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
return stats_.mem_size.load() + deleted_record_.mem_size();
}
// Expose the segment's insert record (pk lookup structures + insert
// timestamps) so callers (e.g. tests after loading field data) can
// operate on it directly.
InsertRecord<true>&
get_insert_record() override {
    return insert_record_;
}
int64_t
get_row_count() const override;
@ -298,6 +303,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
// } else {
num_rows_ = row_count;
// }
deleted_record_.set_sealed_row_count(row_count);
}
void
@ -322,11 +328,6 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
return system_ready_count_ == 2;
}
const DeletedRecord&
get_deleted_record() const {
return deleted_record_;
}
std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
search_ids(const IdArray& id_array, Timestamp timestamp) const override;
@ -367,7 +368,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
InsertRecord<true> insert_record_;
// deleted pks
mutable DeletedRecord deleted_record_;
mutable DeletedRecord<true> deleted_record_;
LoadFieldDataInfo field_data_info_;

View File

@ -17,111 +17,282 @@
#include <tuple>
#include <utility>
#include <vector>
#include <folly/ConcurrentSkipList.h>
#include "AckResponder.h"
#include "common/Schema.h"
#include "common/Types.h"
#include "segcore/Record.h"
#include "segcore/InsertRecord.h"
#include "segcore/SegmentInterface.h"
#include "ConcurrentVector.h"
namespace milvus::segcore {
struct DeletedRecord {
struct TmpBitmap {
// Just for query
int64_t del_barrier = 0;
BitsetTypePtr bitmap_ptr;
using Offset = int32_t;
std::shared_ptr<TmpBitmap>
clone(int64_t capacity);
};
static constexpr int64_t deprecated_size_per_chunk = 32 * 1024;
DeletedRecord()
: lru_(std::make_shared<TmpBitmap>()),
timestamps_(deprecated_size_per_chunk),
pks_(deprecated_size_per_chunk) {
lru_->bitmap_ptr = std::make_shared<BitsetType>();
}
auto
get_lru_entry() {
std::shared_lock lck(shared_mutex_);
return lru_;
}
std::shared_ptr<TmpBitmap>
clone_lru_entry(int64_t insert_barrier,
int64_t del_barrier,
int64_t& old_del_barrier,
bool& hit_cache) {
std::shared_lock lck(shared_mutex_);
auto res = lru_->clone(insert_barrier);
old_del_barrier = lru_->del_barrier;
if (lru_->bitmap_ptr->size() == insert_barrier &&
lru_->del_barrier == del_barrier) {
hit_cache = true;
} else {
res->del_barrier = del_barrier;
// Orders delete entries by timestamp first, breaking ties by row
// offset, so the concurrent skip list keeps records in (ts, offset)
// order and range scans by timestamp are possible.
struct Comparator {
    bool
    operator()(const std::pair<Timestamp, Offset>& left,
               const std::pair<Timestamp, Offset>& right) const {
        if (left.first == right.first) {
            return left.second < right.second;
        }
        // fixed: removed a stray `return res;` line (undeclared
        // identifier left over from an unrelated edit)
        return left.first < right.first;
    }
};
// a lock-free list for multi-thread insert && read
using SortedDeleteList =
folly::ConcurrentSkipList<std::pair<Timestamp, Offset>, Comparator>;
static int32_t DUMP_BATCH_SIZE = 100000;
static int32_t DELETE_PAIR_SIZE = sizeof(std::pair<Timestamp, Offset>);
template <bool is_sealed = false>
class DeletedRecord {
public:
    // Production constructor: binds the owning segment so InternalPush can
    // resolve deleted pks to row offsets via segment_->search_pk().
    // Both pointers are non-owning and must outlive this record.
    DeletedRecord(InsertRecord<is_sealed>* insert_record,
                  SegmentInternalInterface* segment)
        : insert_record_(insert_record),
          segment_(segment),
          deleted_lists_(SortedDeleteList::createInstance()) {
    }

    // not binding segment, only for testing purposes
    // (pk lookups fall back to insert_record_->search_pk())
    DeletedRecord(InsertRecord<is_sealed>* insert_record)
        : insert_record_(insert_record),
          segment_(nullptr),
          deleted_lists_(SortedDeleteList::createInstance()) {
    }

    ~DeletedRecord() {
    }

    // non-movable (and implicitly non-copyable): the skip list and
    // snapshot iterators must not be transferred between instances
    DeletedRecord(DeletedRecord<is_sealed>&& delete_record) = delete;
    DeletedRecord<is_sealed>&
    operator=(DeletedRecord<is_sealed>&& delete_record) = delete;
void
insert_lru_entry(std::shared_ptr<TmpBitmap> new_entry, bool force = false) {
std::lock_guard lck(shared_mutex_);
if (new_entry->del_barrier <= lru_->del_barrier) {
if (!force ||
new_entry->bitmap_ptr->size() <= lru_->bitmap_ptr->size()) {
// DO NOTHING
return;
}
}
lru_ = std::move(new_entry);
}
void
push(const std::vector<PkType>& pks, const Timestamp* timestamps) {
std::lock_guard lck(buffer_mutex_);
auto size = pks.size();
ssize_t divide_point = 0;
auto n = n_.load();
// Truncate the overlapping prefix
if (n > 0) {
auto last = timestamps_[n - 1];
divide_point =
std::lower_bound(timestamps, timestamps + size, last + 1) -
timestamps;
}
// All these delete records have been applied
if (divide_point == size) {
LoadPush(const std::vector<PkType>& pks, const Timestamp* timestamps) {
if (pks.empty()) {
return;
}
size -= divide_point;
pks_.set_data_raw(n, pks.data() + divide_point, size);
timestamps_.set_data_raw(n, timestamps + divide_point, size);
n_ += size;
mem_size_ += sizeof(Timestamp) * size +
CalcPksSize(pks.data() + divide_point, size);
InternalPush(pks, timestamps);
SortedDeleteList::Accessor accessor(deleted_lists_);
auto* last = accessor.last();
Assert(last != nullptr);
max_load_timestamp_ = last->first;
//TODO: add support for dump snapshot when load finished
}
const ConcurrentVector<Timestamp>&
timestamps() const {
return timestamps_;
// stream push delete timestamps should be sorted outside of the interface
// considering concurrent query and push
void
StreamPush(const std::vector<PkType>& pks, const Timestamp* timestamps) {
if (pks.empty()) {
return;
}
InternalPush(pks, timestamps);
bool can_dump = timestamps[0] >= max_load_timestamp_;
if (can_dump) {
DumpSnapshot();
}
}
const ConcurrentVector<PkType>&
pks() const {
return pks_;
    // Core insertion path shared by LoadPush/StreamPush: resolve each
    // deleted pk to row offsets, deduplicate via deleted_mask_, and insert
    // <delete ts, row offset> pairs into the concurrent skip list.
    // Updates n_ (entry count) and mem_size_ (approximate bytes).
    void
    InternalPush(const std::vector<PkType>& pks, const Timestamp* timestamps) {
        int64_t removed_num = 0;
        int64_t mem_add = 0;

        SortedDeleteList::Accessor accessor(deleted_lists_);
        for (size_t i = 0; i < pks.size(); ++i) {
            auto deleted_pk = pks[i];
            auto deleted_ts = timestamps[i];
            // rows with this pk inserted before the delete timestamp
            std::vector<SegOffset> offsets;
            if (segment_) {
                offsets =
                    std::move(segment_->search_pk(deleted_pk, deleted_ts));
            } else {
                // only for testing
                offsets = std::move(
                    insert_record_->search_pk(deleted_pk, deleted_ts));
            }
            for (auto& offset : offsets) {
                auto row_id = offset.get();
                // if already deleted, no need to add new record
                if (deleted_mask_.size() > row_id && deleted_mask_[row_id]) {
                    continue;
                }
                // if insert record and delete record is same timestamp,
                // delete not take effect on this record.
                if (deleted_ts == insert_record_->timestamps_[row_id]) {
                    continue;
                }
                accessor.insert(std::make_pair(deleted_ts, row_id));
                if constexpr (is_sealed) {
                    // sealed segments size the mask once via
                    // set_sealed_row_count()
                    Assert(deleted_mask_.size() > 0);
                    deleted_mask_.set(row_id);
                } else {
                    // need to add mask size firstly for growing segment
                    deleted_mask_.resize(insert_record_->size());
                    deleted_mask_.set(row_id);
                }
                removed_num++;
                mem_add += DELETE_PAIR_SIZE;
            }
        }

        n_.fetch_add(removed_num);
        mem_size_.fetch_add(mem_add);
    }
void
Query(BitsetTypeView& bitset,
int64_t insert_barrier,
Timestamp query_timestamp) {
Assert(bitset.size() == insert_barrier);
SortedDeleteList::Accessor accessor(deleted_lists_);
if (accessor.size() == 0) {
return;
}
// try use snapshot to skip iterations
bool hit_snapshot = false;
SortedDeleteList::iterator next_iter;
if (!snapshots_.empty()) {
int loc = snapshots_.size() - 1;
// find last meeted snapshot
{
std::shared_lock<std::shared_mutex> lock(snap_lock_);
while (snapshots_[loc].first > query_timestamp && loc >= 0) {
loc--;
}
if (loc >= 0) {
next_iter = snap_next_iter_[loc];
Assert(snapshots_[loc].second.size() <= bitset.size());
bitset.inplace_or_with_count(snapshots_[loc].second,
snapshots_[loc].second.size());
hit_snapshot = true;
}
}
}
auto start_iter = hit_snapshot ? next_iter : accessor.begin();
auto end_iter =
accessor.lower_bound(std::make_pair(query_timestamp, 0));
auto it = start_iter;
while (it != accessor.end() && it != end_iter) {
AssertInfo(it->second <= insert_barrier,
"delete record beyond insert barrier, {} : {}",
it->second,
insert_barrier);
bitset.set(it->second);
it++;
}
while (it != accessor.end() && it->first == query_timestamp) {
AssertInfo(it->second <= insert_barrier,
"delete record beyond insert barrier, {} : {}",
it->second,
insert_barrier);
bitset.set(it->second);
it++;
}
}
    // Sum of the bit counts of the dumped snapshots; used by DumpSnapshot()
    // to estimate how many delete records are already covered.
    // NOTE(review): next_dump_ts is initialized to 0 and never updated in
    // the loop, so the `!=` guard only skips snapshots whose timestamp is
    // exactly 0 — presumably it was meant to skip duplicate-timestamp
    // snapshots; confirm the intended semantics.
    size_t
    GetSnapshotBitsSize() const {
        auto all_dump_bits = 0;
        auto next_dump_ts = 0;
        std::shared_lock<std::shared_mutex> lock(snap_lock_);
        int loc = snapshots_.size() - 1;
        while (loc >= 0) {
            if (next_dump_ts != snapshots_[loc].first) {
                all_dump_bits += snapshots_[loc].second.size();
            }
            loc--;
        }
        return all_dump_bits;
    }
void
DumpSnapshot() {
SortedDeleteList::Accessor accessor(deleted_lists_);
int total_size = accessor.size();
int dumped_size = snapshots_.empty() ? 0 : GetSnapshotBitsSize();
while (total_size - dumped_size > DUMP_BATCH_SIZE) {
int32_t bitsize = 0;
if constexpr (is_sealed) {
bitsize = sealed_row_count_;
} else {
bitsize = insert_record_->size();
}
BitsetType bitmap(bitsize, false);
auto it = accessor.begin();
Timestamp last_dump_ts = 0;
if (!snapshots_.empty()) {
it = snap_next_iter_.back();
last_dump_ts = snapshots_.back().first;
bitmap.inplace_or_with_count(snapshots_.back().second,
snapshots_.back().second.size());
}
while (total_size - dumped_size > DUMP_BATCH_SIZE &&
it != accessor.end()) {
Timestamp dump_ts = 0;
for (auto size = 0; size < DUMP_BATCH_SIZE; ++it, ++size) {
bitmap.set(it->second);
if (size == DUMP_BATCH_SIZE - 1) {
dump_ts = it->first;
}
}
{
std::unique_lock<std::shared_mutex> lock(snap_lock_);
if (dump_ts == last_dump_ts) {
// only update
snapshots_.back().second = std::move(bitmap.clone());
snap_next_iter_.back() = it;
} else {
// add new snapshot
snapshots_.push_back(
std::make_pair(dump_ts, std::move(bitmap.clone())));
Assert(it != accessor.end() && it.good());
snap_next_iter_.push_back(it);
}
LOG_INFO(
"dump delete record snapshot at ts: {}, cursor: {}, "
"total size:{} "
"current snapshot size: {} for segment: {}",
dump_ts,
dumped_size + DUMP_BATCH_SIZE,
total_size,
snapshots_.size(),
segment_ ? segment_->get_segment_id() : 0);
last_dump_ts = dump_ts;
}
dumped_size += DUMP_BATCH_SIZE;
}
}
}
int64_t
size() const {
return n_.load();
SortedDeleteList::Accessor accessor(deleted_lists_);
return accessor.size();
}
size_t
@ -129,27 +300,39 @@ struct DeletedRecord {
return mem_size_.load();
}
private:
std::shared_ptr<TmpBitmap> lru_;
std::shared_mutex shared_mutex_;
void
set_sealed_row_count(size_t row_count) {
sealed_row_count_ = row_count;
deleted_mask_.resize(row_count);
}
std::shared_mutex buffer_mutex_;
std::vector<std::pair<Timestamp, BitsetType>>
get_snapshots() const {
std::shared_lock<std::shared_mutex> lock(snap_lock_);
std::vector<std::pair<Timestamp, BitsetType>> snapshots;
for (const auto& snap : snapshots_) {
snapshots.emplace_back(snap.first, snap.second.clone());
}
return std::move(snapshots);
}
public:
    // total number of delete entries inserted into the skip list
    std::atomic<int64_t> n_ = 0;
    // approximate bytes consumed by delete entries
    std::atomic<int64_t> mem_size_ = 0;
    // non-owning: pk index + insert timestamps of the owning segment
    InsertRecord<is_sealed>* insert_record_;
    // non-owning: owning segment; nullptr when constructed for tests
    SegmentInternalInterface* segment_;
    // lock-free sorted list of <delete ts, row offset> pairs
    std::shared_ptr<SortedDeleteList> deleted_lists_;
    // max timestamp of deleted records which replayed in load process
    Timestamp max_load_timestamp_{0};
    // fixed row count of a sealed segment, set via set_sealed_row_count()
    int32_t sealed_row_count_;
    // used to remove duplicated deleted records for fast access
    BitsetType deleted_mask_;

    // dump snapshot low frequency
    mutable std::shared_mutex snap_lock_;
    // cumulative <ts, bitmap> snapshots, guarded by snap_lock_
    std::vector<std::pair<Timestamp, BitsetType>> snapshots_;
    // next delete record iterator that follows every snapshot
    std::vector<SortedDeleteList::iterator> snap_next_iter_;
};
inline auto
DeletedRecord::TmpBitmap::clone(int64_t capacity)
-> std::shared_ptr<TmpBitmap> {
auto res = std::make_shared<TmpBitmap>();
res->del_barrier = this->del_barrier;
// res->bitmap_ptr = std::make_shared<BitsetType>();
// *(res->bitmap_ptr) = *(this->bitmap_ptr);
res->bitmap_ptr = std::make_shared<BitsetType>(this->bitmap_ptr->clone());
res->bitmap_ptr->resize(capacity, false);
return res;
}
} // namespace milvus::segcore

View File

@ -46,23 +46,7 @@ void
SegmentGrowingImpl::mask_with_delete(BitsetTypeView& bitset,
int64_t ins_barrier,
Timestamp timestamp) const {
auto del_barrier = get_barrier(get_deleted_record(), timestamp);
if (del_barrier == 0) {
return;
}
auto bitmap_holder = get_deleted_bitmap(
del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp);
if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
auto& delete_bitset = *bitmap_holder->bitmap_ptr;
AssertInfo(
delete_bitset.size() == bitset.size(),
fmt::format(
"Deleted bitmap size:{} not equal to filtered bitmap size:{}",
delete_bitset.size(),
bitset.size()));
bitset |= delete_bitset;
deleted_record_.Query(bitset, ins_barrier, timestamp);
}
void
@ -344,7 +328,7 @@ SegmentGrowingImpl::Delete(int64_t reserved_begin,
}
// step 2: fill delete record
deleted_record_.push(sort_pks, sort_timestamps.data());
deleted_record_.StreamPush(sort_pks, sort_timestamps.data());
return SegcoreError::success();
}
@ -364,38 +348,8 @@ SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys);
auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);
std::vector<std::tuple<Timestamp, PkType>> ordering(size);
for (int i = 0; i < size; i++) {
ordering[i] = std::make_tuple(timestamps[i], pks[i]);
}
if (!insert_record_.empty_pks()) {
auto end = std::remove_if(
ordering.begin(),
ordering.end(),
[&](const std::tuple<Timestamp, PkType>& record) {
return !insert_record_.contain(std::get<1>(record));
});
size = end - ordering.begin();
ordering.resize(size);
}
// all record filtered
if (size == 0) {
return;
}
std::sort(ordering.begin(), ordering.end());
std::vector<PkType> sort_pks(size);
std::vector<Timestamp> sort_timestamps(size);
for (int i = 0; i < size; i++) {
auto [t, pk] = ordering[i];
sort_timestamps[i] = t;
sort_pks[i] = pk;
}
deleted_record_.push(sort_pks, sort_timestamps.data());
// step 2: push delete info to delete_record
deleted_record_.LoadPush(pks, timestamps);
}
SpanBase

View File

@ -95,11 +95,6 @@ class SegmentGrowingImpl : public SegmentGrowing {
return indexing_record_;
}
const DeletedRecord&
get_deleted_record() const {
return deleted_record_;
}
std::shared_mutex&
get_chunk_mutex() const {
return chunk_mutex_;
@ -254,7 +249,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
insert_record_(
*schema_, segcore_config.get_chunk_rows(), mmap_descriptor_),
indexing_record_(*schema_, index_meta_, segcore_config_),
id_(segment_id) {
id_(segment_id),
deleted_record_(&insert_record_, this) {
if (mmap_descriptor_ != nullptr) {
LOG_INFO("growing segment {} use mmap to hold raw data",
this->get_segment_id());
@ -334,6 +330,16 @@ class SegmentGrowingImpl : public SegmentGrowing {
return false;
}
    // Resolve `pk` to row offsets visible at `timestamp`, delegating to the
    // growing segment's insert record.
    std::vector<SegOffset>
    search_pk(const PkType& pk, Timestamp timestamp) const override {
        return insert_record_.search_pk(pk, timestamp);
    }

    // Overload bounded by `insert_barrier` (row count) instead of timestamp.
    std::vector<SegOffset>
    search_pk(const PkType& pk, int64_t insert_barrier) const override {
        return insert_record_.search_pk(pk, insert_barrier);
    }
protected:
int64_t
num_chunk(FieldId field_id) const override;
@ -395,7 +401,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
mutable std::shared_mutex chunk_mutex_;
// deleted pks
mutable DeletedRecord deleted_record_;
mutable DeletedRecord<false> deleted_record_;
int64_t id_;

View File

@ -19,7 +19,6 @@
#include <vector>
#include <index/ScalarIndex.h>
#include "DeletedRecord.h"
#include "FieldIndexing.h"
#include "common/Common.h"
#include "common/Schema.h"
@ -471,6 +470,12 @@ class SegmentInternalInterface : public SegmentInterface {
int64_t count,
const std::vector<std::string>& dynamic_field_names) const = 0;
    // Resolve `pk` to row offsets visible at `timestamp`.
    virtual std::vector<SegOffset>
    search_pk(const PkType& pk, Timestamp timestamp) const = 0;

    // Resolve `pk` to row offsets below `insert_barrier` (row count bound).
    virtual std::vector<SegOffset>
    search_pk(const PkType& pk, int64_t insert_barrier) const = 0;
protected:
mutable std::shared_mutex mutex_;
// fieldID -> std::pair<num_rows, avg_size>

View File

@ -52,6 +52,9 @@ class SegmentSealed : public SegmentInternalInterface {
LoadTextIndex(FieldId field_id,
std::unique_ptr<index::TextMatchIndex> index) = 0;
    // Access the sealed segment's insert record (pk index + timestamps).
    virtual InsertRecord<true>&
    get_insert_record() = 0;
SegmentType
type() const override {
return SegmentType::Sealed;

View File

@ -667,38 +667,8 @@ SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys);
auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);
std::vector<std::tuple<Timestamp, PkType>> ordering(size);
for (int i = 0; i < size; i++) {
ordering[i] = std::make_tuple(timestamps[i], pks[i]);
}
if (!insert_record_.empty_pks()) {
auto end = std::remove_if(
ordering.begin(),
ordering.end(),
[&](const std::tuple<Timestamp, PkType>& record) {
return !insert_record_.contain(std::get<1>(record));
});
size = end - ordering.begin();
ordering.resize(size);
}
// all record filtered
if (size == 0) {
return;
}
std::sort(ordering.begin(), ordering.end());
std::vector<PkType> sort_pks(size);
std::vector<Timestamp> sort_timestamps(size);
for (int i = 0; i < size; i++) {
auto [t, pk] = ordering[i];
sort_timestamps[i] = t;
sort_pks[i] = pk;
}
deleted_record_.push(sort_pks, sort_timestamps.data());
// step 2: push delete info to delete_record
deleted_record_.LoadPush(pks, timestamps);
}
void
@ -917,35 +887,7 @@ void
SegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset,
int64_t ins_barrier,
Timestamp timestamp) const {
auto del_barrier = get_barrier(get_deleted_record(), timestamp);
if (del_barrier == 0) {
return;
}
auto bitmap_holder = std::shared_ptr<DeletedRecord::TmpBitmap>();
auto search_fn = [this](const PkType& pk, int64_t barrier) {
return this->search_pk(pk, barrier);
};
bitmap_holder = get_deleted_bitmap(del_barrier,
ins_barrier,
deleted_record_,
insert_record_,
timestamp,
is_sorted_by_pk_,
search_fn);
if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
auto& delete_bitset = *bitmap_holder->bitmap_ptr;
AssertInfo(
delete_bitset.size() == bitset.size(),
fmt::format(
"Deleted bitmap size:{} not equal to filtered bitmap size:{}",
delete_bitset.size(),
bitset.size()));
bitset |= delete_bitset;
deleted_record_.Query(bitset, ins_barrier, timestamp);
}
void
@ -1269,7 +1211,8 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
id_(segment_id),
col_index_meta_(index_meta),
TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve),
is_sorted_by_pk_(is_sorted_by_pk) {
is_sorted_by_pk_(is_sorted_by_pk),
deleted_record_(&insert_record_, this) {
mmap_descriptor_ = std::shared_ptr<storage::MmapChunkDescriptor>(
new storage::MmapChunkDescriptor({segment_id, SegmentType::Sealed}));
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
@ -1735,11 +1678,7 @@ SegmentSealedImpl::search_ids(const IdArray& id_array,
for (auto& pk : pks) {
std::vector<SegOffset> pk_offsets;
if (!is_sorted_by_pk_) {
pk_offsets = insert_record_.search_pk(pk, timestamp);
} else {
pk_offsets = search_pk(pk, timestamp);
}
pk_offsets = search_pk(pk, timestamp);
for (auto offset : pk_offsets) {
switch (data_type) {
case DataType::INT64: {
@ -1839,7 +1778,7 @@ SegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated
sort_pks[i] = pk;
}
deleted_record_.push(sort_pks, sort_timestamps.data());
deleted_record_.StreamPush(sort_pks, sort_timestamps.data());
return SegcoreError::success();
}

View File

@ -139,6 +139,11 @@ class SegmentSealedImpl : public SegmentSealed {
return it->second->IsNullable();
};
    // Expose the segment's insert record (pk lookup structures + insert
    // timestamps) so callers (e.g. tests after loading field data) can
    // operate on it directly.
    InsertRecord<true>&
    get_insert_record() override {
        return insert_record_;
    }
public:
int64_t
num_chunk_index(FieldId field_id) const override;
@ -304,6 +309,7 @@ class SegmentSealedImpl : public SegmentSealed {
// } else {
num_rows_ = row_count;
// }
deleted_record_.set_sealed_row_count(row_count);
}
void
@ -328,11 +334,6 @@ class SegmentSealedImpl : public SegmentSealed {
return system_ready_count_ == 2;
}
const DeletedRecord&
get_deleted_record() const {
return deleted_record_;
}
std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
search_ids(const IdArray& id_array, Timestamp timestamp) const override;
@ -373,7 +374,7 @@ class SegmentSealedImpl : public SegmentSealed {
InsertRecord<true> insert_record_;
// deleted pks
mutable DeletedRecord deleted_record_;
mutable DeletedRecord<true> deleted_record_;
LoadFieldDataInfo field_data_info_;

View File

@ -108,82 +108,6 @@ std::unique_ptr<DataArray>
MergeDataArray(std::vector<MergeBase>& merge_bases,
const FieldMeta& field_meta);
template <bool is_sealed>
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(
int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
const InsertRecord<is_sealed>& insert_record,
Timestamp query_timestamp,
bool is_sorted_by_pk = false,
const std::function<std::vector<SegOffset>(const PkType&, int64_t)>&
search_fn = nullptr) {
// if insert_barrier and del_barrier have not changed, use cache data directly
bool hit_cache = false;
int64_t old_del_barrier = 0;
auto current = delete_record.clone_lru_entry(
insert_barrier, del_barrier, old_del_barrier, hit_cache);
if (hit_cache) {
return current;
}
auto bitmap = current->bitmap_ptr;
int64_t start, end;
if (del_barrier < old_del_barrier) {
// in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp
// so these deletion records do not take effect in query/search
// so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0
// for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0]
start = del_barrier;
end = old_del_barrier;
} else {
// the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier]
// for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0]
start = old_del_barrier;
end = del_barrier;
}
// Avoid invalid calculations when there are a lot of repeated delete pks
std::unordered_map<PkType, Timestamp> delete_timestamps;
for (auto del_index = start; del_index < end; ++del_index) {
auto pk = delete_record.pks()[del_index];
auto timestamp = delete_record.timestamps()[del_index];
delete_timestamps[pk] = timestamp > delete_timestamps[pk]
? timestamp
: delete_timestamps[pk];
}
for (auto& [pk, timestamp] : delete_timestamps) {
auto segOffsets = is_sorted_by_pk
? search_fn(pk, insert_barrier)
: insert_record.search_pk(pk, insert_barrier);
for (auto offset : segOffsets) {
int64_t insert_row_offset = offset.get();
// The deletion record do not take effect in search/query,
// and reset bitmap to 0
if (timestamp > query_timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// Insert after delete with same pk, delete will not task effect on this insert record,
// and reset bitmap to 0
if (insert_record.timestamps_[insert_row_offset] >= timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// insert data corresponding to the insert_row_offset will be ignored in search/query
bitmap->set(insert_row_offset);
}
}
delete_record.insert_lru_entry(current);
return current;
}
std::unique_ptr<DataArray>
ReverseDataFromIndex(const index::IndexBase* index,
const int64_t* seg_offsets,

View File

@ -41,6 +41,7 @@ set(MILVUS_TEST_FILES
test_c_tokenizer.cpp
test_loading.cpp
test_data_codec.cpp
test_delete_record.cpp
test_disk_file_manager_test.cpp
test_exec.cpp
test_expr.cpp

View File

@ -1204,7 +1204,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) {
TEST(CApiTest, InsertSamePkAfterDeleteOnSealedSegment) {
auto collection = NewCollection(get_default_schema_config());
CSegmentInterface segment;
auto status = NewSegment(collection, Sealed, -1, &segment, true);
auto status = NewSegment(collection, Sealed, -1, &segment, false);
ASSERT_EQ(status.error_code, Success);
auto col = (milvus::segcore::Collection*)collection;
@ -1215,6 +1215,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnSealedSegment) {
auto segment_interface = reinterpret_cast<SegmentInterface*>(segment);
auto sealed_segment = dynamic_cast<SegmentSealed*>(segment_interface);
SealedLoadFieldData(dataset, *sealed_segment);
sealed_segment->get_insert_record().seal_pks();
// delete data pks = {1, 2, 3}, timestamps = {4, 4, 4}
std::vector<int64_t> delete_row_ids = {1, 2, 3};

View File

@ -0,0 +1,446 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <google/protobuf/text_format.h>
#include <gtest/gtest.h>
#include <array>
#include <boost/format.hpp>
#include <chrono>
#include <iostream>
#include <memory>
#include <random>
#include <string>
#include <unordered_set>
#include "segcore/DeletedRecord.h"
#include "segcore/SegmentGrowingImpl.h"
#include "segcore/SegmentSealedImpl.h"
#include "segcore/SegmentGrowingImpl.h"
#include "test_utils/DataGen.h"
using namespace milvus;
using namespace milvus::segcore;
// End-to-end MVCC check on a sealed segment: push streaming deletes and
// verify row visibility via Query() at several timestamps, including
// duplicate deletes and a delete whose ts equals the insert ts.
TEST(DeleteMVCC, common_case) {
    auto schema = std::make_shared<Schema>();
    auto pk = schema->AddDebugField("pk", DataType::INT64);
    schema->set_primary_field_id(pk);
    auto segment = CreateSealedSegment(schema);
    ASSERT_EQ(0, segment->get_real_count());

    // load insert: pk (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    // with timestamp ts (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    int64_t c = 10;
    auto dataset = DataGen(schema, c);
    auto pks = dataset.get_col<int64_t>(pk);
    SealedLoadFieldData(dataset, *segment);
    ASSERT_EQ(c, segment->get_real_count());

    auto& insert_record = segment->get_insert_record();
    DeletedRecord<true> delete_record(&insert_record);
    delete_record.set_sealed_row_count(c);

    // delete pk(1) at ts(10);
    std::vector<Timestamp> delete_ts = {10};
    std::vector<PkType> delete_pk = {1};
    delete_record.StreamPush(delete_pk, delete_ts.data());
    ASSERT_EQ(1, delete_record.size());

    {
        BitsetType bitsets(c);
        BitsetTypeView bitsets_view(bitsets);
        auto insert_barrier = c;
        // query at ts (10)
        Timestamp query_timestamp = 10;
        delete_record.Query(bitsets_view, insert_barrier, query_timestamp);
        std::vector<bool> expected = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0};
        for (int i = 0; i < c; i++) {
            ASSERT_EQ(bitsets_view[i], expected[i]);
        }
    }

    {
        BitsetType bitsets(c);
        BitsetTypeView bitsets_view(bitsets);
        auto insert_barrier = c;
        // query at ts (11)
        // (fixed: removed a duplicated re-assignment of query_timestamp)
        Timestamp query_timestamp = 11;
        delete_record.Query(bitsets_view, insert_barrier, query_timestamp);
        std::vector<bool> expected = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0};
        for (int i = 0; i < c; i++) {
            ASSERT_EQ(bitsets_view[i], expected[i]);
        }
    }

    // delete pk(5) at ts(12)
    delete_ts = {12};
    delete_pk = {5};
    delete_record.StreamPush(delete_pk, delete_ts.data());
    ASSERT_EQ(2, delete_record.size());

    {
        BitsetType bitsets(c);
        BitsetTypeView bitsets_view(bitsets);
        auto insert_barrier = c;
        // query at ts (12)
        Timestamp query_timestamp = 12;
        delete_record.Query(bitsets_view, insert_barrier, query_timestamp);
        std::vector<bool> expected = {0, 1, 0, 0, 0, 1, 0, 0, 0, 0};
        for (int i = 0; i < c; i++) {
            ASSERT_EQ(bitsets_view[i], expected[i]);
        }
    }

    // delete at pk(1) at ts(13) again
    delete_ts = {13};
    delete_pk = {1};
    delete_record.StreamPush(delete_pk, delete_ts.data());
    // not add new record, because already deleted.
    ASSERT_EQ(2, delete_record.size());

    {
        BitsetType bitsets(c);
        BitsetTypeView bitsets_view(bitsets);
        auto insert_barrier = c;
        // query at ts (14)
        Timestamp query_timestamp = 14;
        delete_record.Query(bitsets_view, insert_barrier, query_timestamp);
        std::vector<bool> expected = {0, 1, 0, 0, 0, 1, 0, 0, 0, 0};
        for (int i = 0; i < c; i++) {
            ASSERT_EQ(bitsets_view[i], expected[i]);
        }
    }

    // delete pk(9) at ts(9)
    delete_ts = {9};
    delete_pk = {9};
    delete_record.StreamPush(delete_pk, delete_ts.data());
    // not add new record, because insert also at ts(9) same as deleted
    // delete not take effect.
    ASSERT_EQ(2, delete_record.size());

    {
        BitsetType bitsets(c);
        BitsetTypeView bitsets_view(bitsets);
        auto insert_barrier = c;
        // query at ts (14)
        Timestamp query_timestamp = 14;
        delete_record.Query(bitsets_view, insert_barrier, query_timestamp);
        std::vector<bool> expected = {0, 1, 0, 0, 0, 1, 0, 0, 0, 0};
        for (int i = 0; i < c; i++) {
            ASSERT_EQ(bitsets_view[i], expected[i]);
        }
    }
}
TEST(DeleteMVCC, delete_exist_duplicate_pks) {
    using namespace milvus;
    using namespace milvus::query;
    using namespace milvus::segcore;

    // Schema: one float vector field plus an int64 primary key ("age").
    auto schema = std::make_shared<Schema>();
    auto vec_fid = schema->AddDebugField(
        "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
    auto i64_fid = schema->AddDebugField("age", DataType::INT64);
    schema->set_primary_field_id(i64_fid);

    auto N = 10;
    uint64_t seg_id = 101;
    InsertRecord insert_record(*schema, N);
    DeletedRecord<false> delete_record(&insert_record);

    // insert pk: (0, 1, 1, 2, 2, 3, 4, 3, 2, 5)
    //     at ts: (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    std::vector<int64_t> age_data = {0, 1, 1, 2, 2, 3, 4, 3, 2, 5};
    std::vector<Timestamp> tss(N);
    for (int i = 0; i < N; ++i) {
        tss[i] = i;
        insert_record.insert_pk(age_data[i], i);
    }
    auto insert_offset = insert_record.reserved.fetch_add(N);
    insert_record.timestamps_.set_data_raw(insert_offset, tss.data(), N);
    auto field_data = insert_record.get_data_base(i64_fid);
    field_data->set_data_raw(insert_offset, age_data.data(), N);
    insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + N);

    // Helper: run Query at `ts` over all N rows and compare the delete
    // bitset against `expected`, position by position.
    auto verify_query = [&](Timestamp ts, const std::vector<bool>& expected) {
        BitsetType bitsets(N);
        BitsetTypeView bitsets_view(bitsets);
        int64_t insert_barrier = N;
        delete_record.Query(bitsets_view, insert_barrier, ts);
        for (int i = 0; i < N; i++) {
            ASSERT_EQ(bitsets_view[i], expected[i]);
        }
    };

    // delete pk(2) at ts(5): masks the two pk-2 rows inserted at ts(3, 4)
    std::vector<Timestamp> delete_ts = {5};
    std::vector<PkType> delete_pk = {2};
    delete_record.StreamPush(delete_pk, delete_ts.data());
    ASSERT_EQ(2, delete_record.size());
    verify_query(10, {0, 0, 0, 1, 1, 0, 0, 0, 0, 0});

    // delete pk(3) at ts(6): masks the pk-3 row inserted at ts(5)
    delete_ts = {6};
    delete_pk = {3};
    delete_record.StreamPush(delete_pk, delete_ts.data());
    ASSERT_EQ(3, delete_record.size());
    verify_query(10, {0, 0, 0, 1, 1, 1, 0, 0, 0, 0});

    // delete pk(3) again at ts(9): masks the remaining pk-3 row at ts(7)
    delete_ts = {9};
    delete_pk = {3};
    delete_record.StreamPush(delete_pk, delete_ts.data());
    ASSERT_EQ(4, delete_record.size());
    verify_query(10, {0, 0, 0, 1, 1, 1, 0, 1, 0, 0});

    // delete pk(2) again at ts(9): masks the remaining pk-2 row at ts(8)
    delete_ts = {9};
    delete_pk = {2};
    delete_record.StreamPush(delete_pk, delete_ts.data());
    ASSERT_EQ(5, delete_record.size());
    verify_query(10, {0, 0, 0, 1, 1, 1, 0, 1, 1, 0});
}
TEST(DeleteMVCC, snapshot) {
    using namespace milvus;
    using namespace milvus::query;
    using namespace milvus::segcore;

    // Schema: one float vector field plus an int64 primary key ("age").
    auto schema = std::make_shared<Schema>();
    auto vec_fid = schema->AddDebugField(
        "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
    auto i64_fid = schema->AddDebugField("age", DataType::INT64);
    schema->set_primary_field_id(i64_fid);

    auto N = 500000;
    InsertRecord insert_record(*schema, N);
    DeletedRecord<false> delete_record(&insert_record);

    // Insert N rows with pk == ts == i.
    std::vector<int64_t> age_data(N);
    std::vector<Timestamp> tss(N);
    for (int i = 0; i < N; ++i) {
        age_data[i] = i;
        tss[i] = i;
        insert_record.insert_pk(age_data[i], i);
    }
    auto insert_offset = insert_record.reserved.fetch_add(N);
    insert_record.timestamps_.set_data_raw(insert_offset, tss.data(), N);
    auto field_data = insert_record.get_data_base(i64_fid);
    field_data->set_data_raw(insert_offset, age_data.data(), N);
    insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + N);

    // Delete the first DN pks, each at ts = pk + 1.
    auto DN = 400000;
    std::vector<Timestamp> delete_ts(DN);
    std::vector<PkType> delete_pk(DN);
    for (int i = 0; i < DN; ++i) {
        delete_pk[i] = age_data[i];
        delete_ts[i] = i + 1;
    }
    delete_record.StreamPush(delete_pk, delete_ts.data());
    ASSERT_EQ(DN, delete_record.size());

    // 400000 deletes yield 3 snapshots (presumably one per 100000-delete
    // interval — see insert_after_snapshot, where 101000 deletes yield 1);
    // the newest snapshot covers the first 300000 deletes.
    // NOTE: the redundant std::move() around get_snapshots() was dropped —
    // moving a function-call result blocks copy elision (-Wpessimizing-move).
    auto snapshots = delete_record.get_snapshots();
    ASSERT_EQ(3, snapshots.size());
    ASSERT_EQ(snapshots[2].second.count(), 300000);
}
TEST(DeleteMVCC, insert_after_snapshot) {
    using namespace milvus;
    using namespace milvus::query;
    using namespace milvus::segcore;

    // Schema: one float vector field plus an int64 primary key ("age").
    auto schema = std::make_shared<Schema>();
    auto vec_fid = schema->AddDebugField(
        "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
    auto i64_fid = schema->AddDebugField("age", DataType::INT64);
    schema->set_primary_field_id(i64_fid);

    auto N = 110000;
    InsertRecord insert_record(*schema, N);
    DeletedRecord<false> delete_record(&insert_record);

    // insert (pk, ts) = (0, 0), (1, 1) ... (N - 1, N - 1)
    std::vector<int64_t> age_data(N);
    std::vector<Timestamp> tss(N);
    for (int i = 0; i < N; ++i) {
        age_data[i] = i;
        tss[i] = i;
        insert_record.insert_pk(age_data[i], i);
    }
    auto insert_offset = insert_record.reserved.fetch_add(N);
    insert_record.timestamps_.set_data_raw(insert_offset, tss.data(), N);
    auto field_data = insert_record.get_data_base(i64_fid);
    field_data->set_data_raw(insert_offset, age_data.data(), N);
    insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + N);

    // delete (pk, ts) = (0, 1), (1, 2) ... (DN - 1, DN)
    auto DN = 101000;
    std::vector<Timestamp> delete_ts(DN);
    std::vector<PkType> delete_pk(DN);
    for (int i = 0; i < DN; ++i) {
        delete_pk[i] = age_data[i];
        delete_ts[i] = i + 1;
    }
    delete_record.StreamPush(delete_pk, delete_ts.data());
    ASSERT_EQ(DN, delete_record.size());

    // 101000 deletes: expect one snapshot covering the first 100000.
    // (redundant std::move on the returned value removed — it blocks
    // copy elision, -Wpessimizing-move)
    auto snapshots = delete_record.get_snapshots();
    ASSERT_EQ(1, snapshots.size());
    ASSERT_EQ(snapshots[0].second.count(), 100000);

    // Query at ts N + 1: exactly the first DN rows are masked as deleted.
    {
        BitsetType bitsets(N);
        BitsetTypeView bitsets_view(bitsets);
        int64_t insert_barrier = N;
        Timestamp query_timestamp = N + 1;
        delete_record.Query(bitsets_view, insert_barrier, query_timestamp);
        for (auto i = 0; i < DN; i++) {
            ASSERT_EQ(bitsets_view[i], true) << i;
        }
        for (auto i = DN; i < N; i++) {
            ASSERT_EQ(bitsets_view[i], false) << i;
        }
    }

    // After the snapshot, insert AN more rows:
    // (pk, ts) = (N, N), (N + 1, N + 1) ... (N + AN - 1, N + AN - 1)
    auto AN = 1000;
    std::vector<int64_t> age_data_new(AN);
    std::vector<Timestamp> tss_new(AN);
    for (int i = 0; i < AN; ++i) {
        age_data_new[i] = N + i;
        tss_new[i] = N + i;
        insert_record.insert_pk(age_data_new[i], i + N);
    }
    insert_offset = insert_record.reserved.fetch_add(AN);
    insert_record.timestamps_.set_data_raw(insert_offset, tss_new.data(), AN);
    field_data = insert_record.get_data_base(i64_fid);
    // BUGFIX: write the NEW rows' field data (age_data_new), not the first
    // AN entries of the old age_data — the pk index above was populated
    // from age_data_new, so the field data must match it.
    field_data->set_data_raw(insert_offset, age_data_new.data(), AN);
    insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + AN);

    // Query at ts N + AN + 1: deletes still cover only the first DN rows;
    // the rows inserted after the snapshot stay visible.
    {
        BitsetType bitsets(N + AN);
        BitsetTypeView bitsets_view(bitsets);
        int64_t insert_barrier = N + AN;
        Timestamp query_timestamp = N + AN + 1;
        delete_record.Query(bitsets_view, insert_barrier, query_timestamp);
        for (auto i = 0; i < DN; i++) {
            ASSERT_EQ(bitsets_view[i], true);
        }
        for (auto i = DN; i < N + AN; i++) {
            ASSERT_EQ(bitsets_view[i], false);
        }
    }
}
TEST(DeleteMVCC, perform) {
    using namespace milvus;
    using namespace milvus::query;
    using namespace milvus::segcore;

    // Schema: one float vector field plus an int64 primary key ("age").
    auto schema = std::make_shared<Schema>();
    auto vec_fid = schema->AddDebugField(
        "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
    auto i64_fid = schema->AddDebugField("age", DataType::INT64);
    schema->set_primary_field_id(i64_fid);

    auto N = 1000000;
    uint64_t seg_id = 101;
    InsertRecord insert_record(*schema, N);
    DeletedRecord<false> delete_record(&insert_record);

    // Insert N rows with pk == ts == i.
    std::vector<int64_t> age_data(N);
    std::vector<Timestamp> tss(N);
    for (int i = 0; i < N; ++i) {
        age_data[i] = i;
        tss[i] = i;
        insert_record.insert_pk(i, i);
    }
    auto insert_offset = insert_record.reserved.fetch_add(N);
    insert_record.timestamps_.set_data_raw(insert_offset, tss.data(), N);
    auto field_data = insert_record.get_data_base(i64_fid);
    field_data->set_data_raw(insert_offset, age_data.data(), N);
    insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + N);

    // Delete the first half of the pks, at timestamps after every insert.
    auto DN = N / 2;
    std::vector<Timestamp> delete_ts(DN);
    std::vector<PkType> delete_pk(DN);
    for (int i = 0; i < DN; ++i) {
        delete_ts[i] = N + i;
        delete_pk[i] = i;
    }

    // Runs `fn`, then prints its wall time in microseconds under `label`.
    auto timed = [](const char* label, auto&& fn) {
        auto begin = std::chrono::steady_clock::now();
        fn();
        auto finish = std::chrono::steady_clock::now();
        std::cout << label
                  << std::chrono::duration_cast<std::chrono::microseconds>(
                         finish - begin)
                         .count()
                  << std::endl;
    };

    timed("push cost:",
          [&] { delete_record.StreamPush(delete_pk, delete_ts.data()); });
    std::cout << delete_record.size() << std::endl;

    auto query_timestamp = delete_ts[DN - 1];
    auto insert_barrier = get_barrier(insert_record, query_timestamp);
    BitsetType res_bitmap(insert_barrier);
    BitsetTypeView res_view(res_bitmap);
    timed("query cost:", [&] {
        delete_record.Query(res_view, insert_barrier, query_timestamp);
    });
}

View File

@ -89,7 +89,7 @@ TEST(Growing, RealCount) {
// delete all.
auto del_offset3 = segment->get_deleted_count();
ASSERT_EQ(del_offset3, half * 2);
ASSERT_EQ(del_offset3, half);
auto del_ids3 = GenPKs(pks.begin(), pks.end());
auto del_tss3 = GenTss(c, c + half * 2);
status = segment->Delete(del_offset3, c, del_ids3.get(), del_tss3.data());

View File

@ -1294,6 +1294,7 @@ TEST(Sealed, DeleteCount) {
auto pk = schema->AddDebugField("pk", DataType::INT64);
schema->set_primary_field_id(pk);
auto segment = CreateSealedSegment(schema);
segment->get_insert_record().seal_pks();
int64_t c = 10;
auto offset = segment->get_deleted_count();
@ -1305,9 +1306,8 @@ TEST(Sealed, DeleteCount) {
auto status = segment->Delete(offset, c, pks.get(), tss.data());
ASSERT_TRUE(status.ok());
// shouldn't be filtered for empty segment.
auto cnt = segment->get_deleted_count();
ASSERT_EQ(cnt, 10);
ASSERT_EQ(cnt, 0);
}
{
auto schema = std::make_shared<Schema>();
@ -1374,7 +1374,7 @@ TEST(Sealed, RealCount) {
// delete all.
auto del_offset3 = segment->get_deleted_count();
ASSERT_EQ(del_offset3, half * 2);
ASSERT_EQ(del_offset3, half);
auto del_ids3 = GenPKs(pks.begin(), pks.end());
auto del_tss3 = GenTss(c, c + half * 2);
status = segment->Delete(del_offset3, c, del_ids3.get(), del_tss3.data());

View File

@ -64,7 +64,7 @@ TEST(Util, GetDeleteBitmap) {
auto N = 10;
uint64_t seg_id = 101;
InsertRecord insert_record(*schema, N);
DeletedRecord delete_record;
DeletedRecord<false> delete_record(&insert_record);
// fill insert record, all insert records has same pk = 1, timestamps= {1 ... N}
std::vector<int64_t> age_data(N);
@ -83,37 +83,14 @@ TEST(Util, GetDeleteBitmap) {
// test case delete pk1(ts = 0) -> insert repeated pk1 (ts = {1 ... N}) -> query (ts = N)
std::vector<Timestamp> delete_ts = {0};
std::vector<PkType> delete_pk = {1};
delete_record.push(delete_pk, delete_ts.data());
delete_record.StreamPush(delete_pk, delete_ts.data());
auto query_timestamp = tss[N - 1];
auto del_barrier = get_barrier(delete_record, query_timestamp);
auto insert_barrier = get_barrier(insert_record, query_timestamp);
auto res_bitmap = get_deleted_bitmap(del_barrier,
insert_barrier,
delete_record,
insert_record,
query_timestamp);
ASSERT_EQ(res_bitmap->bitmap_ptr->count(), 0);
// test case insert repeated pk1 (ts = {1 ... N}) -> delete pk1 (ts = N) -> query (ts = N)
delete_ts = {uint64_t(N)};
delete_pk = {1};
delete_record.push(delete_pk, delete_ts.data());
del_barrier = get_barrier(delete_record, query_timestamp);
res_bitmap = get_deleted_bitmap(del_barrier,
insert_barrier,
delete_record,
insert_record,
query_timestamp);
ASSERT_EQ(res_bitmap->bitmap_ptr->count(), N - 1);
// test case insert repeated pk1 (ts = {1 ... N}) -> delete pk1 (ts = N) -> query (ts = N/2)
query_timestamp = tss[N - 1] / 2;
del_barrier = get_barrier(delete_record, query_timestamp);
res_bitmap = get_deleted_bitmap(
del_barrier, N, delete_record, insert_record, query_timestamp);
ASSERT_EQ(res_bitmap->bitmap_ptr->count(), 0);
BitsetType res_bitmap(insert_barrier);
BitsetTypeView res_view(res_bitmap);
delete_record.Query(res_view, insert_barrier, query_timestamp);
ASSERT_EQ(res_view.count(), 0);
}
TEST(Util, OutOfRange) {