mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
fix:expand lock range for dump_snapshot (#44130)
issue: #44129 Signed-off-by: luzhang <luzhang@zilliz.com> Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
parent
7721edf32a
commit
e2f34d7b78
@ -105,7 +105,15 @@ class DeletedRecord {
|
|||||||
|
|
||||||
bool can_dump = timestamps[0] >= max_load_timestamp_;
|
bool can_dump = timestamps[0] >= max_load_timestamp_;
|
||||||
if (can_dump) {
|
if (can_dump) {
|
||||||
|
auto start_time = std::chrono::steady_clock::now();
|
||||||
DumpSnapshot();
|
DumpSnapshot();
|
||||||
|
auto end_time = std::chrono::steady_clock::now();
|
||||||
|
auto duration =
|
||||||
|
std::chrono::duration_cast<std::chrono::microseconds>(
|
||||||
|
end_time - start_time);
|
||||||
|
LOG_INFO("dump delete record snapshot cost: {}ms for segment: {}",
|
||||||
|
duration.count() / 1000,
|
||||||
|
segment_id_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -233,11 +241,11 @@ class DeletedRecord {
|
|||||||
|
|
||||||
void
|
void
|
||||||
DumpSnapshot() {
|
DumpSnapshot() {
|
||||||
|
std::unique_lock<std::shared_mutex> lock(snap_lock_);
|
||||||
SortedDeleteList::Accessor accessor(deleted_lists_);
|
SortedDeleteList::Accessor accessor(deleted_lists_);
|
||||||
int total_size = accessor.size();
|
int total_size = accessor.size();
|
||||||
int dumped_size = dumped_entry_count_.load();
|
|
||||||
|
|
||||||
while (total_size - dumped_size > DUMP_BATCH_SIZE) {
|
while (total_size - dumped_entry_count_.load() > DUMP_BATCH_SIZE) {
|
||||||
int32_t bitsize = 0;
|
int32_t bitsize = 0;
|
||||||
if constexpr (is_sealed) {
|
if constexpr (is_sealed) {
|
||||||
bitsize = sealed_row_count_;
|
bitsize = sealed_row_count_;
|
||||||
@ -255,7 +263,7 @@ class DeletedRecord {
|
|||||||
snapshots_.back().second.size());
|
snapshots_.back().second.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
while (total_size - dumped_size > DUMP_BATCH_SIZE &&
|
while (total_size - dumped_entry_count_.load() > DUMP_BATCH_SIZE &&
|
||||||
it != accessor.end()) {
|
it != accessor.end()) {
|
||||||
Timestamp dump_ts = 0;
|
Timestamp dump_ts = 0;
|
||||||
|
|
||||||
@ -266,34 +274,29 @@ class DeletedRecord {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
if (dump_ts == last_dump_ts) {
|
||||||
std::unique_lock<std::shared_mutex> lock(snap_lock_);
|
// only update
|
||||||
if (dump_ts == last_dump_ts) {
|
snapshots_.back().second = std::move(bitmap.clone());
|
||||||
// only update
|
snap_next_iter_.back() = it;
|
||||||
snapshots_.back().second = std::move(bitmap.clone());
|
} else {
|
||||||
snap_next_iter_.back() = it;
|
// add new snapshot
|
||||||
} else {
|
snapshots_.push_back(
|
||||||
// add new snapshot
|
std::make_pair(dump_ts, bitmap.clone()));
|
||||||
snapshots_.push_back(
|
Assert(it != accessor.end() && it.good());
|
||||||
std::make_pair(dump_ts, bitmap.clone()));
|
snap_next_iter_.push_back(it);
|
||||||
Assert(it != accessor.end() && it.good());
|
|
||||||
snap_next_iter_.push_back(it);
|
|
||||||
}
|
|
||||||
|
|
||||||
dumped_entry_count_.store(dumped_size + DUMP_BATCH_SIZE);
|
|
||||||
LOG_INFO(
|
|
||||||
"dump delete record snapshot at ts: {}, cursor: {}, "
|
|
||||||
"total size:{} "
|
|
||||||
"current snapshot size: {} for segment: {}",
|
|
||||||
dump_ts,
|
|
||||||
dumped_size + DUMP_BATCH_SIZE,
|
|
||||||
total_size,
|
|
||||||
snapshots_.size(),
|
|
||||||
segment_id_);
|
|
||||||
last_dump_ts = dump_ts;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dumped_size += DUMP_BATCH_SIZE;
|
dumped_entry_count_.fetch_add(DUMP_BATCH_SIZE);
|
||||||
|
LOG_INFO(
|
||||||
|
"dump delete record snapshot at ts: {}, cursor: {}, "
|
||||||
|
"total size:{} "
|
||||||
|
"current snapshot size: {} for segment: {}",
|
||||||
|
dump_ts,
|
||||||
|
dumped_entry_count_.load(),
|
||||||
|
total_size,
|
||||||
|
snapshots_.size(),
|
||||||
|
segment_id_);
|
||||||
|
last_dump_ts = dump_ts;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user