Mirror of https://gitee.com/milvus-io/milvus.git, synced 2026-01-07 19:31:51 +08:00
enhance: Ignore entries with the same ts when DeleteRecord searches pks (#43669)
Related to #43660. This patch drops the unwanted offset&ts entries that share the delete record's timestamp. Under a heavy upsert load, these false hits can add a large amount of memory usage while deletes are applied. A possible next step is to pass a callback to `search_pk_func_` so that hit entries can be handled in a streaming fashion.
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent e37cd19da2
commit 5f2f4eb3d6
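The core of the change is the `timestamp_hit` comparator selected by the new `include_same_ts` flag: the delete path now passes `false`, so the pk search uses strict `ts1 < ts2` instead of `ts1 <= ts2`, and rows written at exactly the delete timestamp (the usual upsert pattern, where the delete and the reinsert share a timestamp) no longer count as hits. The standalone C++ sketch below illustrates that selection in isolation; `row_timestamps`, `delete_ts`, and the collection loop are hypothetical stand-ins for the segment internals, and only the comparator mirrors the diff.

// Standalone sketch (not part of the patch): how include_same_ts switches the
// timestamp comparison between "<=" and "<" when collecting pk offsets.
#include <cstdint>
#include <iostream>
#include <vector>

using Timestamp = uint64_t;

int main() {
    // Hypothetical insert timestamps of rows that share one primary key.
    std::vector<Timestamp> row_timestamps = {90, 100, 100, 110};
    // Hypothetical delete timestamp; with upsert, the reinserted rows carry
    // the same timestamp (100) as the delete that precedes them.
    Timestamp delete_ts = 100;

    for (bool include_same_ts : {true, false}) {
        // Same comparator selection as in the patched search_pk /
        // search_batch_pks: captureless lambdas decay to a common function
        // pointer type, so the ternary is well-formed.
        auto timestamp_hit =
            include_same_ts ? [](const Timestamp& ts1,
                                 const Timestamp& ts2) { return ts1 <= ts2; }
                            : [](const Timestamp& ts1, const Timestamp& ts2) {
                                  return ts1 < ts2;
                              };

        std::vector<size_t> hit_offsets;
        for (size_t offset = 0; offset < row_timestamps.size(); ++offset) {
            if (timestamp_hit(row_timestamps[offset], delete_ts)) {
                hit_offsets.push_back(offset);
            }
        }

        std::cout << "include_same_ts=" << include_same_ts << " -> offsets:";
        for (auto offset : hit_offsets) {
            std::cout << " " << offset;
        }
        std::cout << "\n";  // true: 0 1 2, false: 0 (rows at ts 100 skipped)
    }
    return 0;
}

With `include_same_ts=true` the sketch reports offsets 0, 1, and 2; with `false` only offset 0. This matches the diff below, where the sealed and growing segments pass `false` from the delete path while `InsertRecord::search_pk` keeps `true` as its default.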
@@ -898,12 +898,14 @@ ChunkedSegmentSealedImpl::search_pk(const PkType& pk,
 std::vector<std::pair<SegOffset, Timestamp>>
 ChunkedSegmentSealedImpl::search_batch_pks(const std::vector<PkType>& pks,
-                                           const Timestamp* timestamps) const {
+                                           const Timestamp* timestamps,
+                                           bool include_same_ts) const {
     std::vector<std::pair<SegOffset, Timestamp>> pk_offsets;
     // handle unsorted case
     if (!is_sorted_by_pk_) {
         for (size_t i = 0; i < pks.size(); i++) {
-            auto offsets = insert_record_.search_pk(pks[i], timestamps[i]);
+            auto offsets = insert_record_.search_pk(
+                pks[i], timestamps[i], include_same_ts);
             for (auto offset : offsets) {
                 pk_offsets.emplace_back(offset, timestamps[i]);
             }

@@ -917,6 +919,13 @@ ChunkedSegmentSealedImpl::search_batch_pks(const std::vector<PkType>& pks,
 
     auto all_chunk_pins = pk_column->GetAllChunks();
 
+    auto timestamp_hit = include_same_ts
+                             ? [](const Timestamp& ts1,
+                                  const Timestamp& ts2) { return ts1 <= ts2; }
+                             : [](const Timestamp& ts1, const Timestamp& ts2) {
+                                   return ts1 < ts2;
+                               };
+
     switch (schema_->get_fields().at(pk_field_id).get_data_type()) {
         case DataType::INT64: {
             auto num_chunk = pk_column->num_chunks();

@@ -940,7 +949,8 @@ ChunkedSegmentSealedImpl::search_batch_pks(const std::vector<PkType>& pks,
                     pk_column->GetNumRowsUntilChunk(i);
                 for (; it != src + chunk_row_num && *it == target; ++it) {
                     auto offset = it - src + num_rows_until_chunk;
-                    if (insert_record_.timestamps_[offset] <= timestamp) {
+                    if (timestamp_hit(insert_record_.timestamps_[offset],
+                                      timestamp)) {
                         pk_offsets.emplace_back(offset, timestamp);
                     }
                 }

@@ -965,8 +975,9 @@ ChunkedSegmentSealedImpl::search_batch_pks(const std::vector<PkType>& pks,
                      string_chunk->operator[](offset) == target;
                      ++offset) {
                     auto segment_offset = offset + num_rows_until_chunk;
-                    if (insert_record_.timestamps_[segment_offset] <=
-                        timestamp) {
+                    if (timestamp_hit(
+                            insert_record_.timestamps_[segment_offset],
+                            timestamp)) {
                         pk_offsets.emplace_back(segment_offset, timestamp);
                     }
                 }

@@ -1111,7 +1122,7 @@ ChunkedSegmentSealedImpl::ChunkedSegmentSealedImpl(
       deleted_record_(
           &insert_record_,
           [this](const std::vector<PkType>& pks, const Timestamp* timestamps) {
-              return this->search_batch_pks(pks, timestamps);
+              return this->search_batch_pks(pks, timestamps, false);
           },
           segment_id) {
     auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();

@@ -206,7 +206,8 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
 
     std::vector<std::pair<SegOffset, Timestamp>>
     search_batch_pks(const std::vector<PkType>& pks,
-                     const Timestamp* timestamps) const;
+                     const Timestamp* timestamps,
+                     bool include_same_ts) const;
 
  public:
     int64_t

@@ -321,12 +321,20 @@ struct InsertRecord {
     }
 
     std::vector<SegOffset>
-    search_pk(const PkType& pk, Timestamp timestamp) const {
+    search_pk(const PkType& pk,
+              Timestamp timestamp,
+              bool include_same_ts = true) const {
         std::shared_lock lck(shared_mutex_);
         std::vector<SegOffset> res_offsets;
         auto offset_iter = pk2offset_->find(pk);
+        auto timestamp_hit =
+            include_same_ts ? [](const Timestamp& ts1,
+                                 const Timestamp& ts2) { return ts1 <= ts2; }
+                            : [](const Timestamp& ts1, const Timestamp& ts2) {
+                                  return ts1 < ts2;
+                              };
         for (auto offset : offset_iter) {
-            if (timestamps_[offset] <= timestamp) {
+            if (timestamp_hit(timestamps_[offset], timestamp)) {
                 res_offsets.emplace_back(offset);
             }
         }

@@ -667,11 +667,13 @@ SegmentGrowingImpl::GetFieldDataType(milvus::FieldId field_id) const {
 
 std::vector<std::pair<SegOffset, Timestamp>>
 SegmentGrowingImpl::search_batch_pks(const std::vector<PkType>& pks,
-                                     const Timestamp* timestamps) const {
+                                     const Timestamp* timestamps,
+                                     bool include_same_ts) const {
     std::vector<std::pair<SegOffset, Timestamp>> results;
     for (size_t i = 0; i < pks.size(); ++i) {
         auto timestamp = timestamps[i];
-        auto offsets = insert_record_.search_pk(pks[i], timestamp);
+        auto offsets =
+            insert_record_.search_pk(pks[i], timestamp, include_same_ts);
         for (auto offset : offsets) {
             results.emplace_back(offset, timestamp);
         }

@@ -177,7 +177,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
 
     std::vector<std::pair<SegOffset, Timestamp>>
     search_batch_pks(const std::vector<PkType>& pks,
-                     const Timestamp* timestamps) const;
+                     const Timestamp* timestamps,
+                     bool include_same_ts) const;
 
  public:
     size_t

@@ -297,7 +298,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
           &insert_record_,
           [this](const std::vector<PkType>& pks,
                  const Timestamp* timestamps) {
-              return this->search_batch_pks(pks, timestamps);
+              return this->search_batch_pks(pks, timestamps, false);
           },
           segment_id) {
         this->CreateTextIndexes();