mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
feat: impose a physical memory limit when loading cells (#43222)
issue: #41435 issue: https://github.com/milvus-io/milvus/issues/43038 This PR also: 1. removed ERROR state from ListNode 2. CacheSlot will do reserveMemory once for all requested cells after updating the state to LOADING, so now we transit a cell to LOADING before its resource reservation 3. reject resource reservation directly if size >= max_size --------- Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>
This commit is contained in:
parent
07fa2cbdd3
commit
d793def47c
@ -465,15 +465,13 @@ queryNode:
|
||||
vectorIndex: sync
|
||||
# If evictionEnabled is true, a background thread will run every evictionIntervalMs to determine if an
|
||||
# eviction is necessary and the amount of data to evict from memory/disk.
|
||||
# - The max ratio is the max amount of memory/disk that can be used for cache.
|
||||
# - If the current memory/disk usage exceeds the high watermark, an eviction will be triggered to evict data from memory/disk
|
||||
# until the memory/disk usage is below the low watermark.
|
||||
# - The max amount of memory/disk that can be used for cache is controlled by overloadedMemoryThresholdPercentage and diskMaxUsagePercentage.
|
||||
memoryLowWatermarkRatio: 0.6
|
||||
memoryHighWatermarkRatio: 0.8
|
||||
memoryMaxRatio: 0.9
|
||||
diskLowWatermarkRatio: 0.6
|
||||
diskHighWatermarkRatio: 0.8
|
||||
diskMaxRatio: 0.9
|
||||
# Enable eviction for Tiered Storage. Defaults to false.
|
||||
# Note that if eviction is enabled, cache data loaded during sync warmup is also subject to eviction.
|
||||
evictionEnabled: false
|
||||
|
||||
@ -137,14 +137,15 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
|
||||
});
|
||||
}
|
||||
|
||||
// Manually evicts the cell if it is not pinned.
|
||||
// Returns true if the cell ends up in a state other than LOADED.
|
||||
// Manually evicts the cell if it is LOADED and not pinned.
|
||||
// Returns true if the eviction happened.
|
||||
bool
|
||||
ManualEvict(cid_t cid) {
|
||||
return cells_[cid].manual_evict();
|
||||
}
|
||||
|
||||
// Returns true if any cell is evicted.
|
||||
// Manually evicts all cells that are LOADED and not pinned.
|
||||
// Returns true if eviction happened on any cell.
|
||||
bool
|
||||
ManualEvictAll() {
|
||||
bool evicted = false;
|
||||
@ -163,7 +164,7 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
|
||||
|
||||
ResourceUsage
|
||||
size_of_cell(cid_t cid) const {
|
||||
return translator_->estimated_byte_size_of_cell(cid);
|
||||
return cells_[cid].size();
|
||||
}
|
||||
|
||||
Meta*
|
||||
@ -190,6 +191,7 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
|
||||
std::unordered_set<cid_t> need_load_cids;
|
||||
futures.reserve(cids.size());
|
||||
need_load_cids.reserve(cids.size());
|
||||
auto resource_needed = ResourceUsage{0, 0};
|
||||
for (const auto& cid : cids) {
|
||||
if (cid >= cells_.size()) {
|
||||
throw std::invalid_argument(fmt::format("cid {} out of range, slot has {} cells", cid, cells_.size()));
|
||||
@ -200,10 +202,11 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
|
||||
futures.push_back(std::move(future));
|
||||
if (need_load) {
|
||||
need_load_cids.insert(cid);
|
||||
resource_needed += cells_[cid].size();
|
||||
}
|
||||
}
|
||||
if (!need_load_cids.empty()) {
|
||||
RunLoad(std::move(need_load_cids));
|
||||
RunLoad(std::move(need_load_cids), resource_needed);
|
||||
}
|
||||
|
||||
auto pins = SemiInlineGet(folly::collect(futures));
|
||||
@ -224,10 +227,24 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
|
||||
}
|
||||
|
||||
void
|
||||
RunLoad(std::unordered_set<cid_t>&& cids) {
|
||||
RunLoad(std::unordered_set<cid_t>&& cids, ResourceUsage resource_needed) {
|
||||
bool reserve_resource_failure = false;
|
||||
try {
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
std::vector<cid_t> cids_vec(cids.begin(), cids.end());
|
||||
|
||||
if (dlist_ && !dlist_->reserveMemory(resource_needed)) {
|
||||
auto error_msg = fmt::format(
|
||||
"[MCL] CacheSlot failed to reserve memory for cells: "
|
||||
"key={}, cell_ids=[{}], total resource_needed={}",
|
||||
translator_->key(),
|
||||
fmt::join(cids_vec, ","),
|
||||
resource_needed.ToString());
|
||||
LOG_ERROR(error_msg);
|
||||
reserve_resource_failure = true;
|
||||
throw std::runtime_error(error_msg);
|
||||
}
|
||||
|
||||
auto results = translator_->get_cells(cids_vec);
|
||||
auto latency =
|
||||
std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
@ -252,6 +269,10 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
|
||||
for (auto cid : cids) {
|
||||
cells_[cid].set_error(ew);
|
||||
}
|
||||
// If the resource reservation failed, we don't need to release the memory.
|
||||
if (dlist_ && !reserve_resource_failure) {
|
||||
dlist_->releaseMemory(resource_needed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -263,7 +284,8 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
|
||||
}
|
||||
~CacheCell() {
|
||||
if (state_ == State::LOADING) {
|
||||
LOG_ERROR("CacheSlot Cell {} destroyed while loading", key());
|
||||
LOG_ERROR("[MCL] CacheSlot Cell {} destroyed while loading",
|
||||
key());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -37,7 +37,8 @@ Manager::ConfigureTieredStorage(CacheWarmupPolicies warmup_policies,
|
||||
|
||||
if (!evictionEnabled) {
|
||||
LOG_INFO(
|
||||
"Tiered Storage manager is configured with disabled eviction");
|
||||
"[MCL] Tiered Storage manager is configured "
|
||||
"with disabled eviction");
|
||||
return;
|
||||
}
|
||||
|
||||
@ -48,40 +49,27 @@ Manager::ConfigureTieredStorage(CacheWarmupPolicies warmup_policies,
|
||||
ResourceUsage high_watermark{cache_limit.memory_high_watermark_bytes,
|
||||
cache_limit.disk_high_watermark_bytes};
|
||||
|
||||
AssertInfo(
|
||||
low_watermark.GEZero(),
|
||||
"Milvus Caching Layer: low watermark must be greater than 0");
|
||||
AssertInfo((high_watermark - low_watermark).GEZero(),
|
||||
"Milvus Caching Layer: high watermark must be greater than "
|
||||
"low watermark");
|
||||
AssertInfo(
|
||||
(max - high_watermark).GEZero(),
|
||||
"Milvus Caching Layer: max must be greater than high watermark");
|
||||
|
||||
manager.dlist_ = std::make_unique<internal::DList>(
|
||||
max, low_watermark, high_watermark, eviction_config);
|
||||
|
||||
LOG_INFO(
|
||||
"Configured Tiered Storage manager with memory watermark: low {} "
|
||||
"bytes ({:.2} GB), high {} bytes ({:.2} GB), max {} bytes "
|
||||
"({:.2} GB), disk watermark: low "
|
||||
"{} bytes ({:.2} GB), high {} bytes ({:.2} GB), max {} bytes "
|
||||
"({:.2} GB), cache touch "
|
||||
"window: {} ms, eviction interval: {} ms",
|
||||
low_watermark.memory_bytes,
|
||||
low_watermark.memory_bytes / (1024.0 * 1024.0 * 1024.0),
|
||||
high_watermark.memory_bytes,
|
||||
high_watermark.memory_bytes / (1024.0 * 1024.0 * 1024.0),
|
||||
max.memory_bytes,
|
||||
max.memory_bytes / (1024.0 * 1024.0 * 1024.0),
|
||||
low_watermark.file_bytes,
|
||||
low_watermark.file_bytes / (1024.0 * 1024.0 * 1024.0),
|
||||
high_watermark.file_bytes,
|
||||
high_watermark.file_bytes / (1024.0 * 1024.0 * 1024.0),
|
||||
max.file_bytes,
|
||||
max.file_bytes / (1024.0 * 1024.0 * 1024.0),
|
||||
"[MCL] Configured Tiered Storage manager with "
|
||||
"memory watermark: low {}, high {}, max {}, "
|
||||
"disk watermark: low {}, high {}, max {}, "
|
||||
"cache touch window: {} ms, eviction interval: {} ms, "
|
||||
"physical memory max ratio: {}, max disk usage percentage: {}, "
|
||||
"loading memory factor: {}",
|
||||
FormatBytes(low_watermark.memory_bytes),
|
||||
FormatBytes(high_watermark.memory_bytes),
|
||||
FormatBytes(max.memory_bytes),
|
||||
FormatBytes(low_watermark.file_bytes),
|
||||
FormatBytes(high_watermark.file_bytes),
|
||||
FormatBytes(max.file_bytes),
|
||||
eviction_config.cache_touch_window.count(),
|
||||
eviction_config.eviction_interval.count());
|
||||
eviction_config.eviction_interval.count(),
|
||||
eviction_config.overloaded_memory_threshold_percentage,
|
||||
eviction_config.max_disk_usage_percentage,
|
||||
eviction_config.loading_memory_factor);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
239
internal/core/src/cachinglayer/Utils.cpp
Normal file
239
internal/core/src/cachinglayer/Utils.cpp
Normal file
@ -0,0 +1,239 @@
|
||||
// Copyright (C) 2019-2025 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include "cachinglayer/Utils.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <fstream>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "log/Log.h"
|
||||
|
||||
namespace milvus::cachinglayer::internal {
|
||||
|
||||
int64_t
|
||||
getHostTotalMemory() {
|
||||
#ifdef __linux__
|
||||
std::ifstream meminfo("/proc/meminfo");
|
||||
if (!meminfo.is_open()) {
|
||||
LOG_WARN("[MCL] Failed to open /proc/meminfo");
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::string line;
|
||||
while (std::getline(meminfo, line)) {
|
||||
if (line.find("MemTotal:") == 0) {
|
||||
std::istringstream iss(line);
|
||||
std::string key;
|
||||
int64_t value;
|
||||
std::string unit;
|
||||
|
||||
if (iss >> key >> value >> unit) {
|
||||
// Convert kB to bytes
|
||||
if (unit == "kB") {
|
||||
value *= 1024;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
}
|
||||
}
|
||||
#else
|
||||
LOG_WARN("[MCL] Host memory detection not implemented for this platform");
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Impl based on pkg/util/hardware/container_linux.go::getContainerMemLimit()
|
||||
int64_t
|
||||
getContainerMemLimit() {
|
||||
#ifdef __linux__
|
||||
std::vector<int64_t> limits;
|
||||
|
||||
// Check MEM_LIMIT environment variable (Docker/container override)
|
||||
const char* mem_limit_env = std::getenv("MEM_LIMIT");
|
||||
if (mem_limit_env) {
|
||||
try {
|
||||
int64_t env_limit = std::stoll(mem_limit_env);
|
||||
limits.push_back(env_limit);
|
||||
LOG_DEBUG("[MCL] Found MEM_LIMIT environment variable: {} bytes",
|
||||
env_limit);
|
||||
} catch (...) {
|
||||
LOG_WARN("[MCL] Invalid MEM_LIMIT environment variable: {}",
|
||||
mem_limit_env);
|
||||
}
|
||||
}
|
||||
|
||||
// Check process-specific cgroup limits from /proc/self/cgroup
|
||||
std::ifstream proc_cgroup("/proc/self/cgroup");
|
||||
if (proc_cgroup.is_open()) {
|
||||
std::string line;
|
||||
while (std::getline(proc_cgroup, line)) {
|
||||
// Look for memory controller lines
|
||||
if (line.find(":memory:") != std::string::npos ||
|
||||
line.find(":0:") != std::string::npos) {
|
||||
size_t last_colon = line.find_last_of(':');
|
||||
if (last_colon != std::string::npos) {
|
||||
std::string cgroup_path = line.substr(last_colon + 1);
|
||||
|
||||
// Try v2 path
|
||||
std::string v2_path =
|
||||
"/sys/fs/cgroup" + cgroup_path + "/memory.max";
|
||||
std::ifstream proc_v2(v2_path);
|
||||
if (proc_v2.is_open()) {
|
||||
std::string proc_line;
|
||||
if (std::getline(proc_v2, proc_line) &&
|
||||
proc_line != "max") {
|
||||
try {
|
||||
int64_t proc_limit = std::stoll(proc_line);
|
||||
limits.push_back(proc_limit);
|
||||
LOG_DEBUG(
|
||||
"[MCL] Found process-specific cgroups v2 "
|
||||
"limit: {} bytes",
|
||||
proc_limit);
|
||||
} catch (...) {
|
||||
// Ignore parse errors
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Try v1 path
|
||||
std::string v1_path = "/sys/fs/cgroup/memory" +
|
||||
cgroup_path +
|
||||
"/memory.limit_in_bytes";
|
||||
std::ifstream proc_v1(v1_path);
|
||||
if (proc_v1.is_open()) {
|
||||
std::string proc_line;
|
||||
if (std::getline(proc_v1, proc_line)) {
|
||||
try {
|
||||
int64_t proc_limit = std::stoll(proc_line);
|
||||
// Filters out unrealistic cgroups v1 values (sometimes returns very large numbers
|
||||
// when unlimited)
|
||||
if (proc_limit < (1LL << 62)) {
|
||||
limits.push_back(proc_limit);
|
||||
LOG_DEBUG(
|
||||
"[MCL] Found process-specific cgroups "
|
||||
"v1 limit: {} bytes",
|
||||
proc_limit);
|
||||
}
|
||||
} catch (...) {
|
||||
// Ignore parse errors
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
break; // Found memory controller, no need to continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return the minimum of all found limits
|
||||
if (!limits.empty()) {
|
||||
int64_t min_limit = *std::min_element(limits.begin(), limits.end());
|
||||
LOG_DEBUG("[MCL] Using minimum memory limit: {} bytes from {} sources",
|
||||
min_limit,
|
||||
limits.size());
|
||||
return min_limit;
|
||||
}
|
||||
|
||||
#else
|
||||
LOG_WARN(
|
||||
"[MCL] Container/cgroup memory limit detection not implemented for "
|
||||
"this platform");
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
SystemMemoryInfo
|
||||
getSystemMemoryInfo() {
|
||||
SystemMemoryInfo info;
|
||||
|
||||
// Get total memory (host vs container)
|
||||
int64_t host_memory = getHostTotalMemory();
|
||||
int64_t container_limit = getContainerMemLimit();
|
||||
|
||||
if (container_limit > 0 && container_limit < host_memory) {
|
||||
info.total_memory_bytes = container_limit;
|
||||
LOG_DEBUG("[MCL] Using container memory limit: {} bytes",
|
||||
container_limit);
|
||||
} else {
|
||||
info.total_memory_bytes = host_memory;
|
||||
if (container_limit > host_memory) {
|
||||
LOG_WARN(
|
||||
"[MCL] Container limit ({} bytes) exceeds host memory ({} "
|
||||
"bytes), using host memory",
|
||||
container_limit,
|
||||
host_memory);
|
||||
}
|
||||
}
|
||||
|
||||
// Get current process memory usage (RSS - Shared)
|
||||
info.used_memory_bytes = getCurrentProcessMemoryUsage();
|
||||
info.available_memory_bytes =
|
||||
info.total_memory_bytes - info.used_memory_bytes;
|
||||
|
||||
return info;
|
||||
}
|
||||
|
||||
int64_t
|
||||
getCurrentProcessMemoryUsage() {
|
||||
#ifdef __linux__
|
||||
std::ifstream status("/proc/self/status");
|
||||
if (!status.is_open()) {
|
||||
LOG_WARN("[MCL] Failed to open /proc/self/status, returning 0");
|
||||
return 0;
|
||||
}
|
||||
|
||||
int64_t rss = 0;
|
||||
int64_t shared = 0;
|
||||
std::string line;
|
||||
|
||||
while (std::getline(status, line)) {
|
||||
if (line.find("VmRSS:") == 0) {
|
||||
std::istringstream iss(line);
|
||||
std::string key;
|
||||
int64_t value;
|
||||
std::string unit;
|
||||
|
||||
if (iss >> key >> value >> unit) {
|
||||
// Convert kB to bytes
|
||||
if (unit == "kB") {
|
||||
value *= 1024;
|
||||
}
|
||||
rss = value;
|
||||
}
|
||||
} else if (line.find("RssFile:") == 0) {
|
||||
std::istringstream iss(line);
|
||||
std::string key;
|
||||
int64_t value;
|
||||
std::string unit;
|
||||
|
||||
if (iss >> key >> value >> unit) {
|
||||
// Convert kB to bytes
|
||||
if (unit == "kB") {
|
||||
value *= 1024;
|
||||
}
|
||||
shared = value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return RSS - Shared (file-backed memory) to match Go implementation
|
||||
return rss - shared;
|
||||
#else
|
||||
LOG_WARN(
|
||||
"[MCL] Process memory monitoring not implemented for this platform");
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
} // namespace milvus::cachinglayer::internal
|
||||
@ -42,6 +42,21 @@ SemiInlineGet(folly::SemiFuture<T>&& future) {
|
||||
return std::move(future).via(&folly::InlineExecutor::instance()).get();
|
||||
}
|
||||
|
||||
inline std::string
|
||||
FormatBytes(int64_t bytes) {
|
||||
if (bytes < 1024) {
|
||||
return fmt::format("{} B", bytes);
|
||||
} else if (bytes < 1024 * 1024) {
|
||||
return fmt::format("{:.2f} KB ({} B)", bytes / 1024.0, bytes);
|
||||
} else if (bytes < 1024 * 1024 * 1024) {
|
||||
return fmt::format(
|
||||
"{:.2f} MB ({} B)", bytes / (1024.0 * 1024.0), bytes);
|
||||
} else {
|
||||
return fmt::format(
|
||||
"{:.2f} GB ({} B)", bytes / (1024.0 * 1024.0 * 1024.0), bytes);
|
||||
}
|
||||
}
|
||||
|
||||
struct ResourceUsage {
|
||||
int64_t memory_bytes{0};
|
||||
int64_t file_bytes{0};
|
||||
@ -70,6 +85,18 @@ struct ResourceUsage {
|
||||
file_bytes - rhs.file_bytes);
|
||||
}
|
||||
|
||||
ResourceUsage
|
||||
operator*(double factor) const {
|
||||
return ResourceUsage(
|
||||
static_cast<int64_t>(std::round(memory_bytes * factor)),
|
||||
static_cast<int64_t>(std::round(file_bytes * factor)));
|
||||
}
|
||||
|
||||
friend ResourceUsage
|
||||
operator*(double factor, const ResourceUsage& usage) {
|
||||
return usage * factor;
|
||||
}
|
||||
|
||||
void
|
||||
operator-=(const ResourceUsage& rhs) {
|
||||
memory_bytes -= rhs.memory_bytes;
|
||||
@ -87,7 +114,12 @@ struct ResourceUsage {
|
||||
}
|
||||
|
||||
bool
|
||||
GEZero() const {
|
||||
AnyGTZero() const {
|
||||
return memory_bytes > 0 || file_bytes > 0;
|
||||
}
|
||||
|
||||
bool
|
||||
AllGEZero() const {
|
||||
return memory_bytes >= 0 && file_bytes >= 0;
|
||||
}
|
||||
|
||||
@ -106,18 +138,27 @@ struct ResourceUsage {
|
||||
|
||||
std::string
|
||||
ToString() const {
|
||||
return fmt::format(
|
||||
"memory {} bytes ({:.2} GB), disk {} bytes ({:.2} GB)",
|
||||
memory_bytes,
|
||||
memory_bytes / 1024.0 / 1024.0 / 1024.0,
|
||||
file_bytes,
|
||||
file_bytes / 1024.0 / 1024.0 / 1024.0);
|
||||
if (memory_bytes == 0 && file_bytes == 0) {
|
||||
return "EMPTY";
|
||||
}
|
||||
|
||||
std::string result;
|
||||
if (memory_bytes > 0) {
|
||||
result += fmt::format("memory {}", FormatBytes(memory_bytes));
|
||||
}
|
||||
if (file_bytes > 0) {
|
||||
if (!result.empty()) {
|
||||
result += ", ";
|
||||
}
|
||||
result += fmt::format("disk {}", FormatBytes(file_bytes));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
inline std::ostream&
|
||||
operator<<(std::ostream& os, const ResourceUsage& usage) {
|
||||
os << "memory=" << usage.memory_bytes << ", disk=" << usage.file_bytes;
|
||||
os << usage.ToString();
|
||||
return os;
|
||||
}
|
||||
|
||||
@ -206,14 +247,39 @@ struct EvictionConfig {
|
||||
// Use cache_touch_window_ms to reduce the frequency of touching and reduce contention.
|
||||
std::chrono::milliseconds cache_touch_window;
|
||||
std::chrono::milliseconds eviction_interval;
|
||||
// Overloaded memory threshold percentage - limits cache memory usage to this percentage of total physical memory
|
||||
float overloaded_memory_threshold_percentage;
|
||||
// Max disk usage percentage - limits disk cache usage to this percentage of total disk space (not used yet)
|
||||
float max_disk_usage_percentage;
|
||||
// Loading memory factor for estimating memory during loading
|
||||
float loading_memory_factor;
|
||||
|
||||
EvictionConfig()
|
||||
: cache_touch_window(std::chrono::milliseconds(0)),
|
||||
eviction_interval(std::chrono::milliseconds(0)) {
|
||||
eviction_interval(std::chrono::milliseconds(0)),
|
||||
overloaded_memory_threshold_percentage(0.9),
|
||||
max_disk_usage_percentage(0.95),
|
||||
loading_memory_factor(2.5f) {
|
||||
}
|
||||
|
||||
EvictionConfig(int64_t cache_touch_window_ms, int64_t eviction_interval_ms)
|
||||
: cache_touch_window(std::chrono::milliseconds(cache_touch_window_ms)),
|
||||
eviction_interval(std::chrono::milliseconds(eviction_interval_ms)) {
|
||||
eviction_interval(std::chrono::milliseconds(eviction_interval_ms)),
|
||||
overloaded_memory_threshold_percentage(0.9),
|
||||
max_disk_usage_percentage(0.95),
|
||||
loading_memory_factor(2.5f) {
|
||||
}
|
||||
|
||||
EvictionConfig(int64_t cache_touch_window_ms,
|
||||
int64_t eviction_interval_ms,
|
||||
float overloaded_memory_threshold_percentage,
|
||||
float max_disk_usage_percentage,
|
||||
float loading_memory_factor = 2.5f)
|
||||
: cache_touch_window(std::chrono::milliseconds(cache_touch_window_ms)),
|
||||
eviction_interval(std::chrono::milliseconds(eviction_interval_ms)),
|
||||
overloaded_memory_threshold_percentage(overloaded_memory_threshold_percentage),
|
||||
max_disk_usage_percentage(max_disk_usage_percentage),
|
||||
loading_memory_factor(loading_memory_factor) {
|
||||
}
|
||||
};
|
||||
|
||||
@ -378,6 +444,22 @@ cache_memory_overhead_bytes(StorageType storage_type) {
|
||||
}
|
||||
}
|
||||
|
||||
struct SystemMemoryInfo {
|
||||
int64_t total_memory_bytes{0};
|
||||
int64_t available_memory_bytes{0};
|
||||
int64_t used_memory_bytes{0};
|
||||
};
|
||||
|
||||
int64_t
|
||||
getHostTotalMemory();
|
||||
int64_t
|
||||
getContainerMemLimit();
|
||||
|
||||
SystemMemoryInfo
|
||||
getSystemMemoryInfo();
|
||||
int64_t
|
||||
getCurrentProcessMemoryUsage();
|
||||
|
||||
} // namespace internal
|
||||
|
||||
} // namespace milvus::cachinglayer
|
||||
|
||||
@ -10,6 +10,7 @@
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
#include "cachinglayer/lrucache/DList.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <mutex>
|
||||
#include <vector>
|
||||
|
||||
@ -26,19 +27,80 @@ namespace milvus::cachinglayer::internal {
|
||||
bool
|
||||
DList::reserveMemory(const ResourceUsage& size) {
|
||||
std::unique_lock<std::mutex> list_lock(list_mtx_);
|
||||
auto used = used_memory_.load();
|
||||
if (max_memory_.CanHold(used + size)) {
|
||||
used_memory_ += size;
|
||||
return true;
|
||||
|
||||
if (!max_memory_.CanHold(size)) {
|
||||
LOG_ERROR(
|
||||
"[MCL] Failed to reserve size={} as it exceeds max_memory_={}.",
|
||||
size.ToString(),
|
||||
max_memory_.ToString());
|
||||
return false;
|
||||
}
|
||||
|
||||
// try to evict so that used + size <= low watermark, but if that is not possible,
|
||||
// evict enough for the current reservation.
|
||||
if (tryEvict(used + size - low_watermark_, used + size - max_memory_)) {
|
||||
used_memory_ += size;
|
||||
return true;
|
||||
auto used = used_memory_.load();
|
||||
|
||||
// Combined logical and physical memory limit check
|
||||
bool logical_limit_exceeded = !max_memory_.CanHold(used + size);
|
||||
int64_t physical_eviction_needed =
|
||||
size.memory_bytes > 0 ? checkPhysicalMemoryLimit(size) : 0;
|
||||
|
||||
// If either limit is exceeded, attempt unified eviction
|
||||
// we attempt eviction based on logical limit once, but multiple times on physical limit
|
||||
// because physical eviction may not be accurate.
|
||||
while (logical_limit_exceeded || physical_eviction_needed > 0) {
|
||||
ResourceUsage eviction_target;
|
||||
ResourceUsage min_eviction;
|
||||
|
||||
if (logical_limit_exceeded) {
|
||||
// Calculate logical eviction requirements
|
||||
eviction_target = used + size - low_watermark_;
|
||||
min_eviction = used + size - max_memory_;
|
||||
}
|
||||
|
||||
if (physical_eviction_needed > 0) {
|
||||
// Combine with logical eviction target (take the maximum)
|
||||
eviction_target.memory_bytes = std::max(
|
||||
eviction_target.memory_bytes, physical_eviction_needed);
|
||||
min_eviction.memory_bytes =
|
||||
std::max(min_eviction.memory_bytes, physical_eviction_needed);
|
||||
}
|
||||
|
||||
// Attempt unified eviction
|
||||
ResourceUsage evicted_size = tryEvict(eviction_target, min_eviction);
|
||||
if (!evicted_size.AnyGTZero()) {
|
||||
LOG_WARN(
|
||||
"[MCL] Failed to reserve size={} due to "
|
||||
"eviction failure, target={}, min_eviction={}",
|
||||
size.ToString(),
|
||||
eviction_target.ToString(),
|
||||
min_eviction.ToString());
|
||||
return false;
|
||||
}
|
||||
// logical limit is accurate, thus we can guarantee after one successful eviction, logical limit is satisfied.
|
||||
logical_limit_exceeded = false;
|
||||
|
||||
if (physical_eviction_needed == 0) {
|
||||
// we only need to evict for logical limit and we have succeeded.
|
||||
break;
|
||||
}
|
||||
|
||||
if (physical_eviction_needed = checkPhysicalMemoryLimit(size);
|
||||
physical_eviction_needed == 0) {
|
||||
// if after eviction we no longer need to evict, we can break.
|
||||
break;
|
||||
}
|
||||
// else perform another round of eviction.
|
||||
LOG_TRACE(
|
||||
"[MCL] reserving size={} failed, evicted_size={}, "
|
||||
"still need to evict {}",
|
||||
size.ToString(),
|
||||
evicted_size.ToString(),
|
||||
FormatBytes(physical_eviction_needed));
|
||||
}
|
||||
return false;
|
||||
|
||||
// Reserve resources (both checks passed)
|
||||
used_memory_ += size;
|
||||
loading_memory_ += size * eviction_config_.loading_memory_factor;
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
@ -101,7 +163,25 @@ DList::usageInfo(const ResourceUsage& actively_pinned) const {
|
||||
precision);
|
||||
}
|
||||
|
||||
bool
|
||||
// not thread safe, use for debug only
|
||||
std::string
|
||||
DList::chainString() const {
|
||||
std::stringstream ss;
|
||||
ss << "[MCL] DList chain: ";
|
||||
size_t num_nodes = 0;
|
||||
for (auto it = tail_; it != nullptr; it = it->next_) {
|
||||
ss << "(" << it->key() << ", " << it->size().ToString()
|
||||
<< ", pins=" << it->pin_count_ << ")";
|
||||
num_nodes++;
|
||||
if (it->next_ != nullptr) {
|
||||
ss << " -> ";
|
||||
}
|
||||
}
|
||||
ss << "Total nodes: " << num_nodes << std::endl;
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
ResourceUsage
|
||||
DList::tryEvict(const ResourceUsage& expected_eviction,
|
||||
const ResourceUsage& min_eviction) {
|
||||
std::vector<ListNode*> to_evict;
|
||||
@ -143,19 +223,33 @@ DList::tryEvict(const ResourceUsage& expected_eviction,
|
||||
actively_pinned += it->size();
|
||||
}
|
||||
}
|
||||
if (!size_to_evict.AnyGTZero()) {
|
||||
LOG_DEBUG(
|
||||
"[MCL] No items can be evicted, expected_eviction {}, min_eviction "
|
||||
"{}, giving up eviction. Current usage: {}",
|
||||
expected_eviction.ToString(),
|
||||
min_eviction.ToString(),
|
||||
usageInfo(actively_pinned));
|
||||
return ResourceUsage{0, 0};
|
||||
}
|
||||
if (!size_to_evict.CanHold(expected_eviction)) {
|
||||
if (!size_to_evict.CanHold(min_eviction)) {
|
||||
LOG_WARN(
|
||||
"Milvus Caching Layer: cannot evict even min_eviction {}, "
|
||||
"giving up eviction. Current usage: {}",
|
||||
LOG_INFO(
|
||||
"[MCL] Cannot evict even min_eviction {}, max possible "
|
||||
"eviction {}, giving up eviction. Current usage: {}. This may "
|
||||
"indicate a lack of resources.",
|
||||
min_eviction.ToString(),
|
||||
size_to_evict.ToString(),
|
||||
usageInfo(actively_pinned));
|
||||
return false;
|
||||
LOG_TRACE("[MCL] DList chain: {}", chainString());
|
||||
return ResourceUsage{0, 0};
|
||||
}
|
||||
LOG_INFO(
|
||||
"Milvus Caching Layer: cannot evict expected_eviction {}, "
|
||||
"evicting as much({}) as possible. Current usage: {}",
|
||||
LOG_DEBUG(
|
||||
"[MCL] cannot evict expected_eviction {} but can evict "
|
||||
"min_eviction {}, evicting as much({}) as possible. Current usage: "
|
||||
"{}",
|
||||
expected_eviction.ToString(),
|
||||
min_eviction.ToString(),
|
||||
size_to_evict.ToString(),
|
||||
usageInfo(actively_pinned));
|
||||
}
|
||||
@ -169,6 +263,8 @@ DList::tryEvict(const ResourceUsage& expected_eviction,
|
||||
used_memory_ -= size;
|
||||
}
|
||||
|
||||
LOG_TRACE("[MCL] Logically evicted size: {}", size_to_evict.ToString());
|
||||
|
||||
switch (size_to_evict.storage_type()) {
|
||||
case StorageType::MEMORY:
|
||||
milvus::monitor::internal_cache_evicted_bytes_memory.Increment(
|
||||
@ -187,14 +283,16 @@ DList::tryEvict(const ResourceUsage& expected_eviction,
|
||||
default:
|
||||
PanicInfo(ErrorCode::UnexpectedError, "Unknown StorageType");
|
||||
}
|
||||
return true;
|
||||
return size_to_evict;
|
||||
}
|
||||
|
||||
bool
|
||||
DList::UpdateLimit(const ResourceUsage& new_limit) {
|
||||
AssertInfo(new_limit.GEZero(),
|
||||
"Milvus Caching Layer: memory and disk usage limit must be "
|
||||
"greater than 0");
|
||||
AssertInfo((new_limit - high_watermark_).AllGEZero(),
|
||||
"[MCL] limit must be greater than high watermark. new_limit: "
|
||||
"{}, high_watermark: {}",
|
||||
new_limit.ToString(),
|
||||
high_watermark_.ToString());
|
||||
std::unique_lock<std::mutex> list_lock(list_mtx_);
|
||||
auto used = used_memory_.load();
|
||||
if (!new_limit.CanHold(used)) {
|
||||
@ -202,7 +300,7 @@ DList::UpdateLimit(const ResourceUsage& new_limit) {
|
||||
auto deficit = used - new_limit;
|
||||
// deficit is the hard limit of eviction, if we cannot evict deficit, we give
|
||||
// up the limit change.
|
||||
if (!tryEvict(deficit, deficit)) {
|
||||
if (!tryEvict(deficit, deficit).AnyGTZero()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@ -217,16 +315,32 @@ DList::UpdateLimit(const ResourceUsage& new_limit) {
|
||||
void
|
||||
DList::UpdateLowWatermark(const ResourceUsage& new_low_watermark) {
|
||||
std::unique_lock<std::mutex> list_lock(list_mtx_);
|
||||
AssertInfo(new_low_watermark.GEZero(),
|
||||
"Milvus Caching Layer: low watermark must be greater than 0");
|
||||
AssertInfo(new_low_watermark.AllGEZero(),
|
||||
"[MCL] low watermark must be greater than or "
|
||||
"equal to 0. new_low_watermark: {}",
|
||||
new_low_watermark.ToString());
|
||||
AssertInfo((high_watermark_ - new_low_watermark).AllGEZero(),
|
||||
"[MCL] low watermark must be less than or equal to high "
|
||||
"watermark. new_low_watermark: {}, high_watermark: {}",
|
||||
new_low_watermark.ToString(),
|
||||
high_watermark_.ToString());
|
||||
low_watermark_ = new_low_watermark;
|
||||
}
|
||||
|
||||
void
|
||||
DList::UpdateHighWatermark(const ResourceUsage& new_high_watermark) {
|
||||
std::unique_lock<std::mutex> list_lock(list_mtx_);
|
||||
AssertInfo(new_high_watermark.GEZero(),
|
||||
"Milvus Caching Layer: high watermark must be greater than 0");
|
||||
AssertInfo(
|
||||
(new_high_watermark - low_watermark_).AllGEZero(),
|
||||
"[MCL] high watermark must be greater than or "
|
||||
"equal to low watermark. new_high_watermark: {}, low_watermark: {}",
|
||||
new_high_watermark.ToString(),
|
||||
low_watermark_.ToString());
|
||||
AssertInfo((max_memory_ - new_high_watermark).AllGEZero(),
|
||||
"[MCL] high watermark must be less than or equal to max "
|
||||
"memory. new_high_watermark: {}, max_memory: {}",
|
||||
new_high_watermark.ToString(),
|
||||
max_memory_.ToString());
|
||||
high_watermark_ = new_high_watermark;
|
||||
}
|
||||
|
||||
@ -234,6 +348,8 @@ void
|
||||
DList::releaseMemory(const ResourceUsage& size) {
|
||||
// safe to substract on atomic without lock
|
||||
used_memory_ -= size;
|
||||
// this is called when a cell failed to load.
|
||||
loading_memory_ -= size * eviction_config_.loading_memory_factor;
|
||||
}
|
||||
|
||||
void
|
||||
@ -298,4 +414,44 @@ DList::IsEmpty() const {
|
||||
return head_ == nullptr;
|
||||
}
|
||||
|
||||
void
|
||||
DList::addLoadingResource(const ResourceUsage& size) {
|
||||
loading_memory_ += size * eviction_config_.loading_memory_factor;
|
||||
}
|
||||
|
||||
void
|
||||
DList::removeLoadingResource(const ResourceUsage& size) {
|
||||
loading_memory_ -= size * eviction_config_.loading_memory_factor;
|
||||
}
|
||||
|
||||
int64_t
|
||||
DList::checkPhysicalMemoryLimit(const ResourceUsage& size) const {
|
||||
auto sys_mem = getSystemMemoryInfo();
|
||||
auto current_loading = loading_memory_.load();
|
||||
int64_t projected_usage = sys_mem.used_memory_bytes +
|
||||
current_loading.memory_bytes + size.memory_bytes;
|
||||
int64_t limit =
|
||||
static_cast<int64_t>(sys_mem.total_memory_bytes *
|
||||
eviction_config_.overloaded_memory_threshold_percentage);
|
||||
|
||||
int64_t eviction_needed =
|
||||
std::max(static_cast<int64_t>(0), projected_usage - limit);
|
||||
|
||||
LOG_TRACE(
|
||||
"[MCL] Physical memory check: "
|
||||
"projected_usage={}(used={}, loading={}, requesting={}), limit={} ({}% "
|
||||
"of {} "
|
||||
"total), eviction_needed={}",
|
||||
FormatBytes(projected_usage),
|
||||
FormatBytes(sys_mem.used_memory_bytes),
|
||||
FormatBytes(current_loading.memory_bytes),
|
||||
FormatBytes(size.memory_bytes),
|
||||
FormatBytes(limit),
|
||||
eviction_config_.overloaded_memory_threshold_percentage * 100,
|
||||
FormatBytes(sys_mem.total_memory_bytes),
|
||||
FormatBytes(eviction_needed));
|
||||
|
||||
return eviction_needed;
|
||||
}
|
||||
|
||||
} // namespace milvus::cachinglayer::internal
|
||||
|
||||
@ -32,6 +32,13 @@ class DList {
|
||||
low_watermark_(low_watermark),
|
||||
high_watermark_(high_watermark),
|
||||
eviction_config_(eviction_config) {
|
||||
AssertInfo(low_watermark.AllGEZero(),
|
||||
"[MCL] low watermark must be greater than or equal to 0");
|
||||
AssertInfo((high_watermark - low_watermark).AllGEZero(),
|
||||
"[MCL] high watermark must be greater than low watermark");
|
||||
AssertInfo((max_memory - high_watermark).AllGEZero(),
|
||||
"[MCL] max memory must be greater than high watermark");
|
||||
|
||||
eviction_thread_ = std::thread(&DList::evictionLoop, this);
|
||||
}
|
||||
|
||||
@ -84,6 +91,12 @@ class DList {
|
||||
void
|
||||
removeItem(ListNode* list_node, ResourceUsage size);
|
||||
|
||||
void
|
||||
addLoadingResource(const ResourceUsage& size);
|
||||
|
||||
void
|
||||
removeLoadingResource(const ResourceUsage& size);
|
||||
|
||||
const EvictionConfig&
|
||||
eviction_config() const {
|
||||
return eviction_config_;
|
||||
@ -99,7 +112,8 @@ class DList {
|
||||
// If we cannot achieve the goal, but we can evict min_eviction, we will still perform eviction.
|
||||
// If we cannot even evict min_eviction, nothing will be evicted and false will be returned.
|
||||
// Must be called under the lock of list_mtx_.
|
||||
bool
|
||||
// Returns the logical amount of resources that are evicted. 0 means no eviction happened.
|
||||
ResourceUsage
|
||||
tryEvict(const ResourceUsage& expected_eviction,
|
||||
const ResourceUsage& min_eviction);
|
||||
|
||||
@ -117,6 +131,16 @@ class DList {
|
||||
std::string
|
||||
usageInfo(const ResourceUsage& actively_pinned) const;
|
||||
|
||||
// Physical memory protection methods
|
||||
// Returns the amount of memory that needs to be evicted to satisfy physical memory limit
|
||||
// Returns 0 if no eviction needed, positive value if eviction needed.
|
||||
int64_t
|
||||
checkPhysicalMemoryLimit(const ResourceUsage& size) const;
|
||||
|
||||
// not thread safe, use for debug only
|
||||
std::string
|
||||
chainString() const;
|
||||
|
||||
// head_ is the most recently used item, tail_ is the least recently used item.
|
||||
// tail_ -> next -> ... -> head_
|
||||
// tail_ <- prev <- ... <- head_
|
||||
@ -127,6 +151,8 @@ class DList {
|
||||
mutable std::mutex list_mtx_;
|
||||
// access to used_memory_ and max_memory_ must be done under the lock of list_mtx_
|
||||
std::atomic<ResourceUsage> used_memory_{};
|
||||
// Track estimated resources currently being loaded
|
||||
std::atomic<ResourceUsage> loading_memory_{};
|
||||
ResourceUsage low_watermark_;
|
||||
ResourceUsage high_watermark_;
|
||||
ResourceUsage max_memory_;
|
||||
|
||||
@ -68,17 +68,17 @@ ListNode::~ListNode() {
|
||||
bool
|
||||
ListNode::manual_evict() {
|
||||
std::unique_lock<std::shared_mutex> lock(mtx_);
|
||||
if (state_ == State::ERROR || state_ == State::LOADING) {
|
||||
LOG_ERROR("manual_evict() called on a {} cell",
|
||||
if (state_ == State::LOADING) {
|
||||
LOG_ERROR("[MCL] manual_evict() called on a {} cell",
|
||||
state_to_string(state_));
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
if (state_ == State::NOT_LOADED) {
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
if (pin_count_.load() > 0) {
|
||||
LOG_ERROR(
|
||||
"manual_evict() called on a LOADED and pinned cell, aborting "
|
||||
"[MCL] manual_evict() called on a LOADED and pinned cell, aborting "
|
||||
"eviction.");
|
||||
return false;
|
||||
}
|
||||
@ -90,8 +90,8 @@ ListNode::manual_evict() {
|
||||
return true;
|
||||
}
|
||||
|
||||
ResourceUsage&
|
||||
ListNode::size() {
|
||||
const ResourceUsage&
|
||||
ListNode::size() const {
|
||||
return size_;
|
||||
}
|
||||
|
||||
@ -102,10 +102,6 @@ ListNode::pin() {
|
||||
AssertInfo(state_ != State::NOT_LOADED,
|
||||
"Programming error: read_op called on a {} cell",
|
||||
state_to_string(state_));
|
||||
if (state_ == State::ERROR) {
|
||||
return std::make_pair(false,
|
||||
folly::makeSemiFuture<NodePin>(error_));
|
||||
}
|
||||
// pin the cell now so that we can avoid taking the lock again in deferValue.
|
||||
auto p = NodePin(this);
|
||||
if (state_ == State::LOADED) {
|
||||
@ -134,15 +130,6 @@ ListNode::pin() {
|
||||
internal::cache_op_result_count_miss(size_.storage_type()).Increment();
|
||||
load_promise_ = std::make_unique<folly::SharedPromise<folly::Unit>>();
|
||||
state_ = State::LOADING;
|
||||
if (dlist_ && !dlist_->reserveMemory(size())) {
|
||||
// if another thread sees LOADING status, the memory reservation has succeeded.
|
||||
state_ = State::ERROR;
|
||||
error_ = folly::make_exception_wrapper<std::runtime_error>(fmt::format(
|
||||
"Failed to load {} due to insufficient resource", key()));
|
||||
load_promise_->setException(error_);
|
||||
load_promise_ = nullptr;
|
||||
return std::make_pair(false, folly::makeSemiFuture<NodePin>(error_));
|
||||
}
|
||||
|
||||
// pin the cell now so that we can avoid taking the lock again in deferValue.
|
||||
auto p = NodePin(this);
|
||||
@ -155,22 +142,20 @@ ListNode::pin() {
|
||||
void
|
||||
ListNode::set_error(folly::exception_wrapper error) {
|
||||
std::unique_lock<std::shared_mutex> lock(mtx_);
|
||||
AssertInfo(state_ != State::NOT_LOADED && state_ != State::ERROR,
|
||||
AssertInfo(state_ != State::NOT_LOADED,
|
||||
"Programming error: set_error() called on a {} cell",
|
||||
state_to_string(state_));
|
||||
// load failed, release the memory reservation.
|
||||
if (dlist_) {
|
||||
dlist_->releaseMemory(size());
|
||||
}
|
||||
// may be successfully loaded by another thread as a bonus, they will update used memory.
|
||||
if (state_ == State::LOADED) {
|
||||
return;
|
||||
}
|
||||
// else: state_ is LOADING
|
||||
state_ = State::ERROR;
|
||||
load_promise_->setException(error);
|
||||
load_promise_ = nullptr;
|
||||
error_ = std::move(error);
|
||||
// else: state_ is LOADING, reset to NOT_LOADED
|
||||
state_ = State::NOT_LOADED;
|
||||
// Notify waiting threads about the error
|
||||
if (load_promise_) {
|
||||
load_promise_->setException(error);
|
||||
load_promise_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
std::string
|
||||
@ -182,8 +167,6 @@ ListNode::state_to_string(State state) {
|
||||
return "LOADING";
|
||||
case State::LOADED:
|
||||
return "LOADED";
|
||||
case State::ERROR:
|
||||
return "ERROR";
|
||||
}
|
||||
throw std::invalid_argument("Invalid state");
|
||||
}
|
||||
@ -191,14 +174,6 @@ ListNode::state_to_string(State state) {
|
||||
void
|
||||
ListNode::unpin() {
|
||||
std::unique_lock<std::shared_mutex> lock(mtx_);
|
||||
AssertInfo(
|
||||
state_ == State::LOADED || state_ == State::ERROR,
|
||||
"Programming error: unpin() called on a {} cell, current pin_count {}",
|
||||
state_to_string(state_),
|
||||
pin_count_.load());
|
||||
if (state_ == State::ERROR) {
|
||||
return;
|
||||
}
|
||||
if (pin_count_.fetch_sub(1) == 1) {
|
||||
touch(false);
|
||||
}
|
||||
@ -227,6 +202,8 @@ ListNode::clear_data() {
|
||||
2 * dlist_->eviction_config().cache_touch_window;
|
||||
}
|
||||
unload();
|
||||
LOG_TRACE(
|
||||
"[MCL] ListNode evicted: key={}, size={}", key(), size_.ToString());
|
||||
state_ = State::NOT_LOADED;
|
||||
}
|
||||
|
||||
@ -235,4 +212,11 @@ ListNode::unload() {
|
||||
// Default implementation does nothing
|
||||
}
|
||||
|
||||
void
|
||||
ListNode::remove_self_from_loading_resource() {
|
||||
if (dlist_) {
|
||||
dlist_->removeLoadingResource(size_);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace milvus::cachinglayer::internal
|
||||
|
||||
@ -71,20 +71,19 @@ class ListNode {
|
||||
std::pair<bool, folly::SemiFuture<NodePin>>
|
||||
pin();
|
||||
|
||||
ResourceUsage&
|
||||
size();
|
||||
const ResourceUsage&
|
||||
size() const;
|
||||
|
||||
// Manually evicts the cell if it is not pinned.
|
||||
// Returns true if the cell ends up in a state other than LOADED.
|
||||
bool
|
||||
manual_evict();
|
||||
|
||||
// TODO(tiered storage 2): pin on ERROR should re-trigger loading.
|
||||
// NOT_LOADED ---> LOADING ---> ERROR
|
||||
// NOT_LOADED <---> LOADING
|
||||
// ^ |
|
||||
// | v
|
||||
// |------- LOADED
|
||||
enum class State { NOT_LOADED, LOADING, LOADED, ERROR };
|
||||
enum class State { NOT_LOADED, LOADING, LOADED };
|
||||
|
||||
protected:
|
||||
// will be called during eviction, implementation should release all resources.
|
||||
@ -99,10 +98,7 @@ class ListNode {
|
||||
mark_loaded(Fn&& cb, bool requesting_thread) {
|
||||
std::unique_lock<std::shared_mutex> lock(mtx_);
|
||||
if (requesting_thread) {
|
||||
// requesting thread will promote NOT_LOADED to LOADING and only requesting
|
||||
// thread will set state to ERROR, thus it is not possible for the requesting
|
||||
// thread to see NOT_LOADED or ERROR.
|
||||
AssertInfo(state_ != State::NOT_LOADED && state_ != State::ERROR,
|
||||
AssertInfo(state_ != State::NOT_LOADED,
|
||||
"Programming error: mark_loaded(requesting_thread=true) "
|
||||
"called on a {} cell",
|
||||
state_to_string(state_));
|
||||
@ -113,6 +109,7 @@ class ListNode {
|
||||
state_ = State::LOADED;
|
||||
load_promise_->setValue(folly::Unit());
|
||||
load_promise_ = nullptr;
|
||||
remove_self_from_loading_resource();
|
||||
} else {
|
||||
// LOADED: cell has been loaded by another thread, do nothing.
|
||||
return;
|
||||
@ -120,7 +117,7 @@ class ListNode {
|
||||
} else {
|
||||
// Even though this thread did not request loading this cell, translator still
|
||||
// decided to download it because the adjacent cells are requested.
|
||||
if (state_ == State::NOT_LOADED || state_ == State::ERROR) {
|
||||
if (state_ == State::NOT_LOADED) {
|
||||
state_ = State::LOADED;
|
||||
cb();
|
||||
// memory of this cell is not reserved, touch() to track it.
|
||||
@ -134,6 +131,7 @@ class ListNode {
|
||||
load_promise_ = nullptr;
|
||||
// the node that marked LOADING has already reserved memory, do not double count.
|
||||
touch(false);
|
||||
remove_self_from_loading_resource();
|
||||
} else {
|
||||
// LOADED: cell has been loaded by another thread, do nothing.
|
||||
return;
|
||||
@ -144,6 +142,9 @@ class ListNode {
|
||||
void
|
||||
set_error(folly::exception_wrapper error);
|
||||
|
||||
void
|
||||
remove_self_from_loading_resource();
|
||||
|
||||
State state_{State::NOT_LOADED};
|
||||
|
||||
static std::string
|
||||
@ -182,7 +183,6 @@ class ListNode {
|
||||
std::atomic<int> pin_count_{0};
|
||||
|
||||
std::unique_ptr<folly::SharedPromise<folly::Unit>> load_promise_{nullptr};
|
||||
folly::exception_wrapper error_;
|
||||
};
|
||||
|
||||
} // namespace milvus::cachinglayer::internal
|
||||
|
||||
@ -169,7 +169,10 @@ class StringChunk : public Chunk {
|
||||
std::string_view
|
||||
operator[](const int i) const {
|
||||
if (i < 0 || i >= row_nums_) {
|
||||
PanicInfo(ErrorCode::OutOfRange, "index out of range");
|
||||
PanicInfo(ErrorCode::OutOfRange,
|
||||
"index out of range {} at {}",
|
||||
i,
|
||||
row_nums_);
|
||||
}
|
||||
|
||||
return {data_ + offsets_[i], offsets_[i + 1] - offsets_[i]};
|
||||
|
||||
@ -75,6 +75,12 @@
|
||||
GetThreadName(), \
|
||||
milvus::tracer::GetTraceID())
|
||||
|
||||
// avoid evaluating args if trace log is not enabled
|
||||
#define LOG_TRACE(args...) \
|
||||
if (VLOG_IS_ON(GLOG_TRACE)) { \
|
||||
VLOG(GLOG_TRACE) << SERVER_MODULE_FUNCTION << fmt::format(args); \
|
||||
}
|
||||
|
||||
#define LOG_DEBUG(args...) \
|
||||
VLOG(GLOG_DEBUG) << SERVER_MODULE_FUNCTION << fmt::format(args)
|
||||
#define LOG_INFO(args...) \
|
||||
|
||||
@ -185,7 +185,10 @@ ConfigureTieredStorage(const CacheWarmupPolicy scalarFieldCacheWarmupPolicy,
|
||||
const int64_t disk_max_bytes,
|
||||
const bool evictionEnabled,
|
||||
const int64_t cache_touch_window_ms,
|
||||
const int64_t eviction_interval_ms) {
|
||||
const int64_t eviction_interval_ms,
|
||||
const float loading_memory_factor,
|
||||
const float overloaded_memory_threshold_percentage,
|
||||
const float max_disk_usage_percentage) {
|
||||
milvus::cachinglayer::Manager::ConfigureTieredStorage(
|
||||
{scalarFieldCacheWarmupPolicy,
|
||||
vectorFieldCacheWarmupPolicy,
|
||||
@ -198,7 +201,7 @@ ConfigureTieredStorage(const CacheWarmupPolicy scalarFieldCacheWarmupPolicy,
|
||||
disk_high_watermark_bytes,
|
||||
disk_max_bytes},
|
||||
evictionEnabled,
|
||||
{cache_touch_window_ms, eviction_interval_ms});
|
||||
{cache_touch_window_ms, eviction_interval_ms, overloaded_memory_threshold_percentage, max_disk_usage_percentage, loading_memory_factor});
|
||||
}
|
||||
|
||||
} // namespace milvus::segcore
|
||||
|
||||
@ -99,7 +99,10 @@ ConfigureTieredStorage(const CacheWarmupPolicy scalarFieldCacheWarmupPolicy,
|
||||
const int64_t disk_max_bytes,
|
||||
const bool evictionEnabled,
|
||||
const int64_t cache_touch_window_ms,
|
||||
const int64_t eviction_interval_ms);
|
||||
const int64_t eviction_interval_ms,
|
||||
const float loading_memory_factor,
|
||||
const float overloaded_memory_threshold_percentage,
|
||||
const float max_disk_usage_percentage);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
||||
@ -516,9 +516,9 @@ TEST_F(CacheSlotTest, EvictionTest) {
|
||||
ResourceUsage new_limit = ResourceUsage(300, 0);
|
||||
ResourceUsage new_high_watermark = ResourceUsage(250, 0);
|
||||
ResourceUsage new_low_watermark = ResourceUsage(200, 0);
|
||||
EXPECT_TRUE(dlist_->UpdateLimit(new_limit));
|
||||
dlist_->UpdateHighWatermark(new_high_watermark);
|
||||
dlist_->UpdateLowWatermark(new_low_watermark);
|
||||
dlist_->UpdateHighWatermark(new_high_watermark);
|
||||
EXPECT_TRUE(dlist_->UpdateLimit(new_limit));
|
||||
EXPECT_EQ(DListTestFriend::get_max_memory(*dlist_), new_limit);
|
||||
|
||||
std::vector<cl_uid_t> uids_012 = {10, 20, 30};
|
||||
@ -580,9 +580,9 @@ TEST_P(CacheSlotConcurrentTest, ConcurrentAccessMultipleSlots) {
|
||||
ResourceUsage new_limit = ResourceUsage(700, 0);
|
||||
ResourceUsage new_high_watermark = ResourceUsage(650, 0);
|
||||
ResourceUsage new_low_watermark = ResourceUsage(600, 0);
|
||||
ASSERT_TRUE(dlist_->UpdateLimit(new_limit));
|
||||
dlist_->UpdateHighWatermark(new_high_watermark);
|
||||
dlist_->UpdateLowWatermark(new_low_watermark);
|
||||
dlist_->UpdateHighWatermark(new_high_watermark);
|
||||
ASSERT_TRUE(dlist_->UpdateLimit(new_limit));
|
||||
EXPECT_EQ(DListTestFriend::get_max_memory(*dlist_).memory_bytes,
|
||||
new_limit.memory_bytes);
|
||||
|
||||
|
||||
@ -104,6 +104,8 @@ TEST_F(DListTest, UpdateLimitDecreaseNoEviction) {
|
||||
ASSERT_EQ(get_used_memory(), current_usage);
|
||||
|
||||
ResourceUsage new_limit{50, 25};
|
||||
dlist->UpdateLowWatermark({40, 20});
|
||||
dlist->UpdateHighWatermark({50, 25});
|
||||
EXPECT_TRUE(dlist->UpdateLimit(new_limit));
|
||||
|
||||
EXPECT_EQ(get_used_memory(), current_usage);
|
||||
@ -124,6 +126,8 @@ TEST_F(DListTest, UpdateLimitDecreaseWithEvictionLRU) {
|
||||
EXPECT_CALL(*node2, clear_data()).Times(0);
|
||||
|
||||
ResourceUsage new_limit{70, 40};
|
||||
dlist->UpdateLowWatermark({56, 32});
|
||||
dlist->UpdateHighWatermark({70, 40});
|
||||
EXPECT_TRUE(dlist->UpdateLimit(new_limit));
|
||||
|
||||
EXPECT_EQ(get_used_memory(), usage_node2);
|
||||
@ -146,6 +150,8 @@ TEST_F(DListTest, UpdateLimitDecreaseWithEvictionMultiple) {
|
||||
EXPECT_CALL(*node3, clear_data()).Times(0);
|
||||
|
||||
ResourceUsage new_limit{40, 15};
|
||||
dlist->UpdateLowWatermark({32, 12});
|
||||
dlist->UpdateHighWatermark({40, 15});
|
||||
EXPECT_TRUE(dlist->UpdateLimit(new_limit));
|
||||
|
||||
EXPECT_EQ(get_used_memory(), usage_node3);
|
||||
@ -164,6 +170,8 @@ TEST_F(DListTest, UpdateLimitSkipsPinned) {
|
||||
EXPECT_CALL(*node2, clear_data()).Times(1);
|
||||
|
||||
ResourceUsage new_limit{70, 40};
|
||||
dlist->UpdateLowWatermark({56, 32});
|
||||
dlist->UpdateHighWatermark({70, 40});
|
||||
EXPECT_TRUE(dlist->UpdateLimit(new_limit));
|
||||
|
||||
EXPECT_EQ(get_used_memory(), usage_node1);
|
||||
@ -176,6 +184,8 @@ TEST_F(DListTest, UpdateLimitToZero) {
|
||||
EXPECT_CALL(*node1, clear_data()).Times(1);
|
||||
EXPECT_CALL(*node2, clear_data()).Times(1);
|
||||
|
||||
dlist->UpdateLowWatermark({0, 0});
|
||||
dlist->UpdateHighWatermark({1, 1});
|
||||
EXPECT_TRUE(dlist->UpdateLimit({1, 1}));
|
||||
|
||||
EXPECT_EQ(get_used_memory(), ResourceUsage{});
|
||||
@ -585,6 +595,8 @@ TEST_F(DListTest, UpdateLimitIncreaseMemDecreaseDisk) {
|
||||
EXPECT_CALL(*node2, clear_data()).Times(0);
|
||||
|
||||
ResourceUsage new_limit{200, 35};
|
||||
dlist->UpdateLowWatermark({90, 34});
|
||||
dlist->UpdateHighWatermark({90, 34});
|
||||
EXPECT_TRUE(dlist->UpdateLimit(new_limit));
|
||||
|
||||
EXPECT_EQ(get_used_memory(), usage2);
|
||||
@ -604,6 +616,8 @@ TEST_F(DListTest, EvictedNodeDestroyed) {
|
||||
EXPECT_CALL(*node1, clear_data()).Times(1);
|
||||
EXPECT_CALL(*node2, clear_data()).Times(0);
|
||||
ResourceUsage new_limit{70, 40};
|
||||
dlist->UpdateLowWatermark({56, 32});
|
||||
dlist->UpdateHighWatermark({70, 40});
|
||||
EXPECT_TRUE(dlist->UpdateLimit(new_limit));
|
||||
DLF::verify_list(dlist.get(), {node2});
|
||||
ResourceUsage memory_after_eviction = get_used_memory();
|
||||
@ -677,8 +691,8 @@ TEST_F(DListTest, ReserveMemoryUsesLowWatermark) {
|
||||
low_watermark = {80, 80};
|
||||
high_watermark = {90, 90};
|
||||
EXPECT_TRUE(dlist->UpdateLimit(initial_limit));
|
||||
dlist->UpdateLowWatermark(low_watermark);
|
||||
dlist->UpdateHighWatermark(high_watermark);
|
||||
dlist->UpdateLowWatermark(low_watermark);
|
||||
|
||||
// Add nodes totaling 95/95 usage (above high watermark)
|
||||
MockListNode* node1 = add_and_load_node({45, 45}, "node1"); // Tail
|
||||
|
||||
@ -346,10 +346,10 @@ func (node *QueryNode) InitSegcore() error {
|
||||
|
||||
memoryLowWatermarkRatio := paramtable.Get().QueryNodeCfg.TieredMemoryLowWatermarkRatio.GetAsFloat()
|
||||
memoryHighWatermarkRatio := paramtable.Get().QueryNodeCfg.TieredMemoryHighWatermarkRatio.GetAsFloat()
|
||||
memoryMaxRatio := paramtable.Get().QueryNodeCfg.TieredMemoryMaxRatio.GetAsFloat()
|
||||
memoryMaxRatio := paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat()
|
||||
diskLowWatermarkRatio := paramtable.Get().QueryNodeCfg.TieredDiskLowWatermarkRatio.GetAsFloat()
|
||||
diskHighWatermarkRatio := paramtable.Get().QueryNodeCfg.TieredDiskHighWatermarkRatio.GetAsFloat()
|
||||
diskMaxRatio := paramtable.Get().QueryNodeCfg.TieredDiskMaxRatio.GetAsFloat()
|
||||
diskMaxRatio := paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()
|
||||
|
||||
if memoryLowWatermarkRatio > memoryHighWatermarkRatio {
|
||||
return errors.New("memoryLowWatermarkRatio should not be greater than memoryHighWatermarkRatio")
|
||||
@ -382,6 +382,9 @@ func (node *QueryNode) InitSegcore() error {
|
||||
evictionEnabled := C.bool(paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool())
|
||||
cacheTouchWindowMs := C.int64_t(paramtable.Get().QueryNodeCfg.TieredCacheTouchWindowMs.GetAsInt64())
|
||||
evictionIntervalMs := C.int64_t(paramtable.Get().QueryNodeCfg.TieredEvictionIntervalMs.GetAsInt64())
|
||||
loadingMemoryFactor := C.float(paramtable.Get().QueryNodeCfg.TieredLoadingMemoryFactor.GetAsFloat())
|
||||
overloadedMemoryThresholdPercentage := C.float(memoryMaxRatio)
|
||||
maxDiskUsagePercentage := C.float(diskMaxRatio)
|
||||
|
||||
C.ConfigureTieredStorage(C.CacheWarmupPolicy(scalarFieldCacheWarmupPolicy),
|
||||
C.CacheWarmupPolicy(vectorFieldCacheWarmupPolicy),
|
||||
@ -389,7 +392,8 @@ func (node *QueryNode) InitSegcore() error {
|
||||
C.CacheWarmupPolicy(vectorIndexCacheWarmupPolicy),
|
||||
memoryLowWatermarkBytes, memoryHighWatermarkBytes, memoryMaxBytes,
|
||||
diskLowWatermarkBytes, diskHighWatermarkBytes, diskMaxBytes,
|
||||
evictionEnabled, cacheTouchWindowMs, evictionIntervalMs)
|
||||
evictionEnabled, cacheTouchWindowMs, evictionIntervalMs,
|
||||
loadingMemoryFactor, overloadedMemoryThresholdPercentage, maxDiskUsagePercentage)
|
||||
|
||||
err = initcore.InitInterminIndexConfig(paramtable.Get())
|
||||
if err != nil {
|
||||
|
||||
@ -2836,13 +2836,12 @@ type queryNodeConfig struct {
|
||||
TieredWarmupVectorIndex ParamItem `refreshable:"false"`
|
||||
TieredMemoryLowWatermarkRatio ParamItem `refreshable:"false"`
|
||||
TieredMemoryHighWatermarkRatio ParamItem `refreshable:"false"`
|
||||
TieredMemoryMaxRatio ParamItem `refreshable:"false"`
|
||||
TieredDiskLowWatermarkRatio ParamItem `refreshable:"false"`
|
||||
TieredDiskHighWatermarkRatio ParamItem `refreshable:"false"`
|
||||
TieredDiskMaxRatio ParamItem `refreshable:"false"`
|
||||
TieredEvictionEnabled ParamItem `refreshable:"false"`
|
||||
TieredCacheTouchWindowMs ParamItem `refreshable:"false"`
|
||||
TieredEvictionIntervalMs ParamItem `refreshable:"false"`
|
||||
TieredLoadingMemoryFactor ParamItem `refreshable:"false"`
|
||||
|
||||
KnowhereScoreConsistency ParamItem `refreshable:"false"`
|
||||
|
||||
@ -3061,9 +3060,9 @@ Note that if eviction is enabled, cache data loaded during sync warmup is also s
|
||||
},
|
||||
Doc: `If evictionEnabled is true, a background thread will run every evictionIntervalMs to determine if an
|
||||
eviction is necessary and the amount of data to evict from memory/disk.
|
||||
- The max ratio is the max amount of memory/disk that can be used for cache.
|
||||
- If the current memory/disk usage exceeds the high watermark, an eviction will be triggered to evict data from memory/disk
|
||||
until the memory/disk usage is below the low watermark.`,
|
||||
until the memory/disk usage is below the low watermark.
|
||||
- The max amount of memory/disk that can be used for cache is controlled by overloadedMemoryThresholdPercentage and diskMaxUsagePercentage.`,
|
||||
Export: true,
|
||||
}
|
||||
p.TieredMemoryLowWatermarkRatio.Init(base.mgr)
|
||||
@ -3083,21 +3082,6 @@ eviction is necessary and the amount of data to evict from memory/disk.
|
||||
}
|
||||
p.TieredMemoryHighWatermarkRatio.Init(base.mgr)
|
||||
|
||||
p.TieredMemoryMaxRatio = ParamItem{
|
||||
Key: "queryNode.segcore.tieredStorage.memoryMaxRatio",
|
||||
Version: "2.6.0",
|
||||
DefaultValue: "0.9",
|
||||
Formatter: func(v string) string {
|
||||
ratio := getAsFloat(v)
|
||||
if ratio < 0 || ratio > 1 {
|
||||
return "0.9"
|
||||
}
|
||||
return fmt.Sprintf("%f", ratio)
|
||||
},
|
||||
Export: true,
|
||||
}
|
||||
p.TieredMemoryMaxRatio.Init(base.mgr)
|
||||
|
||||
p.TieredDiskLowWatermarkRatio = ParamItem{
|
||||
Key: "queryNode.segcore.tieredStorage.diskLowWatermarkRatio",
|
||||
Version: "2.6.0",
|
||||
@ -3128,21 +3112,6 @@ eviction is necessary and the amount of data to evict from memory/disk.
|
||||
}
|
||||
p.TieredDiskHighWatermarkRatio.Init(base.mgr)
|
||||
|
||||
p.TieredDiskMaxRatio = ParamItem{
|
||||
Key: "queryNode.segcore.tieredStorage.diskMaxRatio",
|
||||
Version: "2.6.0",
|
||||
DefaultValue: "0.9",
|
||||
Formatter: func(v string) string {
|
||||
ratio := getAsFloat(v)
|
||||
if ratio < 0 || ratio > 1 {
|
||||
return "0.9"
|
||||
}
|
||||
return fmt.Sprintf("%f", ratio)
|
||||
},
|
||||
Export: true,
|
||||
}
|
||||
p.TieredDiskMaxRatio.Init(base.mgr)
|
||||
|
||||
p.TieredCacheTouchWindowMs = ParamItem{
|
||||
Key: "queryNode.segcore.tieredStorage.cacheTouchWindowMs",
|
||||
Version: "2.6.0",
|
||||
@ -3175,6 +3144,22 @@ eviction is necessary and the amount of data to evict from memory/disk.
|
||||
}
|
||||
p.TieredEvictionIntervalMs.Init(base.mgr)
|
||||
|
||||
p.TieredLoadingMemoryFactor = ParamItem{
|
||||
Key: "queryNode.segcore.tieredStorage.loadingMemoryFactor",
|
||||
Version: "2.6.0",
|
||||
DefaultValue: "2.5",
|
||||
Formatter: func(v string) string {
|
||||
factor := getAsFloat(v)
|
||||
if factor < 1.0 {
|
||||
return "2.5"
|
||||
}
|
||||
return fmt.Sprintf("%.2f", factor)
|
||||
},
|
||||
Doc: "Loading memory factor for estimating memory during loading.",
|
||||
Export: false,
|
||||
}
|
||||
p.TieredLoadingMemoryFactor.Init(base.mgr)
|
||||
|
||||
p.KnowhereThreadPoolSize = ParamItem{
|
||||
Key: "queryNode.segcore.knowhereThreadPoolNumRatio",
|
||||
Version: "2.0.0",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user