enhance: cachinglayer: resource management for segment loading (#43846)

issue: #41435

---------

Signed-off-by: Shawn Wang <shawn.wang@zilliz.com>
sparknack 2025-08-29 11:37:50 +08:00 committed by GitHub
parent 7b04107863
commit 70c8114e85
47 changed files with 688 additions and 351 deletions

View File

@ -32,6 +32,7 @@
#include "simdjson/common_defs.h"
#include "sys/mman.h"
#include "common/Types.h"
#include "cachinglayer/Utils.h"
namespace milvus {
constexpr uint64_t MMAP_STRING_PADDING = 1;
@ -65,9 +66,12 @@ class Chunk {
return size_;
}
size_t
cachinglayer::ResourceUsage
CellByteSize() const {
return size_;
if (mmap_file_raii_) {
return cachinglayer::ResourceUsage(0, static_cast<int64_t>(size_));
}
return cachinglayer::ResourceUsage(static_cast<int64_t>(size_), 0);
}
int64_t

View File

@ -70,9 +70,9 @@ class GroupChunk {
return chunks_;
}
size_t
cachinglayer::ResourceUsage
CellByteSize() const {
size_t total_size = 0;
cachinglayer::ResourceUsage total_size = {0, 0};
for (const auto& chunk : chunks_) {
total_size += chunk.second->CellByteSize();
}
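
Note on the type change above: CellByteSize() now reports a cachinglayer::ResourceUsage instead of a flat size_t, so an mmap-backed Chunk can be accounted against disk while an in-memory Chunk is accounted against memory, and GroupChunk can still accumulate with a plain +=. The struct itself comes from milvus-common and is not part of this diff; a minimal sketch of the shape these call sites assume (field names inferred from the tests further down, which read .memory_bytes and .file_bytes):

struct ResourceUsage {
    int64_t memory_bytes{0};  // bytes resident in RAM
    int64_t file_bytes{0};    // bytes backed by mmap-ed files on disk

    ResourceUsage() = default;
    ResourceUsage(int64_t mem, int64_t file)
        : memory_bytes(mem), file_bytes(file) {
    }
    ResourceUsage&
    operator+=(const ResourceUsage& other) {
        memory_bytes += other.memory_bytes;
        file_bytes += other.file_bytes;
        return *this;
    }
};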

View File

@ -62,7 +62,7 @@ CollectionIndexMeta::GetIndexMaxRowCount() const {
}
bool
CollectionIndexMeta::HasFiled(FieldId fieldId) const {
CollectionIndexMeta::HasField(FieldId fieldId) const {
return fieldMetas_.count(fieldId);
}

View File

@ -80,7 +80,7 @@ class CollectionIndexMeta {
GetIndexMaxRowCount() const;
bool
HasFiled(FieldId fieldId) const;
HasField(FieldId fieldId) const;
const FieldIndexMeta&
GetFieldIndexMeta(FieldId fieldId) const;

View File

@ -149,6 +149,12 @@ typedef struct CPluginContext {
int64_t collection_id;
const char* key;
} CPluginContext;
typedef struct CResourceUsage {
int64_t memory_bytes;
int64_t disk_bytes;
} CResourceUsage;
#ifdef __cplusplus
}
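
CResourceUsage is the plain-C mirror of cachinglayer::ResourceUsage, added so the Go loader can hand sizes across cgo. A hypothetical helper showing the conversion the C++ side performs (the real bridging happens inline in the load-index C wrappers further down; constructor argument order (memory, disk) matches their usage):

// Hypothetical helper, for illustration only.
inline milvus::cachinglayer::ResourceUsage
FromCResourceUsage(CResourceUsage usage) {
    return milvus::cachinglayer::ResourceUsage(usage.memory_bytes,
                                               usage.disk_bytes);
}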

View File

@ -81,13 +81,13 @@ class IndexBase {
}
// TODO: how to get the cell byte size?
virtual size_t
virtual cachinglayer::ResourceUsage
CellByteSize() const {
return cell_size_;
}
virtual void
SetCellSize(size_t cell_size) {
SetCellSize(cachinglayer::ResourceUsage cell_size) {
cell_size_ = cell_size;
}
@ -97,7 +97,7 @@ class IndexBase {
}
IndexType index_type_ = "";
size_t cell_size_ = 0;
cachinglayer::ResourceUsage cell_size_ = {0, 0};
std::unique_ptr<MmapFileRAII> mmap_file_raii_;
};

View File

@ -61,9 +61,9 @@ struct FieldChunkMetrics {
return {lower_bound, upper_bound};
}
size_t
cachinglayer::ResourceUsage
CellByteSize() const {
return 0;
return {0, 0};
}
};
@ -91,11 +91,12 @@ class FieldChunkMetricsTranslator
cell_id_of(milvus::cachinglayer::uid_t uid) const override {
return uid;
}
milvus::cachinglayer::ResourceUsage
std::pair<milvus::cachinglayer::ResourceUsage,
milvus::cachinglayer::ResourceUsage>
estimated_byte_size_of_cell(
milvus::cachinglayer::cid_t cid) const override {
// TODO(tiered storage 1): provide a better estimation.
return {0, 0};
return {{0, 0}, {0, 0}};
}
const std::string&
key() const override {

View File

@ -18,6 +18,7 @@
#include <algorithm>
#include <cstdint>
#include <ctime>
#include <memory>
#include <optional>
#include <string>
@ -563,7 +564,7 @@ ChunkedSegmentSealedImpl::num_rows_until_chunk(FieldId field_id,
bool
ChunkedSegmentSealedImpl::is_mmap_field(FieldId field_id) const {
std::shared_lock lck(mutex_);
return mmap_fields_.find(field_id) != mmap_fields_.end();
return mmap_field_ids_.find(field_id) != mmap_field_ids_.end();
}
PinWrapper<SpanBase>
@ -2151,7 +2152,7 @@ ChunkedSegmentSealedImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
bool
ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id,
int64_t num_rows) {
if (col_index_meta_ == nullptr || !col_index_meta_->HasFiled(field_id)) {
if (col_index_meta_ == nullptr || !col_index_meta_->HasField(field_id)) {
return false;
}
auto& field_meta = schema_->operator[](field_id);
@ -2313,7 +2314,7 @@ ChunkedSegmentSealedImpl::load_field_data_common(
!already_exists, "field {} column already exists", field_id.get());
fields_.wlock()->emplace(field_id, column);
if (enable_mmap) {
mmap_fields_.insert(field_id);
mmap_field_ids_.insert(field_id);
}
}
// system field only needs to emplace column to fields_ map
@ -2377,11 +2378,7 @@ ChunkedSegmentSealedImpl::init_timestamp_index(
// use special index
std::unique_lock lck(mutex_);
AssertInfo(insert_record_.timestamps_.empty(), "already exists");
insert_record_.timestamps_.set_data_raw(
0, timestamps.data(), timestamps.size());
insert_record_.timestamp_index_ = std::move(index);
AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
"num chunk not equal to 1 for sealed segment");
insert_record_.init_timestamps(timestamps, index);
stats_.mem_size += sizeof(Timestamp) * num_rows;
}

View File

@ -143,7 +143,6 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
GetNgramIndexForJson(FieldId field_id,
const std::string& nested_path) const override;
// TODO(tiered storage 1): should return a PinWrapper
void
BulkGetJsonData(FieldId field_id,
std::function<void(milvus::Json, size_t, bool)> fn,
@ -507,6 +506,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
// scalar field index
std::unordered_map<FieldId, index::CacheIndexBasePtr> scalar_indexings_;
// vector field index
SealedIndexingRecord vector_indexings_;
@ -523,7 +523,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
mutable folly::Synchronized<
std::unordered_map<FieldId, std::shared_ptr<ChunkedColumnInterface>>>
fields_;
std::unordered_set<FieldId> mmap_fields_;
std::unordered_set<FieldId> mmap_field_ids_;
// only useful in binlog
IndexMetaPtr col_index_meta_;

View File

@ -67,6 +67,10 @@ class DeletedRecord {
}
~DeletedRecord() {
if constexpr (is_sealed) {
cachinglayer::Manager::GetInstance().RefundLoadedResource(
{estimated_memory_size_, 0});
}
}
DeletedRecord(DeletedRecord<is_sealed>&& delete_record) = delete;
@ -147,6 +151,25 @@ class DeletedRecord {
n_.fetch_add(removed_num);
mem_size_.fetch_add(mem_add);
if constexpr (is_sealed) {
// report the updated estimated memory size to the caching layer only when the delta is large enough (64KB)
constexpr int64_t MIN_DELTA_SIZE = 64 * 1024;
auto new_estimated_size = size();
if (std::abs(new_estimated_size - estimated_memory_size_) >
MIN_DELTA_SIZE) {
auto delta_size = new_estimated_size - estimated_memory_size_;
if (delta_size >= 0) {
cachinglayer::Manager::GetInstance().ChargeLoadedResource(
{delta_size, 0});
} else {
cachinglayer::Manager::GetInstance().RefundLoadedResource(
{-delta_size, 0});
}
estimated_memory_size_ = new_estimated_size;
}
}
return max_timestamp;
}
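
The pair of blocks above follows a charge-and-refund protocol with hysteresis: the DeletedRecord re-reports its size to the caching layer only when the estimate drifts by more than 64KB, charging on growth and refunding on shrinkage, and the destructor refunds whatever was last reported. A self-contained sketch of the same pattern, with the Manager calls stubbed out:

#include <cstdint>
#include <cstdlib>

// Stand-ins for cachinglayer::Manager's ChargeLoadedResource/RefundLoadedResource.
void Charge(int64_t bytes);
void Refund(int64_t bytes);

class SizeReporter {
 public:
    // Re-report only when the estimate drifts far enough from the last
    // reported value, to avoid chatty updates on every small delete batch.
    void
    MaybeReport(int64_t new_size) {
        constexpr int64_t kMinDelta = 64 * 1024;
        int64_t delta = new_size - reported_;
        if (std::abs(delta) <= kMinDelta) {
            return;
        }
        if (delta > 0) {
            Charge(delta);   // usage grew: charge the difference
        } else {
            Refund(-delta);  // usage shrank: refund the difference
        }
        reported_ = new_size;
    }

    ~SizeReporter() {
        Refund(reported_);  // mirror of ~DeletedRecord() above
    }

 private:
    int64_t reported_{0};
};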
@ -305,7 +328,8 @@ class DeletedRecord {
public:
std::atomic<int64_t> n_ = 0;
std::atomic<int64_t> mem_size_ = 0;
InsertRecord<is_sealed>* insert_record_;
std::conditional_t<is_sealed, InsertRecordSealed, InsertRecordGrowing>*
insert_record_;
std::function<void(const std::vector<PkType>& pks,
const Timestamp* timestamps,
std::function<void(SegOffset offset, Timestamp ts)>)>
@ -325,6 +349,8 @@ class DeletedRecord {
std::vector<SortedDeleteList::iterator> snap_next_iter_;
// total number of delete entries that have been incorporated into snapshots
std::atomic<int64_t> dumped_entry_count_{0};
// estimated memory size of DeletedRecord, only used for sealed segment
int64_t estimated_memory_size_{0};
};
} // namespace milvus::segcore

View File

@ -289,7 +289,7 @@ class IndexingRecord {
}
//Small-Index enabled, create index for vector field only
if (index_meta_->GetIndexMaxRowCount() > 0 &&
index_meta_->HasFiled(field_id)) {
index_meta_->HasField(field_id)) {
auto vec_field_meta =
index_meta_->GetFieldIndexMeta(field_id);
//Disable growing index for flat and embedding list

View File

@ -24,11 +24,13 @@
#include "TimestampIndex.h"
#include "common/EasyAssert.h"
#include "common/Schema.h"
#include "common/TrackingStdAllocator.h"
#include "common/Types.h"
#include "log/Log.h"
#include "mmap/ChunkedColumn.h"
#include "segcore/AckResponder.h"
#include "segcore/ConcurrentVector.h"
#include <type_traits>
namespace milvus::segcore {
@ -72,11 +74,20 @@ class OffsetMap {
virtual void
clear() = 0;
virtual size_t
size() const = 0;
};
template <typename T>
class OffsetOrderedMap : public OffsetMap {
public:
using OrderedMap = std::map<
T,
std::vector<int64_t>,
std::less<>,
TrackingStdAllocator<std::pair<const T, std::vector<int64_t>>>>;
bool
contain(const PkType& pk) const override {
std::shared_lock<std::shared_mutex> lck(mtx_);
@ -192,6 +203,12 @@ class OffsetOrderedMap : public OffsetMap {
map_.clear();
}
size_t
size() const override {
std::shared_lock<std::shared_mutex> lck(mtx_);
return map_.get_allocator().total_allocated();
}
private:
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first_by_index(int64_t limit, const BitsetType& bitset) const {
@ -224,7 +241,6 @@ class OffsetOrderedMap : public OffsetMap {
}
private:
using OrderedMap = std::map<T, std::vector<int64_t>, std::less<>>;
OrderedMap map_;
mutable std::shared_mutex mtx_;
};
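
size() now delegates to the allocator via map_.get_allocator().total_allocated(), so the reported footprint tracks the std::map's actual node allocations instead of a guess. TrackingStdAllocator lives in common/TrackingStdAllocator.h and is not shown in this diff; a minimal sketch of what such a counting allocator can look like, assuming a counter shared across rebound copies:

#include <atomic>
#include <cstddef>
#include <memory>

// Minimal counting-allocator sketch (assumed design, not the actual class).
template <typename T>
class CountingAllocator {
 public:
    using value_type = T;

    CountingAllocator() : total_(std::make_shared<std::atomic<size_t>>(0)) {}
    template <typename U>
    CountingAllocator(const CountingAllocator<U>& other) : total_(other.total_) {}

    T*
    allocate(size_t n) {
        total_->fetch_add(n * sizeof(T));
        return std::allocator<T>{}.allocate(n);
    }
    void
    deallocate(T* p, size_t n) {
        total_->fetch_sub(n * sizeof(T));
        std::allocator<T>{}.deallocate(p, n);
    }
    size_t
    total_allocated() const {
        return total_->load();
    }

    // shared across rebinds so map nodes and vectors count into one total
    std::shared_ptr<std::atomic<size_t>> total_;
};

template <typename T, typename U>
bool
operator==(const CountingAllocator<T>& a, const CountingAllocator<U>& b) {
    return a.total_ == b.total_;
}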
@ -367,6 +383,11 @@ class OffsetOrderedArray : public OffsetMap {
is_sealed = false;
}
size_t
size() const override {
return sizeof(std::pair<T, int32_t>) * array_.capacity();
}
private:
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first_by_index(int64_t limit, const BitsetType& bitset) const {
@ -404,24 +425,20 @@ class OffsetOrderedArray : public OffsetMap {
std::vector<std::pair<T, int32_t>> array_;
};
template <bool is_sealed>
struct InsertRecord {
class InsertRecordSealed {
public:
InsertRecord(
const Schema& schema,
InsertRecordSealed(const Schema& schema,
const int64_t size_per_chunk,
const storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr,
bool called_from_subclass = false)
const storage::MmapChunkDescriptorPtr
/* mmap_descriptor */
= nullptr)
: timestamps_(size_per_chunk) {
if (called_from_subclass) {
return;
}
std::optional<FieldId> pk_field_id = schema.get_primary_field_id();
// for sealed segment, only pk field is added.
for (auto& field : schema) {
auto field_id = field.first;
auto& field_meta = field.second;
if (pk_field_id.has_value() && pk_field_id.value() == field_id) {
auto& field_meta = field.second;
AssertInfo(!field_meta.is_nullable(),
"Primary key should not be nullable");
switch (field_meta.get_data_type()) {
@ -531,6 +548,25 @@ struct InsertRecord {
seal_pks() {
std::lock_guard lck(shared_mutex_);
pk2offset_->seal();
// report the estimated memory size to the caching layer
cachinglayer::Manager::GetInstance().ChargeLoadedResource(
{static_cast<int64_t>(pk2offset_->size()), 0});
estimated_memory_size_ += pk2offset_->size();
}
void
init_timestamps(const std::vector<Timestamp>& timestamps,
const TimestampIndex& timestamp_index) {
std::lock_guard lck(shared_mutex_);
timestamps_.set_data_raw(0, timestamps.data(), timestamps.size());
timestamp_index_ = std::move(timestamp_index);
AssertInfo(timestamps_.num_chunk() == 1,
"num chunk not equal to 1 for sealed segment");
size_t size =
timestamps.size() * sizeof(Timestamp) + timestamp_index_.size();
cachinglayer::Manager::GetInstance().ChargeLoadedResource(
{static_cast<int64_t>(size), 0});
estimated_memory_size_ += size;
}
const ConcurrentVector<Timestamp>&
@ -538,44 +574,38 @@ struct InsertRecord {
return timestamps_;
}
virtual void
void
clear() {
timestamps_.clear();
timestamp_index_ = TimestampIndex();
pk2offset_->clear();
reserved = 0;
}
size_t
CellByteSize() const {
return 0;
cachinglayer::Manager::GetInstance().RefundLoadedResource(
{static_cast<int64_t>(estimated_memory_size_), 0});
estimated_memory_size_ = 0;
}
public:
ConcurrentVector<Timestamp> timestamps_;
std::atomic<int64_t> reserved = 0;
// used for timestamps index of sealed segment
TimestampIndex timestamp_index_;
// pks to row offset
std::unique_ptr<OffsetMap> pk2offset_;
// estimated memory size of InsertRecord, only used for sealed segment
int64_t estimated_memory_size_{0};
protected:
storage::MmapChunkDescriptorPtr mmap_descriptor_;
std::unordered_map<FieldId, std::unique_ptr<VectorBase>> data_{};
mutable std::shared_mutex shared_mutex_{};
};
template <>
struct InsertRecord<false> : public InsertRecord<true> {
class InsertRecordGrowing {
public:
InsertRecord(
InsertRecordGrowing(
const Schema& schema,
const int64_t size_per_chunk,
const storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
: InsertRecord<true>(schema, size_per_chunk, mmap_descriptor, true) {
: timestamps_(size_per_chunk) {
std::optional<FieldId> pk_field_id = schema.get_primary_field_id();
for (auto& field : schema) {
auto field_id = field.first;
@ -596,7 +626,7 @@ struct InsertRecord<false> : public InsertRecord<true> {
}
default: {
ThrowInfo(DataTypeInvalid,
fmt::format("unsupported pk type",
fmt::format("unsupported pk type: {}",
field_meta.get_data_type()));
}
}
@ -606,6 +636,43 @@ struct InsertRecord<false> : public InsertRecord<true> {
}
}
bool
contain(const PkType& pk) const {
return pk2offset_->contain(pk);
}
std::vector<SegOffset>
search_pk(const PkType& pk,
Timestamp timestamp,
bool include_same_ts = true) const {
std::shared_lock<std::shared_mutex> lck(shared_mutex_);
std::vector<SegOffset> res_offsets;
auto offset_iter = pk2offset_->find(pk);
auto timestamp_hit =
include_same_ts ? [](const Timestamp& ts1,
const Timestamp& ts2) { return ts1 <= ts2; }
: [](const Timestamp& ts1, const Timestamp& ts2) {
return ts1 < ts2;
};
for (auto offset : offset_iter) {
if (timestamp_hit(timestamps_[offset], timestamp)) {
res_offsets.emplace_back(offset);
}
}
return res_offsets;
}
void
search_pk_range(const PkType& pk,
Timestamp timestamp,
proto::plan::OpType op,
BitsetTypeView& bitset) const {
auto condition = [this, timestamp](int64_t offset) {
return timestamps_[offset] <= timestamp;
};
pk2offset_->find_range(pk, op, bitset, condition);
}
void
insert_pks(const std::vector<FieldDataPtr>& field_datas) {
std::lock_guard lck(shared_mutex_);
@ -639,6 +706,34 @@ struct InsertRecord<false> : public InsertRecord<true> {
}
}
bool
empty_pks() const {
std::shared_lock lck(shared_mutex_);
return pk2offset_->empty();
}
void
seal_pks() {
std::lock_guard lck(shared_mutex_);
pk2offset_
->seal(); // will throw for growing map, consistent with previous behavior
}
const ConcurrentVector<Timestamp>&
timestamps() const {
return timestamps_;
}
void
clear() {
timestamps_.clear();
timestamp_index_ = TimestampIndex();
pk2offset_->clear();
reserved = 0;
data_.clear();
ack_responder_.clear();
}
void
append_field_meta(
FieldId field_id,
@ -912,20 +1007,25 @@ struct InsertRecord<false> : public InsertRecord<true> {
empty() const {
return pk2offset_->empty();
}
void
clear() override {
InsertRecord<true>::clear();
data_.clear();
ack_responder_.clear();
}
public:
ConcurrentVector<Timestamp> timestamps_;
std::atomic<int64_t> reserved = 0;
TimestampIndex timestamp_index_;
std::unique_ptr<OffsetMap> pk2offset_;
// used for preInsert of growing segment
AckResponder ack_responder_;
private:
std::unordered_map<FieldId, std::unique_ptr<VectorBase>> data_{};
std::unordered_map<FieldId, ThreadSafeValidDataPtr> valid_data_{};
mutable std::shared_mutex shared_mutex_{};
};
// Keep the original template API via alias
template <bool is_sealed>
using InsertRecord =
std::conditional_t<is_sealed, InsertRecordSealed, InsertRecordGrowing>;
} // namespace milvus::segcore
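
The net effect of this file: the old InsertRecord<false> : public InsertRecord<true> inheritance is replaced by two independent classes, and the template API is preserved by the std::conditional_t alias above. The pattern in isolation:

#include <type_traits>

struct SealedImpl { /* sealed-only members */ };
struct GrowingImpl { /* growing-only members */ };

// Callers keep writing Record<is_sealed> exactly as before; the alias
// dispatches to the right concrete class at compile time, with no vtable.
template <bool is_sealed>
using Record = std::conditional_t<is_sealed, SealedImpl, GrowingImpl>;

static_assert(std::is_same_v<Record<true>, SealedImpl>);
static_assert(std::is_same_v<Record<false>, GrowingImpl>);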

View File

@ -45,6 +45,13 @@ class TimestampIndex {
Timestamp expire_ts,
std::pair<int64_t, int64_t> active_range);
size_t
size() const {
return sizeof(*this) + lengths_.size() * sizeof(int64_t) +
start_locs_.size() * sizeof(int64_t) +
timestamp_barriers_.size() * sizeof(Timestamp);
}
private:
// numSlice
std::vector<int64_t> lengths_;

View File

@ -243,6 +243,23 @@ EstimateLoadIndexResource(CLoadIndexInfo c_load_index_info) {
}
}
bool
TryReserveLoadingResourceWithTimeout(CResourceUsage size,
int64_t millisecond_timeout) {
return milvus::cachinglayer::Manager::GetInstance()
.ReserveLoadingResourceWithTimeout(
milvus::cachinglayer::ResourceUsage(size.memory_bytes,
size.disk_bytes),
std::chrono::milliseconds(millisecond_timeout));
}
void
ReleaseLoadingResource(CResourceUsage size) {
milvus::cachinglayer::Manager::GetInstance().ReleaseLoadingResource(
milvus::cachinglayer::ResourceUsage(size.memory_bytes,
size.disk_bytes));
}
CStatus
AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
SCOPE_CGO_CALL_METRIC();
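
TryReserveLoadingResourceWithTimeout/ReleaseLoadingResource give the Go loader a reserve/release pair around the caching layer's loading budget: reservation may block up to the timeout and report failure, while release is unconditional. In this commit the pairing is driven from Go (checkSegmentSize reserves, freeRequest releases; see the segment_loader.go hunks below), but the same discipline could be wrapped in an RAII guard on the C++ side; a sketch assuming only the two exports added above:

// Hypothetical RAII wrapper pairing the reserve/release C exports.
class LoadingResourceGuard {
 public:
    LoadingResourceGuard(CResourceUsage size, int64_t timeout_ms)
        : size_(size),
          held_(TryReserveLoadingResourceWithTimeout(size, timeout_ms)) {
    }
    ~LoadingResourceGuard() {
        if (held_) {
            ReleaseLoadingResource(size_);
        }
    }
    LoadingResourceGuard(const LoadingResourceGuard&) = delete;
    LoadingResourceGuard&
    operator=(const LoadingResourceGuard&) = delete;

    bool
    held() const {
        return held_;
    }

 private:
    CResourceUsage size_;
    bool held_;
};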

View File

@ -41,6 +41,13 @@ AppendIndexParam(CLoadIndexInfo c_load_index_info,
LoadResourceRequest
EstimateLoadIndexResource(CLoadIndexInfo c_load_index_info);
bool
TryReserveLoadingResourceWithTimeout(CResourceUsage size,
int64_t millisecond_timeout);
void
ReleaseLoadingResource(CResourceUsage size);
CStatus
AppendIndexInfo(CLoadIndexInfo c_load_index_info,
int64_t index_id,

View File

@ -200,7 +200,7 @@ ConfigureTieredStorage(const CacheWarmupPolicy scalarFieldCacheWarmupPolicy,
const int64_t eviction_interval_ms,
const int64_t cache_cell_unaccessed_survival_time,
const float overloaded_memory_threshold_percentage,
const float loading_memory_factor,
const float loading_resource_factor,
const float max_disk_usage_percentage,
const char* disk_path) {
std::string disk_path_str(disk_path);
@ -222,7 +222,7 @@ ConfigureTieredStorage(const CacheWarmupPolicy scalarFieldCacheWarmupPolicy,
overloaded_memory_threshold_percentage,
max_disk_usage_percentage,
disk_path_str,
loading_memory_factor});
loading_resource_factor});
}
} // namespace milvus::segcore

View File

@ -113,7 +113,7 @@ ConfigureTieredStorage(
const int64_t eviction_interval_ms,
const int64_t cache_cell_unaccessed_survival_time,
const float overloaded_memory_threshold_percentage,
const float loading_memory_factor,
const float loading_resource_factor,
const float max_disk_usage_percentage,
const char* disk_path);

View File

@ -122,7 +122,8 @@ ChunkTranslator::cell_id_of(milvus::cachinglayer::uid_t uid) const {
return uid;
}
milvus::cachinglayer::ResourceUsage
std::pair<milvus::cachinglayer::ResourceUsage,
milvus::cachinglayer::ResourceUsage>
ChunkTranslator::estimated_byte_size_of_cell(
milvus::cachinglayer::cid_t cid) const {
AssertInfo(cid < file_infos_.size(), "cid out of range");
@ -130,10 +131,10 @@ ChunkTranslator::estimated_byte_size_of_cell(
int64_t memory_size = file_infos_[cid].memory_size;
if (use_mmap_) {
// For mmap, the memory is counted as disk usage
return {0, memory_size};
return {{0, memory_size}, {memory_size * 2, memory_size}};
} else {
// For non-mmap, the memory is counted as memory usage
return {memory_size, 0};
return {{memory_size, 0}, {memory_size * 2, 0}};
}
}
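
Across all translators, estimated_byte_size_of_cell now returns a pair of ResourceUsage values: judging by how SealedIndexTranslator fills it (final_*_cost vs max_*_cost) and by the 2x factors here, the first element is the steady-state footprint of the cell once loaded and the second is the peak footprint while loading. A caller-side sketch of a plausible consumption pattern, not the actual caching-layer code (header paths and the load call are assumptions):

#include <chrono>
#include <vector>
#include "cachinglayer/Manager.h"    // path assumed
#include "cachinglayer/Translator.h" // path assumed

bool
LoadCellWithBudget(milvus::cachinglayer::Translator<milvus::Chunk>& translator,
                   milvus::cachinglayer::cid_t cid,
                   std::chrono::milliseconds timeout) {
    auto [final_usage, loading_peak] =
        translator.estimated_byte_size_of_cell(cid);
    auto& mgr = milvus::cachinglayer::Manager::GetInstance();
    // Reserve the transient peak up front; bail out if the node is too busy.
    if (!mgr.ReserveLoadingResourceWithTimeout(loading_peak, timeout)) {
        return false;
    }
    translator.get_cells({cid});               // may transiently use the peak
    mgr.ReleaseLoadingResource(loading_peak);  // transient part is over
    return true;
}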

View File

@ -68,7 +68,8 @@ class ChunkTranslator : public milvus::cachinglayer::Translator<milvus::Chunk> {
num_cells() const override;
milvus::cachinglayer::cid_t
cell_id_of(milvus::cachinglayer::uid_t uid) const override;
milvus::cachinglayer::ResourceUsage
std::pair<milvus::cachinglayer::ResourceUsage,
milvus::cachinglayer::ResourceUsage>
estimated_byte_size_of_cell(milvus::cachinglayer::cid_t cid) const override;
const std::string&
key() const override;

View File

@ -12,6 +12,7 @@
#include "segcore/storagev1translator/DefaultValueChunkTranslator.h"
#include "common/ChunkWriter.h"
#include "common/Types.h"
#include "segcore/Utils.h"
#include "storage/Util.h"
@ -56,11 +57,57 @@ DefaultValueChunkTranslator::cell_id_of(milvus::cachinglayer::uid_t uid) const {
return 0;
}
milvus::cachinglayer::ResourceUsage
std::pair<milvus::cachinglayer::ResourceUsage,
milvus::cachinglayer::ResourceUsage>
DefaultValueChunkTranslator::estimated_byte_size_of_cell(
milvus::cachinglayer::cid_t cid) const {
// TODO(tiered storage 1): provide a better estimation.
return milvus::cachinglayer::ResourceUsage{0, 0};
int64_t value_size = 0;
switch (field_meta_.get_data_type()) {
case milvus::DataType::BOOL:
value_size = sizeof(bool);
break;
case milvus::DataType::INT8:
value_size = sizeof(int8_t);
break;
case milvus::DataType::INT16:
value_size = sizeof(int16_t);
break;
case milvus::DataType::INT32:
value_size = sizeof(int32_t);
break;
case milvus::DataType::INT64:
value_size = sizeof(int64_t);
break;
case milvus::DataType::FLOAT:
value_size = sizeof(float);
break;
case milvus::DataType::DOUBLE:
value_size = sizeof(double);
break;
case milvus::DataType::VARCHAR:
case milvus::DataType::STRING:
case milvus::DataType::TEXT:
if (field_meta_.default_value().has_value()) {
auto default_value = field_meta_.default_value().value();
value_size = default_value.string_data().size() +
1; // +1 for null terminator
} else {
value_size = 1; // 1 for null
}
break;
case milvus::DataType::JSON:
value_size = sizeof(Json);
break;
case milvus::DataType::ARRAY:
value_size = sizeof(Array);
break;
default:
ThrowInfo(DataTypeInvalid,
"unsupported default value data type {}",
field_meta_.get_data_type());
}
return {{value_size * meta_.num_rows_until_chunk_[1], 0},
{2 * value_size * meta_.num_rows_until_chunk_[1], 0}};
}
const std::string&

View File

@ -38,7 +38,8 @@ class DefaultValueChunkTranslator
num_cells() const override;
milvus::cachinglayer::cid_t
cell_id_of(milvus::cachinglayer::uid_t uid) const override;
milvus::cachinglayer::ResourceUsage
std::pair<milvus::cachinglayer::ResourceUsage,
milvus::cachinglayer::ResourceUsage>
estimated_byte_size_of_cell(milvus::cachinglayer::cid_t cid) const override;
const std::string&
key() const override;

View File

@ -41,7 +41,8 @@ InterimSealedIndexTranslator::cell_id_of(
return 0;
}
milvus::cachinglayer::ResourceUsage
std::pair<milvus::cachinglayer::ResourceUsage,
milvus::cachinglayer::ResourceUsage>
InterimSealedIndexTranslator::estimated_byte_size_of_cell(
milvus::cachinglayer::cid_t cid) const {
auto size = vec_data_->DataByteSize();
@ -62,15 +63,17 @@ InterimSealedIndexTranslator::estimated_byte_size_of_cell(
knowhere::RefineType::BFLOAT16_QUANT) {
vec_size += dim_ * 2;
} // else knowhere::RefineType::DATA_VIEW, no extra size
return milvus::cachinglayer::ResourceUsage{vec_size * row_count, 0};
return {{vec_size * row_count, 0},
{static_cast<int64_t>(vec_size * row_count + size * 0.5), 0}};
} else if (index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC) {
// fp16/bf16 all use float32 to build index
return milvus::cachinglayer::ResourceUsage{
row_count * sizeof(float) * dim_, 0};
auto fp32_size = row_count * sizeof(float) * dim_;
return {{fp32_size, 0},
{static_cast<int64_t>(fp32_size + fp32_size * 0.5), 0}};
} else {
// SPARSE_WAND_CC and SPARSE_INVERTED_INDEX_CC basically has the same size as the
// raw data.
return milvus::cachinglayer::ResourceUsage{size, 0};
return {{size, 0}, {static_cast<int64_t>(size * 2.0), 0}};
}
}

View File

@ -33,7 +33,8 @@ class InterimSealedIndexTranslator
num_cells() const override;
milvus::cachinglayer::cid_t
cell_id_of(milvus::cachinglayer::uid_t uid) const override;
milvus::cachinglayer::ResourceUsage
std::pair<milvus::cachinglayer::ResourceUsage,
milvus::cachinglayer::ResourceUsage>
estimated_byte_size_of_cell(milvus::cachinglayer::cid_t cid) const override;
const std::string&
key() const override;

View File

@ -49,7 +49,8 @@ SealedIndexTranslator::cell_id_of(milvus::cachinglayer::uid_t uid) const {
return 0;
}
milvus::cachinglayer::ResourceUsage
std::pair<milvus::cachinglayer::ResourceUsage,
milvus::cachinglayer::ResourceUsage>
SealedIndexTranslator::estimated_byte_size_of_cell(
milvus::cachinglayer::cid_t cid) const {
LoadResourceRequest request =
@ -60,10 +61,13 @@ SealedIndexTranslator::estimated_byte_size_of_cell(
index_load_info_.index_size,
index_load_info_.index_params,
index_load_info_.enable_mmap);
// TODO(tiered storage 1), this is an estimation, error could be up to 20%.
int64_t memory_cost = request.final_memory_cost * 1024 * 1024 * 1024;
int64_t disk_cost = request.final_disk_cost * 1024 * 1024 * 1024;
return milvus::cachinglayer::ResourceUsage{memory_cost, disk_cost};
// this is an estimation, error could be up to 20%.
int64_t final_memory_cost = request.final_memory_cost * 1024 * 1024 * 1024;
int64_t final_disk_cost = request.final_disk_cost * 1024 * 1024 * 1024;
int64_t max_memory_cost = request.max_memory_cost * 1024 * 1024 * 1024;
int64_t max_disk_cost = request.max_disk_cost * 1024 * 1024 * 1024;
return {{final_memory_cost, final_disk_cost},
{max_memory_cost, max_disk_cost}};
}
const std::string&
@ -77,7 +81,16 @@ SealedIndexTranslator::get_cells(const std::vector<cid_t>& cids) {
std::unique_ptr<milvus::index::IndexBase> index =
milvus::index::IndexFactory::GetInstance().CreateIndex(
index_info_, file_manager_context_);
index->SetCellSize(index_load_info_.index_size);
LoadResourceRequest request =
milvus::index::IndexFactory::GetInstance().IndexLoadResource(
index_load_info_.field_type,
index_load_info_.element_type,
index_load_info_.index_engine_version,
index_load_info_.index_size,
index_load_info_.index_params,
index_load_info_.enable_mmap);
index->SetCellSize({request.final_memory_cost * 1024 * 1024 * 1024,
request.final_disk_cost * 1024 * 1024 * 1024});
if (index_load_info_.enable_mmap && index->IsMmapSupported()) {
AssertInfo(!index_load_info_.mmap_dir_path.empty(),
"mmap directory path is empty");

View File

@ -30,7 +30,8 @@ class SealedIndexTranslator
num_cells() const override;
milvus::cachinglayer::cid_t
cell_id_of(milvus::cachinglayer::uid_t uid) const override;
milvus::cachinglayer::ResourceUsage
std::pair<milvus::cachinglayer::ResourceUsage,
milvus::cachinglayer::ResourceUsage>
estimated_byte_size_of_cell(milvus::cachinglayer::cid_t cid) const override;
const std::string&
key() const override;

View File

@ -50,10 +50,11 @@ V1SealedIndexTranslator::cell_id_of(milvus::cachinglayer::uid_t uid) const {
return 0;
}
milvus::cachinglayer::ResourceUsage
std::pair<milvus::cachinglayer::ResourceUsage,
milvus::cachinglayer::ResourceUsage>
V1SealedIndexTranslator::estimated_byte_size_of_cell(
milvus::cachinglayer::cid_t cid) const {
return {0, 0};
return {{0, 0}, {0, 0}};
}
const std::string&
@ -123,7 +124,11 @@ V1SealedIndexTranslator::LoadVecIndex() {
auto index = milvus::index::IndexFactory::GetInstance().CreateIndex(
index_info, fileManagerContext);
index->SetCellSize(index_load_info_.index_size);
if (!index_load_info_.enable_mmap) {
index->SetCellSize({index_load_info_.index_size, 0});
} else {
index->SetCellSize({0, index_load_info_.index_size});
}
index->Load(*binary_set_, config);
return index;
} catch (std::exception& e) {
@ -162,7 +167,11 @@ V1SealedIndexTranslator::LoadScalarIndex() {
auto index = milvus::index::IndexFactory::GetInstance().CreateIndex(
index_info, milvus::storage::FileManagerContext());
index->SetCellSize(index_load_info_.index_size);
if (!index_load_info_.enable_mmap) {
index->SetCellSize({index_load_info_.index_size, 0});
} else {
index->SetCellSize({0, index_load_info_.index_size});
}
index->Load(*binary_set_);
return index;
} catch (std::exception& e) {

View File

@ -29,7 +29,8 @@ class V1SealedIndexTranslator : public Translator<milvus::index::IndexBase> {
num_cells() const override;
milvus::cachinglayer::cid_t
cell_id_of(milvus::cachinglayer::uid_t uid) const override;
milvus::cachinglayer::ResourceUsage
std::pair<milvus::cachinglayer::ResourceUsage,
milvus::cachinglayer::ResourceUsage>
estimated_byte_size_of_cell(milvus::cachinglayer::cid_t cid) const override;
const std::string&
key() const override;

View File

@ -137,13 +137,20 @@ GroupChunkTranslator::cell_id_of(milvus::cachinglayer::uid_t uid) const {
return uid;
}
milvus::cachinglayer::ResourceUsage
std::pair<milvus::cachinglayer::ResourceUsage,
milvus::cachinglayer::ResourceUsage>
GroupChunkTranslator::estimated_byte_size_of_cell(
milvus::cachinglayer::cid_t cid) const {
auto [file_idx, row_group_idx] = get_file_and_row_group_index(cid);
auto& row_group_meta = row_group_meta_list_[file_idx].Get(row_group_idx);
// TODO(tiered storage 1): should take into consideration of mmap or not.
return {static_cast<int64_t>(row_group_meta.memory_size()), 0};
auto cell_sz = static_cast<int64_t>(row_group_meta.memory_size());
if (use_mmap_) {
return {{0, cell_sz}, {2 * cell_sz, cell_sz}};
} else {
return {{cell_sz, 0}, {2 * cell_sz, 0}};
}
}
const std::string&

View File

@ -52,7 +52,8 @@ class GroupChunkTranslator
milvus::cachinglayer::cid_t
cell_id_of(milvus::cachinglayer::uid_t uid) const override;
milvus::cachinglayer::ResourceUsage
std::pair<milvus::cachinglayer::ResourceUsage,
milvus::cachinglayer::ResourceUsage>
estimated_byte_size_of_cell(milvus::cachinglayer::cid_t cid) const override;
const std::string&

View File

@ -13,7 +13,7 @@
milvus_add_pkg_config("milvus-common")
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
set( MILVUS-COMMON-VERSION 5770e40 )
set( MILVUS-COMMON-VERSION 7309380 )
set( GIT_REPOSITORY "https://github.com/zilliztech/milvus-common.git")
message(STATUS "milvus-common repo: ${GIT_REPOSITORY}")

View File

@ -192,16 +192,16 @@ TEST_F(ChunkedColumnGroupTest, GroupChunk) {
EXPECT_EQ(group_chunk->Size(), expected_size);
// Cell byte size
uint64_t expected_cell_size = int64_chunk->CellByteSize() +
string_chunk->CellByteSize() +
new_int64_chunk->CellByteSize();
EXPECT_EQ(group_chunk->CellByteSize(), expected_cell_size);
uint64_t expected_cell_size = int64_chunk->CellByteSize().memory_bytes +
string_chunk->CellByteSize().memory_bytes +
new_int64_chunk->CellByteSize().memory_bytes;
EXPECT_EQ(group_chunk->CellByteSize().memory_bytes, expected_cell_size);
// Test empty group chunk
auto empty_group_chunk = std::make_unique<GroupChunk>();
EXPECT_EQ(empty_group_chunk->RowNums(), 0);
EXPECT_EQ(empty_group_chunk->Size(), 0);
EXPECT_EQ(empty_group_chunk->CellByteSize(), 0);
EXPECT_EQ(empty_group_chunk->CellByteSize().memory_bytes, 0);
}
TEST_F(ChunkedColumnGroupTest, ChunkedColumnGroup) {

View File

@ -133,11 +133,15 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
for (size_t i = 0; i < translator->num_cells(); ++i) {
auto [file_idx, row_group_idx] =
translator->get_file_and_row_group_index(i);
// Get the expected memory size from the file directly
auto expected_memory_size = static_cast<int64_t>(
// Get the expected size from the file directly
auto expected_size = static_cast<int64_t>(
row_group_metadata_vector.Get(row_group_idx).memory_size());
auto usage = translator->estimated_byte_size_of_cell(i);
EXPECT_EQ(usage.memory_bytes, expected_memory_size);
auto usage = translator->estimated_byte_size_of_cell(i).first;
if (use_mmap) {
EXPECT_EQ(usage.file_bytes, expected_size);
} else {
EXPECT_EQ(usage.memory_bytes, expected_size);
}
}
// getting cells
@ -281,19 +285,23 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
for (size_t i = 0; i < translator->num_cells(); ++i) {
auto [file_idx, row_group_idx] =
translator->get_file_and_row_group_index(i);
auto usage = translator->estimated_byte_size_of_cell(i);
auto usage = translator->estimated_byte_size_of_cell(i).first;
// Get the expected memory size from the corresponding file
auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
fs_, multi_file_paths[file_idx]);
auto row_group_metadata_vector =
fr->file_metadata()->GetRowGroupMetadataVector();
auto expected_memory_size = static_cast<int64_t>(
auto expected_size = static_cast<int64_t>(
row_group_metadata_vector.Get(row_group_idx).memory_size());
auto status = fr->Close();
AssertInfo(status.ok(), "failed to close file reader");
EXPECT_EQ(usage.memory_bytes, expected_memory_size);
if (use_mmap) {
EXPECT_EQ(usage.file_bytes, expected_size);
} else {
EXPECT_EQ(usage.memory_bytes, expected_size);
}
}
// Clean up test files

View File

@ -71,9 +71,9 @@ class TestChunkTranslator : public Translator<milvus::Chunk> {
return uid;
}
ResourceUsage
std::pair<ResourceUsage, ResourceUsage>
estimated_byte_size_of_cell(cid_t cid) const override {
return ResourceUsage(0, 0);
return {{0, 0}, {0, 0}};
}
const std::string&
@ -141,9 +141,9 @@ class TestGroupChunkTranslator : public Translator<milvus::GroupChunk> {
return uid;
}
ResourceUsage
std::pair<ResourceUsage, ResourceUsage>
estimated_byte_size_of_cell(cid_t cid) const override {
return {0, 0};
return {{0, 0}, {0, 0}};
}
const std::string&
@ -200,9 +200,9 @@ class TestIndexTranslator : public Translator<milvus::index::IndexBase> {
return uid;
}
ResourceUsage
std::pair<ResourceUsage, ResourceUsage>
estimated_byte_size_of_cell(cid_t cid) const override {
return ResourceUsage(0, 0);
return {{0, 0}, {0, 0}};
}
const std::string&

View File

@ -87,6 +87,7 @@ func (s *DelegatorDataSuite) SetupSuite() {
localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole)
initcore.InitLocalChunkManager(localDataRootPath)
initcore.InitMmapManager(paramtable.Get(), 1)
initcore.InitTieredStorage(paramtable.Get())
s.collectionID = 1000
s.replicaID = 65535

View File

@ -254,16 +254,21 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
usedMem := hardware.GetUsedMemoryCount()
totalMem := hardware.GetMemoryCount()
quotaMetrics, err := getQuotaMetrics(node)
if err != nil {
return "", err
// if TieredEvictionEnabled is set, use the logical resource memory size
if paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() {
usedMem = node.manager.Segment.GetLogicalResource().MemorySize
}
used, total, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue())
usedDiskGB, totalDiskGB, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
log.Ctx(ctx).Warn("get disk usage failed", zap.Error(err))
}
// if TieredEvictionEnabled is set, use the logical resource disk size
if paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() {
usedDiskGB = float64(node.manager.Segment.GetLogicalResource().DiskSize) / 1e9
}
ioWait, err := hardware.GetIOWait()
if err != nil {
log.Ctx(ctx).Warn("get iowait failed", zap.Error(err))
@ -275,10 +280,16 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
CPUCoreUsage: hardware.GetCPUUsage(),
Memory: totalMem,
MemoryUsage: usedMem,
Disk: total,
DiskUsage: used,
Disk: totalDiskGB,
DiskUsage: usedDiskGB,
IOWaitPercentage: ioWait,
}
quotaMetrics, err := getQuotaMetrics(node)
if err != nil {
return "", err
}
quotaMetrics.Hms = hardwareInfos
collectionMetrics, err := getCollectionMetrics(node)

View File

@ -44,6 +44,7 @@ func (s *ManagerSuite) SetupSuite() {
localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole)
initcore.InitLocalChunkManager(localDataRootPath)
initcore.InitMmapManager(paramtable.Get(), 1)
initcore.InitTieredStorage(paramtable.Get())
}
func (s *ManagerSuite) SetupTest() {

View File

@ -59,6 +59,7 @@ func (suite *SearchSuite) SetupTest() {
initcore.InitRemoteChunkManager(paramtable.Get())
initcore.InitLocalChunkManager(suite.T().Name())
initcore.InitMmapManager(paramtable.Get(), 1)
initcore.InitTieredStorage(paramtable.Get())
suite.collectionID = 100
suite.partitionID = 10

View File

@ -240,7 +240,7 @@ func (s *baseSegment) ResourceUsageEstimate() ResourceUsage {
return *cache
}
usage, err := getLogicalResourceUsageEstimateOfSegment(s.collection.Schema(), s.LoadInfo(), resourceEstimateFactor{
usage, err := estimateLogicalResourceUsageOfSegment(s.collection.Schema(), s.LoadInfo(), resourceEstimateFactor{
deltaDataExpansionFactor: paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.GetAsFloat(),
TieredEvictionEnabled: paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool(),
TieredEvictableMemoryCacheRatio: paramtable.Get().QueryNodeCfg.TieredEvictableMemoryCacheRatio.GetAsFloat(),

View File

@ -495,15 +495,18 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
}
result.ConcurrencyLevel = funcutil.Min(hardware.GetCPUNum(), len(infos))
mu, du, err := loader.checkSegmentSize(ctx, infos, totalMemory, physicalMemoryUsage, physicalDiskUsage)
// check logical resource first
lmu, ldu, err := loader.checkLogicalSegmentSize(ctx, infos, totalMemory)
if err != nil {
log.Warn("no sufficient resource to load segments", zap.Error(err))
log.Warn("no sufficient logical resource to load segments", zap.Error(err))
return result, err
}
lmu, ldu, err := loader.checkLogicalSegmentSize(ctx, infos, totalMemory)
// then get physical resource usage for loading segments
mu, du, err := loader.checkSegmentSize(ctx, infos, totalMemory, physicalMemoryUsage, physicalDiskUsage)
if err != nil {
log.Warn("no sufficient resource to load segments", zap.Error(err))
log.Warn("no sufficient physical resource to load segments", zap.Error(err))
return result, err
}
@ -529,6 +532,13 @@ func (loader *segmentLoader) freeRequest(resource LoadResource, logicalResource
loader.mut.Lock()
defer loader.mut.Unlock()
if paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() {
C.ReleaseLoadingResource(C.CResourceUsage{
memory_bytes: C.int64_t(resource.MemorySize),
disk_bytes: C.int64_t(resource.DiskSize),
})
}
loader.committedResource.Sub(resource)
loader.committedLogicalResource.Sub(logicalResource)
loader.committedResourceNotifier.NotifyAll()
@ -1495,7 +1505,7 @@ func (loader *segmentLoader) checkLogicalSegmentSize(ctx context.Context, segmen
predictLogicalDiskUsage := logicalDiskUsage
for _, loadInfo := range segmentLoadInfos {
collection := loader.manager.Collection.Get(loadInfo.GetCollectionID())
finalUsage, err := getLogicalResourceUsageEstimateOfSegment(collection.Schema(), loadInfo, finalFactor)
finalUsage, err := estimateLogicalResourceUsageOfSegment(collection.Schema(), loadInfo, finalFactor)
if err != nil {
log.Warn(
"failed to estimate final resource usage of segment",
@ -1519,18 +1529,23 @@ func (loader *segmentLoader) checkLogicalSegmentSize(ctx context.Context, segmen
zap.Float64("predictLogicalDiskUsage(MB)", logutil.ToMB(float64(predictLogicalDiskUsage))),
)
if predictLogicalMemUsage > uint64(float64(totalMem)*paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat()) {
return 0, 0, fmt.Errorf("load segment failed, OOM if load, predictMemUsage = %v MB, totalMem = %v MB thresholdFactor = %f",
logicalMemUsageLimit := uint64(float64(totalMem) * paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat())
logicalDiskUsageLimit := uint64(float64(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64()) * paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat())
if predictLogicalMemUsage > logicalMemUsageLimit {
return 0, 0, fmt.Errorf("Logical memory usage checking for segment loading failed, predictLogicalMemUsage = %v MB, LogicalMemUsageLimit = %v MB, decrease the evictableMemoryCacheRatio (current: %v) if you want to load more segments",
logutil.ToMB(float64(predictLogicalMemUsage)),
logutil.ToMB(float64(totalMem)),
paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat())
logutil.ToMB(float64(logicalMemUsageLimit)),
paramtable.Get().QueryNodeCfg.TieredEvictableMemoryCacheRatio.GetAsFloat(),
)
}
if predictLogicalDiskUsage > uint64(float64(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64())*paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()) {
return 0, 0, merr.WrapErrServiceDiskLimitExceeded(float32(predictLogicalDiskUsage), float32(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64()), fmt.Sprintf("load segment failed, disk space is not enough, predictDiskUsage = %v MB, totalDisk = %v MB, thresholdFactor = %f",
if predictLogicalDiskUsage > logicalDiskUsageLimit {
return 0, 0, fmt.Errorf("Logical disk usage checking for segment loading failed, predictLogicalDiskUsage = %v MB, LogicalDiskUsageLimit = %v MB, decrease the evictableDiskCacheRatio (current: %v) if you want to load more segments",
logutil.ToMB(float64(predictLogicalDiskUsage)),
logutil.ToMB(float64(uint64(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64()))),
paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()))
logutil.ToMB(float64(logicalDiskUsageLimit)),
paramtable.Get().QueryNodeCfg.TieredEvictableDiskCacheRatio.GetAsFloat(),
)
}
return predictLogicalMemUsage - logicalMemUsage, predictLogicalDiskUsage - logicalDiskUsage, nil
@ -1570,7 +1585,7 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
mmapFieldCount := 0
for _, loadInfo := range segmentLoadInfos {
collection := loader.manager.Collection.Get(loadInfo.GetCollectionID())
loadingUsage, err := getLoadingResourceUsageEstimateOfSegment(collection.Schema(), loadInfo, maxFactor)
loadingUsage, err := estimateLoadingResourceUsageOfSegment(collection.Schema(), loadInfo, maxFactor)
if err != nil {
log.Warn(
"failed to estimate max resource usage of segment",
@ -1607,6 +1622,23 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
zap.Int("mmapFieldCount", mmapFieldCount),
)
if paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() {
// try to reserve loading resource from caching layer
if ok := C.TryReserveLoadingResourceWithTimeout(C.CResourceUsage{
memory_bytes: C.int64_t(predictMemUsage - memUsage),
disk_bytes: C.int64_t(predictDiskUsage - diskUsage),
}, 1000); !ok {
return 0, 0, fmt.Errorf("failed to reserve loading resource from caching layer, predictMemUsage = %v MB, predictDiskUsage = %v MB, memUsage = %v MB, diskUsage = %v MB, memoryThresholdFactor = %f, diskThresholdFactor = %f",
logutil.ToMB(float64(predictMemUsage)),
logutil.ToMB(float64(predictDiskUsage)),
logutil.ToMB(float64(memUsage)),
logutil.ToMB(float64(diskUsage)),
paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(),
paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat(),
)
}
} else {
// fall back to the original segment loading logic
if predictMemUsage > uint64(float64(totalMem)*paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat()) {
return 0, 0, fmt.Errorf("load segment failed, OOM if load, maxSegmentSize = %v MB, memUsage = %v MB, predictMemUsage = %v MB, totalMem = %v MB thresholdFactor = %f",
logutil.ToMB(float64(maxSegmentSize)),
@ -1623,17 +1655,21 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
logutil.ToMB(float64(uint64(paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64()))),
paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()))
}
}
err := checkSegmentGpuMemSize(predictGpuMemUsage, float32(paramtable.Get().GpuConfig.OverloadedMemoryThresholdPercentage.GetAsFloat()))
if err != nil {
return 0, 0, err
}
return predictMemUsage - memUsage, predictDiskUsage - diskUsage, nil
}
// estimateLogicalResourceUsageOfSegment estimates the logical resource usage of a segment; it should only be used when tiered eviction is enabled
// the result is the final resource usage of the segment's inevictable part plus the final usage of its evictable part with the cache ratio applied
func getLogicalResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadInfo *querypb.SegmentLoadInfo, multiplyFactor resourceEstimateFactor) (usage *ResourceUsage, err error) {
// TODO: the inevictable part is not correct, since we cannot know the final resource usage of interim index and default-value column before loading,
// currently they are ignored, but we should consider them in the future
func estimateLogicalResourceUsageOfSegment(schema *schemapb.CollectionSchema, loadInfo *querypb.SegmentLoadInfo, multiplyFactor resourceEstimateFactor) (usage *ResourceUsage, err error) {
var segmentInevictableMemorySize, segmentInevictableDiskSize uint64
var segmentEvictableMemorySize, segmentEvictableDiskSize uint64
@ -1648,7 +1684,7 @@ func getLogicalResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema,
}
ctx := context.Background()
// calculate data size
// PART 1: calculate logical resource usage of indexes
for _, fieldIndexInfo := range loadInfo.IndexInfos {
fieldID := fieldIndexInfo.GetFieldID()
if len(fieldIndexInfo.GetIndexFilePaths()) > 0 {
@ -1703,60 +1739,51 @@ func getLogicalResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema,
}
}
// PART 2: calculate logical resource usage of binlogs
for fieldID, fieldBinlog := range id2Binlogs {
binlogSize := uint64(getBinlogDataMemorySize(fieldBinlog))
var isVectorType bool
var fieldSchema *schemapb.FieldSchema
if fieldID >= common.StartOfUserFieldID {
var err error
fieldSchema, err = schemaHelper.GetFieldFromID(fieldID)
// get field schema from fieldID
fieldSchema, err := schemaHelper.GetFieldFromID(fieldID)
if err != nil {
log.Warn("failed to get field schema", zap.Int64("fieldID", fieldID), zap.String("name", schema.GetName()), zap.Error(err))
return nil, err
}
isVectorType = typeutil.IsVectorType(fieldSchema.GetDataType())
}
if isVectorType {
// TODO: add default-value column's resource usage to inevictable part
// TODO: add interim index's resource usage to inevictable part
if fieldSchema == nil {
// nil field schema means missing mapping, shall be "0" group for storage v2
segmentEvictableMemorySize += binlogSize
} else if typeutil.IsVectorType(fieldSchema.GetDataType()) {
mmapVectorField := paramtable.Get().QueryNodeCfg.MmapVectorField.GetAsBool()
if mmapVectorField {
segmentEvictableDiskSize += binlogSize
} else {
segmentEvictableMemorySize += binlogSize
}
continue
}
// missing mapping, shall be "0" group for storage v2
if fieldSchema == nil {
segmentEvictableMemorySize += binlogSize
continue
}
mmapEnabled := isDataMmapEnable(fieldSchema)
if !mmapEnabled || common.IsSystemField(fieldSchema.GetFieldID()) {
// system field is not evictable
if common.IsSystemField(fieldSchema.GetFieldID()) {
} else if common.IsSystemField(fieldSchema.GetFieldID()) {
segmentInevictableMemorySize += binlogSize
if DoubleMemorySystemField(fieldSchema.GetFieldID()) {
segmentInevictableMemorySize += binlogSize
}
} else {
} else if !isDataMmapEnable(fieldSchema) {
segmentEvictableMemorySize += binlogSize
if DoubleMemoryDataType(fieldSchema.GetDataType()) {
segmentEvictableMemorySize += binlogSize
}
}
} else {
segmentEvictableDiskSize += uint64(getBinlogDataDiskSize(fieldBinlog))
}
}
// get size of stats data, and stats data is inevictable
// PART 3: calculate logical resource usage of stats data
for _, fieldBinlog := range loadInfo.Statslogs {
segmentInevictableMemorySize += uint64(getBinlogDataMemorySize(fieldBinlog))
}
// get size of delete data, and delete data is inevictable
// PART 4: calculate logical resource usage of delete data
for _, fieldBinlog := range loadInfo.Deltalogs {
// MemorySize of fieldBinlog is the actual size in memory, so the expansionFactor
// should be 1, in most cases.
@ -1778,11 +1805,13 @@ func getLogicalResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema,
}, nil
}
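
Put together, the logical estimate this function produces reduces to roughly the following (per its doc comment above; the exact weighting via the factor struct is not shown in full in this diff):

logicalMem  ≈ inevictableFinalMem  + TieredEvictableMemoryCacheRatio × evictableFinalMem
logicalDisk ≈ inevictableFinalDisk + TieredEvictableDiskCacheRatio   × evictableFinalDisk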
// getLoadingResourceUsageEstimateOfSegment estimates the resource usage of the segment when loading
// - when tiered eviction is enabled, the result is the max resource usage of the segment inevictable part
// estimateLoadingResourceUsageOfSegment estimates the resource usage of the segment when loading,
// it will return two different results, depending on the tiered eviction setting:
// - when tiered eviction is enabled, the result is the max resource usage of the segment that cannot be managed by the caching layer,
// which should be a subset of the segment's inevictable part
// - when tiered eviction is disabled, the result is the max resource usage of both the segment's evictable and inevictable parts
func getLoadingResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadInfo *querypb.SegmentLoadInfo, multiplyFactor resourceEstimateFactor) (usage *ResourceUsage, err error) {
var segmentMemorySize, segmentDiskSize uint64
func estimateLoadingResourceUsageOfSegment(schema *schemapb.CollectionSchema, loadInfo *querypb.SegmentLoadInfo, multiplyFactor resourceEstimateFactor) (usage *ResourceUsage, err error) {
var segMemoryLoadingSize, segDiskLoadingSize uint64
var indexMemorySize uint64
var mmapFieldCount int
var fieldGpuMemorySize []uint64
@ -1798,7 +1827,7 @@ func getLoadingResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema,
}
ctx := context.Background()
// calculate data size
// PART 1: calculate size of indexes
for _, fieldIndexInfo := range loadInfo.IndexInfos {
fieldID := fieldIndexInfo.GetFieldID()
if len(fieldIndexInfo.GetIndexFilePaths()) > 0 {
@ -1806,6 +1835,7 @@ func getLoadingResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema,
if err != nil {
return nil, err
}
isVectorType := typeutil.IsVectorType(fieldSchema.GetDataType())
var estimateResult ResourceEstimate
@ -1818,14 +1848,15 @@ func getLoadingResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema,
return nil
})
if err != nil {
return nil, errors.Wrapf(err, "failed to estimate resource usage of index, collection %d, segment %d, indexBuildID %d",
return nil, errors.Wrapf(err, "failed to estimate loading resource usage of index, collection %d, segment %d, indexBuildID %d",
loadInfo.GetCollectionID(),
loadInfo.GetSegmentID(),
fieldIndexInfo.GetBuildID())
}
if !multiplyFactor.TieredEvictionEnabled {
indexMemorySize += estimateResult.MaxMemoryCost
segmentDiskSize += estimateResult.MaxDiskCost
segDiskLoadingSize += estimateResult.MaxDiskCost
}
if vecindexmgr.GetVecIndexMgrInstance().IsGPUVecIndex(common.GetIndexType(fieldIndexInfo.IndexParams)) {
@ -1847,7 +1878,7 @@ func getLoadingResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema,
metricType, err := funcutil.GetAttrByKeyFromRepeatedKV(common.MetricTypeKey, fieldIndexInfo.IndexParams)
if err != nil {
return nil, errors.Wrapf(err, "failed to estimate resource usage of index, metric type not found, collection %d, segment %d, indexBuildID %d",
return nil, errors.Wrapf(err, "failed to estimate loading resource usage of index, metric type not found, collection %d, segment %d, indexBuildID %d",
loadInfo.GetCollectionID(),
loadInfo.GetSegmentID(),
fieldIndexInfo.GetBuildID())
@ -1859,74 +1890,86 @@ func getLoadingResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema,
}
}
// PART 2: calculate size of binlogs
for fieldID, fieldBinlog := range id2Binlogs {
binlogSize := uint64(getBinlogDataMemorySize(fieldBinlog))
var isVectorType bool
var fieldSchema *schemapb.FieldSchema
if fieldID >= common.StartOfUserFieldID {
var err error
fieldSchema, err = schemaHelper.GetFieldFromID(fieldID)
// get field schema from fieldID
fieldSchema, err := schemaHelper.GetFieldFromID(fieldID)
if err != nil {
log.Warn("failed to get field schema", zap.Int64("fieldID", fieldID), zap.String("name", schema.GetName()), zap.Error(err))
return nil, err
}
isVectorType = typeutil.IsVectorType(fieldSchema.GetDataType())
interimIndexEnable := multiplyFactor.EnableInterminSegmentIndex && !isGrowingMmapEnable() && SupportInterimIndexDataType(fieldSchema.GetDataType())
if interimIndexEnable {
segmentMemorySize += uint64(float64(binlogSize) * multiplyFactor.tempSegmentIndexFactor)
}
}
if isVectorType {
mmapVectorField := paramtable.Get().QueryNodeCfg.MmapVectorField.GetAsBool()
if mmapVectorField {
if !multiplyFactor.TieredEvictionEnabled {
segmentDiskSize += binlogSize
}
} else {
if !multiplyFactor.TieredEvictionEnabled {
segmentMemorySize += binlogSize
}
}
continue
}
// missing mapping, shall be "0" group for storage v2
if fieldSchema == nil {
if !multiplyFactor.TieredEvictionEnabled {
segmentMemorySize += binlogSize
segMemoryLoadingSize += binlogSize
}
continue
}
if !multiplyFactor.TieredEvictionEnabled {
interimIndexEnable := multiplyFactor.EnableInterminSegmentIndex && !isGrowingMmapEnable() && SupportInterimIndexDataType(fieldSchema.GetDataType())
if interimIndexEnable {
segMemoryLoadingSize += uint64(float64(binlogSize) * multiplyFactor.tempSegmentIndexFactor)
}
}
if typeutil.IsVectorType(fieldSchema.GetDataType()) {
mmapVectorField := paramtable.Get().QueryNodeCfg.MmapVectorField.GetAsBool()
if mmapVectorField {
if !multiplyFactor.TieredEvictionEnabled {
segDiskLoadingSize += binlogSize
}
} else {
if !multiplyFactor.TieredEvictionEnabled {
segMemoryLoadingSize += binlogSize
}
}
continue
}
// system field does not support mmap, skip mmap check
if common.IsSystemField(fieldSchema.GetFieldID()) {
// system field isn't managed by the caching layer, so its size should always be included,
// regardless of the tiered eviction setting
segMemoryLoadingSize += binlogSize
if DoubleMemorySystemField(fieldSchema.GetFieldID()) {
segMemoryLoadingSize += binlogSize
}
continue
}
mmapEnabled := isDataMmapEnable(fieldSchema)
if !mmapEnabled || common.IsSystemField(fieldSchema.GetFieldID()) {
// system field is not evictable, skip evictable size calculation
if !multiplyFactor.TieredEvictionEnabled || common.IsSystemField(fieldSchema.GetFieldID()) {
segmentMemorySize += binlogSize
if !mmapEnabled {
if !multiplyFactor.TieredEvictionEnabled {
segMemoryLoadingSize += binlogSize
if DoubleMemoryDataType(fieldSchema.GetDataType()) {
segMemoryLoadingSize += binlogSize
}
if DoubleMemorySystemField(fieldSchema.GetFieldID()) {
segmentMemorySize += binlogSize
} else if DoubleMemoryDataType(fieldSchema.GetDataType()) && !multiplyFactor.TieredEvictionEnabled {
segmentMemorySize += binlogSize
}
} else {
if !multiplyFactor.TieredEvictionEnabled {
segmentDiskSize += uint64(getBinlogDataDiskSize(fieldBinlog))
segDiskLoadingSize += uint64(getBinlogDataDiskSize(fieldBinlog))
}
}
}
// get size of stats data, and stats data is inevictable
// PART 3: calculate size of stats data
// stats data isn't managed by the caching layer, so its size should always be included,
// regardless of the tiered eviction setting
for _, fieldBinlog := range loadInfo.Statslogs {
segmentMemorySize += uint64(getBinlogDataMemorySize(fieldBinlog))
segMemoryLoadingSize += uint64(getBinlogDataMemorySize(fieldBinlog))
}
// get size of delete data, and delete data is inevictable
// PART 4: calculate size of delete data
// delete data isn't managed by the caching layer, so its size should always be included,
// regardless of the tiered eviction setting
for _, fieldBinlog := range loadInfo.Deltalogs {
// MemorySize of filedBinlog is the actual size in memory, so the expansionFactor
// should be 1, in most cases.
expansionFactor := float64(1)
// MemorySize of fieldBinlog is the actual size in memory, but we should also consider
// the memcpy from golang to cpp side, so the expansionFactor is set to 2.
expansionFactor := float64(2)
memSize := getBinlogDataMemorySize(fieldBinlog)
// Note: If MemorySize == DiskSize, it means the segment comes from Milvus 2.3,
@ -1935,12 +1978,12 @@ func getLoadingResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema,
if memSize == getBinlogDataDiskSize(fieldBinlog) {
expansionFactor = multiplyFactor.deltaDataExpansionFactor
}
segmentMemorySize += uint64(float64(memSize) * expansionFactor)
segMemoryLoadingSize += uint64(float64(memSize) * expansionFactor)
}
return &ResourceUsage{
MemorySize: segmentMemorySize + indexMemorySize,
DiskSize: segmentDiskSize,
MemorySize: segMemoryLoadingSize + indexMemorySize,
DiskSize: segDiskLoadingSize,
MmapFieldCount: mmapFieldCount,
FieldGpuMemorySize: fieldGpuMemorySize,
}, nil

View File

@ -85,6 +85,7 @@ func (suite *SegmentLoaderSuite) SetupTest() {
initcore.InitRemoteChunkManager(paramtable.Get())
initcore.InitLocalChunkManager(suite.rootPath)
initcore.InitMmapManager(paramtable.Get(), 1)
initcore.InitTieredStorage(paramtable.Get())
// Data
suite.schema = mock_segcore.GenTestCollectionSchema("test", schemapb.DataType_Int64, false)
@ -655,7 +656,11 @@ func (suite *SegmentLoaderSuite) TestRunOutMemory() {
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
// TODO: this case should be fixed!
// currently the binlog size is all zero, so this expected error is triggered by the interim index usage calculation rather than the binlog size
if !paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() {
suite.Error(err)
}
// Load growing
binlogs, statsLogs, err = mock_segcore.SaveBinLog(ctx,
@ -677,7 +682,9 @@ func (suite *SegmentLoaderSuite) TestRunOutMemory() {
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
if !paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() {
suite.Error(err)
}
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.MmapDirPath.Key, "./mmap")
_, err = suite.loader.Load(ctx, suite.collectionID, SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{
@ -689,7 +696,9 @@ func (suite *SegmentLoaderSuite) TestRunOutMemory() {
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
if !paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() {
suite.Error(err)
}
_, err = suite.loader.Load(ctx, suite.collectionID, SegmentTypeGrowing, 0, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID + 1,
PartitionID: suite.partitionID,
@ -699,7 +708,9 @@ func (suite *SegmentLoaderSuite) TestRunOutMemory() {
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
if !paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() {
suite.Error(err)
}
}
type SegmentLoaderDetailSuite struct {

View File

@ -49,6 +49,7 @@ func (suite *SegmentSuite) SetupTest() {
localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole)
initcore.InitLocalChunkManager(localDataRootPath)
initcore.InitMmapManager(paramtable.Get(), 1)
initcore.InitTieredStorage(paramtable.Get())
suite.collectionID = 100
suite.partitionID = 10

View File

@ -345,101 +345,10 @@ func (node *QueryNode) InitSegcore() error {
return err
}
// init tiered storage
scalarFieldCacheWarmupPolicy, err := segcore.ConvertCacheWarmupPolicy(paramtable.Get().QueryNodeCfg.TieredWarmupScalarField.GetValue())
err = initcore.InitTieredStorage(paramtable.Get())
if err != nil {
return err
}
vectorFieldCacheWarmupPolicy, err := segcore.ConvertCacheWarmupPolicy(paramtable.Get().QueryNodeCfg.TieredWarmupVectorField.GetValue())
if err != nil {
return err
}
deprecatedCacheWarmupPolicy := paramtable.Get().QueryNodeCfg.ChunkCacheWarmingUp.GetValue()
if deprecatedCacheWarmupPolicy == "sync" {
log.Warn("queryNode.cache.warmup is being deprecated, use queryNode.segcore.tieredStorage.warmup.vectorField instead.")
log.Warn("for now, if queryNode.cache.warmup is set to sync, it will override queryNode.segcore.tieredStorage.warmup.vectorField to sync.")
log.Warn("otherwise, queryNode.cache.warmup will be ignored")
vectorFieldCacheWarmupPolicy = C.CacheWarmupPolicy_Sync
} else if deprecatedCacheWarmupPolicy == "async" {
log.Warn("queryNode.cache.warmup is being deprecated and ignored, use queryNode.segcore.tieredStorage.warmup.vectorField instead.")
}
scalarIndexCacheWarmupPolicy, err := segcore.ConvertCacheWarmupPolicy(paramtable.Get().QueryNodeCfg.TieredWarmupScalarIndex.GetValue())
if err != nil {
return err
}
vectorIndexCacheWarmupPolicy, err := segcore.ConvertCacheWarmupPolicy(paramtable.Get().QueryNodeCfg.TieredWarmupVectorIndex.GetValue())
if err != nil {
return err
}
osMemBytes := hardware.GetMemoryCount()
osDiskBytes := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsInt64()
memoryLowWatermarkRatio := paramtable.Get().QueryNodeCfg.TieredMemoryLowWatermarkRatio.GetAsFloat()
memoryHighWatermarkRatio := paramtable.Get().QueryNodeCfg.TieredMemoryHighWatermarkRatio.GetAsFloat()
memoryMaxRatio := paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat()
diskLowWatermarkRatio := paramtable.Get().QueryNodeCfg.TieredDiskLowWatermarkRatio.GetAsFloat()
diskHighWatermarkRatio := paramtable.Get().QueryNodeCfg.TieredDiskHighWatermarkRatio.GetAsFloat()
diskMaxRatio := paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()
if memoryLowWatermarkRatio > memoryHighWatermarkRatio {
return errors.New("memoryLowWatermarkRatio should not be greater than memoryHighWatermarkRatio")
}
if memoryHighWatermarkRatio > memoryMaxRatio {
return errors.New("memoryHighWatermarkRatio should not be greater than memoryMaxRatio")
}
if memoryMaxRatio >= 1 {
return errors.New("memoryMaxRatio should not be greater than 1")
}
if diskLowWatermarkRatio > diskHighWatermarkRatio {
return errors.New("diskLowWatermarkRatio should not be greater than diskHighWatermarkRatio")
}
if diskHighWatermarkRatio > diskMaxRatio {
return errors.New("diskHighWatermarkRatio should not be greater than diskMaxRatio")
}
if diskMaxRatio >= 1 {
return errors.New("diskMaxRatio should not be greater than 1")
}
memoryLowWatermarkBytes := C.int64_t(memoryLowWatermarkRatio * float64(osMemBytes))
memoryHighWatermarkBytes := C.int64_t(memoryHighWatermarkRatio * float64(osMemBytes))
memoryMaxBytes := C.int64_t(memoryMaxRatio * float64(osMemBytes))
diskLowWatermarkBytes := C.int64_t(diskLowWatermarkRatio * float64(osDiskBytes))
diskHighWatermarkBytes := C.int64_t(diskHighWatermarkRatio * float64(osDiskBytes))
diskMaxBytes := C.int64_t(diskMaxRatio * float64(osDiskBytes))
evictionEnabled := C.bool(paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool())
if paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() && paramtable.Get().CommonCfg.EnablePosixMode.GetAsBool() {
panic("tiered storage eviction is not supported in POSIX mode, change config and restart")
}
cacheTouchWindowMs := C.int64_t(paramtable.Get().QueryNodeCfg.TieredCacheTouchWindowMs.GetAsInt64())
evictionIntervalMs := C.int64_t(paramtable.Get().QueryNodeCfg.TieredEvictionIntervalMs.GetAsInt64())
cacheCellUnaccessedSurvivalTime := C.int64_t(paramtable.Get().QueryNodeCfg.CacheCellUnaccessedSurvivalTime.GetAsInt64())
loadingMemoryFactor := C.float(paramtable.Get().QueryNodeCfg.TieredLoadingMemoryFactor.GetAsFloat())
overloadedMemoryThresholdPercentage := C.float(memoryMaxRatio)
maxDiskUsagePercentage := C.float(diskMaxRatio)
diskPath := C.CString(paramtable.Get().LocalStorageCfg.Path.GetValue())
defer C.free(unsafe.Pointer(diskPath))
C.ConfigureTieredStorage(C.CacheWarmupPolicy(scalarFieldCacheWarmupPolicy),
C.CacheWarmupPolicy(vectorFieldCacheWarmupPolicy),
C.CacheWarmupPolicy(scalarIndexCacheWarmupPolicy),
C.CacheWarmupPolicy(vectorIndexCacheWarmupPolicy),
memoryLowWatermarkBytes, memoryHighWatermarkBytes, memoryMaxBytes,
diskLowWatermarkBytes, diskHighWatermarkBytes, diskMaxBytes,
evictionEnabled, cacheTouchWindowMs, evictionIntervalMs, cacheCellUnaccessedSurvivalTime,
overloadedMemoryThresholdPercentage, loadingMemoryFactor, maxDiskUsagePercentage, diskPath)
tieredEvictableMemoryCacheRatio := paramtable.Get().QueryNodeCfg.TieredEvictableMemoryCacheRatio.GetAsFloat()
tieredEvictableDiskCacheRatio := paramtable.Get().QueryNodeCfg.TieredEvictableDiskCacheRatio.GetAsFloat()
log.Ctx(node.ctx).Info("tiered storage eviction cache ratio configured",
zap.Float64("tieredEvictableMemoryCacheRatio", tieredEvictableMemoryCacheRatio),
zap.Float64("tieredEvictableDiskCacheRatio", tieredEvictableDiskCacheRatio),
)
err = initcore.InitInterminIndexConfig(paramtable.Get())
if err != nil {

View File

@ -44,6 +44,7 @@ import (
"github.com/milvus-io/milvus/internal/util/hookutil"
"github.com/milvus-io/milvus/internal/util/pathutil"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/hardware"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
@ -274,6 +275,112 @@ func InitMmapManager(params *paramtable.ComponentParam, nodeID int64) error {
return HandleCStatus(&status, "InitMmapManager failed")
}
func ConvertCacheWarmupPolicy(policy string) (C.CacheWarmupPolicy, error) {
switch policy {
case "sync":
return C.CacheWarmupPolicy_Sync, nil
case "disable":
return C.CacheWarmupPolicy_Disable, nil
default:
return C.CacheWarmupPolicy_Disable, fmt.Errorf("invalid Tiered Storage cache warmup policy: %s", policy)
}
}
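A small table-driven sketch exercising the converter above (hypothetical test, not part of this change); note that only "sync" and "disable" are accepted, and the deprecated "async" value of queryNode.cache.warmup is handled separately by the warning path in InitTieredStorage:

func TestConvertCacheWarmupPolicy(t *testing.T) {
	cases := []struct {
		in      string
		wantErr bool
	}{
		{"sync", false},
		{"disable", false},
		{"async", true}, // rejected here; the deprecated knob is handled elsewhere
		{"", true},
	}
	for _, c := range cases {
		_, err := ConvertCacheWarmupPolicy(c.in)
		if c.wantErr {
			assert.Error(t, err, c.in)
		} else {
			assert.NoError(t, err, c.in)
		}
	}
}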
func InitTieredStorage(params *paramtable.ComponentParam) error {
// init tiered storage
scalarFieldCacheWarmupPolicy, err := ConvertCacheWarmupPolicy(params.QueryNodeCfg.TieredWarmupScalarField.GetValue())
if err != nil {
return err
}
vectorFieldCacheWarmupPolicy, err := ConvertCacheWarmupPolicy(params.QueryNodeCfg.TieredWarmupVectorField.GetValue())
if err != nil {
return err
}
deprecatedCacheWarmupPolicy := params.QueryNodeCfg.ChunkCacheWarmingUp.GetValue()
if deprecatedCacheWarmupPolicy == "sync" {
log.Warn("queryNode.cache.warmup is being deprecated, use queryNode.segcore.tieredStorage.warmup.vectorField instead.")
log.Warn("for now, if queryNode.cache.warmup is set to sync, it will override queryNode.segcore.tieredStorage.warmup.vectorField to sync.")
log.Warn("otherwise, queryNode.cache.warmup will be ignored")
vectorFieldCacheWarmupPolicy = C.CacheWarmupPolicy_Sync
} else if deprecatedCacheWarmupPolicy == "async" {
log.Warn("queryNode.cache.warmup is being deprecated and ignored, use queryNode.segcore.tieredStorage.warmup.vectorField instead.")
}
scalarIndexCacheWarmupPolicy, err := ConvertCacheWarmupPolicy(params.QueryNodeCfg.TieredWarmupScalarIndex.GetValue())
if err != nil {
return err
}
vectorIndexCacheWarmupPolicy, err := ConvertCacheWarmupPolicy(params.QueryNodeCfg.TieredWarmupVectorIndex.GetValue())
if err != nil {
return err
}
osMemBytes := hardware.GetMemoryCount()
osDiskBytes := params.QueryNodeCfg.DiskCapacityLimit.GetAsInt64()
memoryLowWatermarkRatio := params.QueryNodeCfg.TieredMemoryLowWatermarkRatio.GetAsFloat()
memoryHighWatermarkRatio := params.QueryNodeCfg.TieredMemoryHighWatermarkRatio.GetAsFloat()
memoryMaxRatio := params.QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat()
diskLowWatermarkRatio := params.QueryNodeCfg.TieredDiskLowWatermarkRatio.GetAsFloat()
diskHighWatermarkRatio := params.QueryNodeCfg.TieredDiskHighWatermarkRatio.GetAsFloat()
diskMaxRatio := params.QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()
if memoryLowWatermarkRatio > memoryHighWatermarkRatio {
return errors.New("memoryLowWatermarkRatio should not be greater than memoryHighWatermarkRatio")
}
if memoryHighWatermarkRatio > memoryMaxRatio {
return errors.New("memoryHighWatermarkRatio should not be greater than memoryMaxRatio")
}
if memoryMaxRatio >= 1 {
return errors.New("memoryMaxRatio should not be greater than 1")
}
if diskLowWatermarkRatio > diskHighWatermarkRatio {
return errors.New("diskLowWatermarkRatio should not be greater than diskHighWatermarkRatio")
}
if diskHighWatermarkRatio > diskMaxRatio {
return errors.New("diskHighWatermarkRatio should not be greater than diskMaxRatio")
}
if diskMaxRatio >= 1 {
return errors.New("diskMaxRatio should not be greater than 1")
}
memoryLowWatermarkBytes := C.int64_t(memoryLowWatermarkRatio * float64(osMemBytes))
memoryHighWatermarkBytes := C.int64_t(memoryHighWatermarkRatio * float64(osMemBytes))
memoryMaxBytes := C.int64_t(memoryMaxRatio * float64(osMemBytes))
diskLowWatermarkBytes := C.int64_t(diskLowWatermarkRatio * float64(osDiskBytes))
diskHighWatermarkBytes := C.int64_t(diskHighWatermarkRatio * float64(osDiskBytes))
diskMaxBytes := C.int64_t(diskMaxRatio * float64(osDiskBytes))
evictionEnabled := C.bool(params.QueryNodeCfg.TieredEvictionEnabled.GetAsBool())
cacheTouchWindowMs := C.int64_t(params.QueryNodeCfg.TieredCacheTouchWindowMs.GetAsInt64())
evictionIntervalMs := C.int64_t(params.QueryNodeCfg.TieredEvictionIntervalMs.GetAsInt64())
cacheCellUnaccessedSurvivalTime := C.int64_t(params.QueryNodeCfg.CacheCellUnaccessedSurvivalTime.GetAsInt64())
loadingResourceFactor := C.float(params.QueryNodeCfg.TieredLoadingResourceFactor.GetAsFloat())
overloadedMemoryThresholdPercentage := C.float(memoryMaxRatio)
maxDiskUsagePercentage := C.float(diskMaxRatio)
diskPath := C.CString(params.LocalStorageCfg.Path.GetValue())
defer C.free(unsafe.Pointer(diskPath))
C.ConfigureTieredStorage(scalarFieldCacheWarmupPolicy,
vectorFieldCacheWarmupPolicy,
scalarIndexCacheWarmupPolicy,
vectorIndexCacheWarmupPolicy,
memoryLowWatermarkBytes, memoryHighWatermarkBytes, memoryMaxBytes,
diskLowWatermarkBytes, diskHighWatermarkBytes, diskMaxBytes,
evictionEnabled, cacheTouchWindowMs, evictionIntervalMs, cacheCellUnaccessedSurvivalTime,
overloadedMemoryThresholdPercentage, loadingResourceFactor, maxDiskUsagePercentage, diskPath)
tieredEvictableMemoryCacheRatio := params.QueryNodeCfg.TieredEvictableMemoryCacheRatio.GetAsFloat()
tieredEvictableDiskCacheRatio := params.QueryNodeCfg.TieredEvictableDiskCacheRatio.GetAsFloat()
log.Info("tiered storage eviction cache ratio configured",
zap.Float64("tieredEvictableMemoryCacheRatio", tieredEvictableMemoryCacheRatio),
zap.Float64("tieredEvictableDiskCacheRatio", tieredEvictableDiskCacheRatio),
)
return nil
}
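As a worked example of the ratio-to-bytes conversion and ordering checks above, with illustrative numbers rather than defaults from this change:

package main

import "fmt"

func main() {
	// 64 GiB of RAM with low/high/max ratios 0.75, 0.85, 0.90; the constraints
	// low <= high <= max < 1 hold, so the conversion proceeds.
	osMemBytes := int64(64) << 30
	lowRatio, highRatio, maxRatio := 0.75, 0.85, 0.90

	fmt.Println(int64(lowRatio * float64(osMemBytes)))  // 51539607552 (48 GiB)
	fmt.Println(int64(highRatio * float64(osMemBytes))) // ~58.4e9 (54.4 GiB)
	fmt.Println(int64(maxRatio * float64(osMemBytes)))  // ~61.8e9 (57.6 GiB)
}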
func InitDiskFileWriterConfig(params *paramtable.ComponentParam) error {
mode := params.CommonCfg.DiskWriteMode.GetValue()
bufferSize := params.CommonCfg.DiskWriteBufferSizeKb.GetAsUint64()

View File

@ -65,6 +65,8 @@ func (suite *ReduceSuite) SetupTest() {
initcore.InitLocalChunkManager(localDataRootPath)
err := initcore.InitMmapManager(paramtable.Get(), 1)
suite.NoError(err)
err = initcore.InitTieredStorage(paramtable.Get())
suite.NoError(err)
ctx := context.Background()
msgLength := 100

View File

@ -295,14 +295,3 @@ func (s *cSegmentImpl) FinishLoad() error {
func (s *cSegmentImpl) Release() {
C.DeleteSegment(s.ptr)
}
func ConvertCacheWarmupPolicy(policy string) (C.CacheWarmupPolicy, error) {
switch policy {
case "sync":
return C.CacheWarmupPolicy_Sync, nil
case "disable":
return C.CacheWarmupPolicy_Disable, nil
default:
return C.CacheWarmupPolicy_Disable, fmt.Errorf("invalid Tiered Storage cache warmup policy: %s", policy)
}
}

View File

@ -25,6 +25,8 @@ func TestGrowingSegment(t *testing.T) {
initcore.InitLocalChunkManager(localDataRootPath)
err := initcore.InitMmapManager(paramtable.Get(), 1)
assert.NoError(t, err)
err = initcore.InitTieredStorage(paramtable.Get())
assert.NoError(t, err)
collectionID := int64(100)
segmentID := int64(100)

View File

@ -2920,7 +2920,7 @@ type queryNodeConfig struct {
TieredEvictableDiskCacheRatio ParamItem `refreshable:"false"`
TieredCacheTouchWindowMs ParamItem `refreshable:"false"`
TieredEvictionIntervalMs ParamItem `refreshable:"false"`
TieredLoadingMemoryFactor ParamItem `refreshable:"false"`
TieredLoadingResourceFactor ParamItem `refreshable:"false"`
CacheCellUnaccessedSurvivalTime ParamItem `refreshable:"false"`
KnowhereScoreConsistency ParamItem `refreshable:"false"`
@ -3273,21 +3273,21 @@ eviction is necessary and the amount of data to evict from memory/disk.
}
p.TieredEvictionIntervalMs.Init(base.mgr)
p.TieredLoadingMemoryFactor = ParamItem{
Key: "queryNode.segcore.tieredStorage.loadingMemoryFactor",
p.TieredLoadingResourceFactor = ParamItem{
Key: "queryNode.segcore.tieredStorage.loadingResourceFactor",
Version: "2.6.0",
DefaultValue: "3.5",
DefaultValue: "1.0",
Formatter: func(v string) string {
factor := getAsFloat(v)
if factor < 1.0 {
return "3.5"
return "1.0"
}
return fmt.Sprintf("%.2f", factor)
},
Doc: "Loading memory factor for estimating memory during loading.",
Doc: "Loading resource factor for estimating resource during loading.",
Export: false,
}
p.TieredLoadingMemoryFactor.Init(base.mgr)
p.TieredLoadingResourceFactor.Init(base.mgr)
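A standalone sketch of the formatter above; getAsFloat is assumed here to behave like strconv.ParseFloat, so unparsable input also falls back to the default:

package main

import (
	"fmt"
	"strconv"
)

func formatLoadingResourceFactor(v string) string {
	factor, err := strconv.ParseFloat(v, 64)
	if err != nil || factor < 1.0 {
		return "1.0" // sub-1.0 or unparsable values are clamped to the default
	}
	return fmt.Sprintf("%.2f", factor)
}

func main() {
	fmt.Println(formatLoadingResourceFactor("0.5")) // 1.0
	fmt.Println(formatLoadingResourceFactor("3.5")) // 3.50
}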
p.CacheCellUnaccessedSurvivalTime = ParamItem{
Key: "queryNode.segcore.tieredStorage.cacheTtl",