mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
enhance: cachinglayer: reserve resource for inevictable cachecell (#43602)
issue: #41435 --------- Signed-off-by: Shawn Wang <shawn.wang@zilliz.com>
This commit is contained in:
parent
c04d678ad4
commit
169be30a76
@ -52,11 +52,13 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
|
||||
"representing the memory consumption of the cell");
|
||||
|
||||
CacheSlot(std::unique_ptr<Translator<CellT>> translator,
|
||||
internal::DList* dlist)
|
||||
internal::DList* dlist,
|
||||
bool evictable)
|
||||
: translator_(std::move(translator)),
|
||||
cell_id_mapping_mode_(translator_->meta()->cell_id_mapping_mode),
|
||||
cells_(translator_->num_cells()),
|
||||
dlist_(dlist) {
|
||||
dlist_(dlist),
|
||||
evictable_(evictable) {
|
||||
for (cid_t i = 0; i < translator_->num_cells(); ++i) {
|
||||
new (&cells_[i])
|
||||
CacheCell(this, i, translator_->estimated_byte_size_of_cell(i));
|
||||
@ -89,7 +91,7 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
|
||||
for (cid_t i = 0; i < translator_->num_cells(); ++i) {
|
||||
cids.push_back(i);
|
||||
}
|
||||
SemiInlineGet(PinCells(std::move(cids)));
|
||||
SemiInlineGet(PinCells(cids));
|
||||
}
|
||||
|
||||
folly::SemiFuture<std::shared_ptr<CellAccessor<CellT>>>
|
||||
@ -306,9 +308,11 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
|
||||
public:
|
||||
CacheCell() = default;
|
||||
CacheCell(CacheSlot<CellT>* slot, cid_t cid, ResourceUsage size)
|
||||
: internal::ListNode(slot->dlist_, size), slot_(slot), cid_(cid) {
|
||||
: internal::ListNode(slot->dlist_, size, slot->evictable_),
|
||||
slot_(slot),
|
||||
cid_(cid) {
|
||||
}
|
||||
~CacheCell() {
|
||||
~CacheCell() override {
|
||||
if (state_ == State::LOADING) {
|
||||
LOG_ERROR("[MCL] CacheSlot Cell {} destroyed while loading",
|
||||
key());
|
||||
@ -386,6 +390,7 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
|
||||
std::vector<CacheCell> cells_;
|
||||
CellIdMappingMode cell_id_mapping_mode_;
|
||||
internal::DList* dlist_;
|
||||
const bool evictable_;
|
||||
};
|
||||
|
||||
// - A thin wrapper for accessing cells in a CacheSlot.
|
||||
|
||||
@ -45,13 +45,11 @@ class Manager {
|
||||
template <typename CellT>
|
||||
std::shared_ptr<CacheSlot<CellT>>
|
||||
CreateCacheSlot(std::unique_ptr<Translator<CellT>> translator) {
|
||||
// if eviction is disabled, pass nullptr dlist to CacheSlot, so pinned cells
|
||||
// in this CacheSlot will not be evicted.
|
||||
auto dlist = (translator->meta()->support_eviction && evictionEnabled_)
|
||||
? dlist_.get()
|
||||
: nullptr;
|
||||
auto cache_slot =
|
||||
std::make_shared<CacheSlot<CellT>>(std::move(translator), dlist);
|
||||
auto evictable =
|
||||
translator->meta()->support_eviction && evictionEnabled_;
|
||||
// NOTE: when evictionEnabled_ is false, dlist_ is nullptr.
|
||||
auto cache_slot = std::make_shared<CacheSlot<CellT>>(
|
||||
std::move(translator), dlist_.get(), evictable);
|
||||
cache_slot->Warmup();
|
||||
return cache_slot;
|
||||
}
|
||||
|
||||
@ -556,6 +556,11 @@ DList::removeLoadingResource(const ResourceUsage& size) {
|
||||
loading_ -= size * eviction_config_.loading_memory_factor;
|
||||
}
|
||||
|
||||
// Subtracts `size` from this list's used-resource accounting (used_resources_).
// Within this change it is invoked from ~ListNode() for nodes that are not
// evictable, in place of removeItem().
// NOTE(review): no lock is acquired in this function — presumably
// used_resources_ tolerates concurrent updates; confirm against its declaration.
void
|
||||
DList::removeLoadedResource(const ResourceUsage& size) {
|
||||
used_resources_ -= size;
|
||||
}
|
||||
|
||||
void
|
||||
DList::notifyWaitingRequests() {
|
||||
while (!waiting_queue_.empty()) {
|
||||
|
||||
@ -130,6 +130,9 @@ class DList {
|
||||
void
|
||||
removeLoadingResource(const ResourceUsage& size);
|
||||
|
||||
// Releases `size` from the used-resource total tracked by this DList
// (the definition performs `used_resources_ -= size`).
void
|
||||
removeLoadedResource(const ResourceUsage& size);
|
||||
|
||||
const EvictionConfig&
|
||||
eviction_config() const {
|
||||
return eviction_config_;
|
||||
|
||||
@ -14,6 +14,7 @@
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <utility>
|
||||
|
||||
#include <fmt/core.h>
|
||||
#include <folly/ExceptionWrapper.h>
|
||||
@ -47,19 +48,23 @@ ListNode::NodePin::operator=(NodePin&& other) {
|
||||
return *this;
|
||||
}
|
||||
|
||||
ListNode::ListNode(DList* dlist, ResourceUsage size)
|
||||
ListNode::ListNode(DList* dlist, ResourceUsage size, bool evictable)
|
||||
: last_touch_(dlist ? (std::chrono::high_resolution_clock::now() -
|
||||
2 * dlist->eviction_config().cache_touch_window)
|
||||
: std::chrono::high_resolution_clock::now()),
|
||||
dlist_(dlist),
|
||||
size_(size),
|
||||
state_(State::NOT_LOADED) {
|
||||
evictable_(evictable) {
|
||||
}
|
||||
|
||||
ListNode::~ListNode() {
|
||||
if (dlist_) {
|
||||
std::unique_lock<std::shared_mutex> lock(mtx_);
|
||||
dlist_->removeItem(this, size_);
|
||||
if (evictable_) {
|
||||
std::unique_lock<std::shared_mutex> lock(mtx_);
|
||||
dlist_->removeItem(this, size_);
|
||||
} else {
|
||||
dlist_->removeLoadedResource(size_);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -103,7 +108,8 @@ ListNode::pin() {
|
||||
"Programming error: read_op called on a {} cell",
|
||||
state_to_string(state_));
|
||||
// pin the cell now so that we can avoid taking the lock again in deferValue.
|
||||
if (pin_count_.fetch_add(1) == 0 && state_ == State::LOADED && dlist_) {
|
||||
if (pin_count_.fetch_add(1) == 0 && state_ == State::LOADED && dlist_ &&
|
||||
evictable_) {
|
||||
// node became inevictable, decrease evictable size
|
||||
dlist_->decreaseEvictableSize(size_);
|
||||
}
|
||||
@ -166,7 +172,7 @@ ListNode::set_error(folly::exception_wrapper error) {
|
||||
// setException may call continuation of bound futures inline, and those continuation may also need to acquire the
|
||||
// lock, which may cause deadlock. So we release the lock before calling setException.
|
||||
if (promise) {
|
||||
promise->setException(error);
|
||||
promise->setException(std::move(error));
|
||||
}
|
||||
}
|
||||
|
||||
@ -187,9 +193,11 @@ void
|
||||
ListNode::unpin() {
|
||||
std::unique_lock<std::shared_mutex> lock(mtx_);
|
||||
if (pin_count_.fetch_sub(1) == 1) {
|
||||
touch(false);
|
||||
if (evictable_) {
|
||||
touch(false);
|
||||
}
|
||||
// Notify DList that this node became evictable
|
||||
if (dlist_ && state_ == State::LOADED) {
|
||||
if (dlist_ && evictable_ && state_ == State::LOADED) {
|
||||
dlist_->increaseEvictableSize(size_);
|
||||
}
|
||||
}
|
||||
|
||||
@ -41,12 +41,12 @@ class ListNode {
|
||||
~NodePin();
|
||||
|
||||
private:
|
||||
NodePin(ListNode* node);
|
||||
explicit NodePin(ListNode* node);
|
||||
friend class ListNode;
|
||||
ListNode* node_;
|
||||
};
|
||||
ListNode() = default;
|
||||
ListNode(DList* dlist, ResourceUsage size);
|
||||
ListNode(DList* dlist, ResourceUsage size, bool evictable);
|
||||
virtual ~ListNode();
|
||||
|
||||
// ListNode is not movable/copyable because it contains a shared_mutex.
|
||||
@ -123,7 +123,9 @@ class ListNode {
|
||||
state_ = State::LOADED;
|
||||
cb();
|
||||
// memory of this cell is not reserved, touch() to track it.
|
||||
touch(true);
|
||||
if (evictable_) {
|
||||
touch(true);
|
||||
}
|
||||
} else if (state_ == State::LOADING) {
|
||||
// another thread has explicitly requested loading this cell, we did it first
|
||||
// thus we set up the state first.
|
||||
@ -131,7 +133,9 @@ class ListNode {
|
||||
state_ = State::LOADED;
|
||||
promise = std::move(load_promise_);
|
||||
// the node that marked LOADING has already reserved memory, do not double count.
|
||||
touch(false);
|
||||
if (evictable_) {
|
||||
touch(false);
|
||||
}
|
||||
remove_self_from_loading_resource();
|
||||
} else {
|
||||
// LOADED: cell has been loaded by another thread, do nothing.
|
||||
@ -190,6 +194,7 @@ class ListNode {
|
||||
std::atomic<int> pin_count_{0};
|
||||
|
||||
std::unique_ptr<folly::SharedPromise<folly::Unit>> load_promise_{nullptr};
|
||||
bool evictable_{false};
|
||||
};
|
||||
|
||||
} // namespace milvus::cachinglayer::internal
|
||||
|
||||
@ -25,7 +25,7 @@ class MockListNode : public ListNode {
|
||||
ResourceUsage size,
|
||||
const std::string& key = "mock_key",
|
||||
cid_t cid = 0)
|
||||
: ListNode(dlist, size), mock_key_(fmt::format("{}:{}", key, cid)) {
|
||||
: ListNode(dlist, size, true), mock_key_(fmt::format("{}:{}", key, cid)) {
|
||||
ON_CALL(*this, clear_data).WillByDefault([this]() {
|
||||
unload();
|
||||
state_ = State::NOT_LOADED;
|
||||
|
||||
@ -231,7 +231,7 @@ class CacheSlotTest : public ::testing::Test {
|
||||
cell_sizes_, uid_to_cid_map_, SLOT_KEY, StorageType::MEMORY);
|
||||
translator_ = temp_translator_uptr.get();
|
||||
cache_slot_ = std::make_shared<CacheSlot<TestCell>>(
|
||||
std::move(temp_translator_uptr), dlist_.get());
|
||||
std::move(temp_translator_uptr), dlist_.get(), true);
|
||||
}
|
||||
|
||||
void
|
||||
@ -602,7 +602,7 @@ TEST_P(CacheSlotConcurrentTest, ConcurrentAccessMultipleSlots) {
|
||||
/*for_concurrent_test*/ true);
|
||||
MockTranslator* translator_1 = translator_1_ptr.get();
|
||||
auto slot1 = std::make_shared<CacheSlot<TestCell>>(
|
||||
std::move(translator_1_ptr), dlist_.get());
|
||||
std::move(translator_1_ptr), dlist_.get(), true);
|
||||
|
||||
std::vector<std::pair<cid_t, int64_t>> cell_sizes_2 = {
|
||||
{0, 55}, {1, 65}, {2, 75}, {3, 85}, {4, 95}};
|
||||
@ -616,7 +616,7 @@ TEST_P(CacheSlotConcurrentTest, ConcurrentAccessMultipleSlots) {
|
||||
/*for_concurrent_test*/ true);
|
||||
MockTranslator* translator_2 = translator_2_ptr.get();
|
||||
auto slot2 = std::make_shared<CacheSlot<TestCell>>(
|
||||
std::move(translator_2_ptr), dlist_.get());
|
||||
std::move(translator_2_ptr), dlist_.get(), true);
|
||||
|
||||
bool with_bonus_cells = GetParam();
|
||||
if (with_bonus_cells) {
|
||||
@ -752,7 +752,7 @@ TEST_P(CacheSlotConcurrentTest, ConcurrentAccessMultipleSlots) {
|
||||
StorageType::MEMORY,
|
||||
/*for_concurrent_test*/ true);
|
||||
auto sl = std::make_shared<CacheSlot<TestCell>>(
|
||||
std::move(translator_3_ptr), dlist_ptr);
|
||||
std::move(translator_3_ptr), dlist_ptr, dlist_ptr != nullptr);
|
||||
return sl;
|
||||
};
|
||||
std::shared_ptr<CacheSlot<TestCell>> slot3 = create_new_slot3();
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user