mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
feat: vector field raw data to mmap by default (#41975)
issue: https://github.com/milvus-io/milvus/issues/41435 should address https://github.com/milvus-io/milvus/issues/41774 this PR also: * added caching layer memory overhead metric * re-enable TextMatch.GrowingLoadData test Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>
This commit is contained in:
parent
78010262f0
commit
2e3539319d
@ -464,7 +464,7 @@ queryNode:
|
|||||||
memoryLimit: 2147483648 # Deprecated: 2 GB, 2 * 1024 *1024 *1024
|
memoryLimit: 2147483648 # Deprecated: 2 GB, 2 * 1024 *1024 *1024
|
||||||
readAheadPolicy: willneed # The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed`
|
readAheadPolicy: willneed # The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed`
|
||||||
mmap:
|
mmap:
|
||||||
vectorField: false # Enable mmap for loading vector data
|
vectorField: true # Enable mmap for loading vector data
|
||||||
vectorIndex: false # Enable mmap for loading vector index
|
vectorIndex: false # Enable mmap for loading vector index
|
||||||
scalarField: false # Enable mmap for loading scalar data
|
scalarField: false # Enable mmap for loading scalar data
|
||||||
scalarIndex: false # Enable mmap for loading scalar index
|
scalarIndex: false # Enable mmap for loading scalar index
|
||||||
|
|||||||
@ -62,6 +62,8 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
|
|||||||
.Increment();
|
.Increment();
|
||||||
internal::cache_cell_count(translator_->meta()->storage_type)
|
internal::cache_cell_count(translator_->meta()->storage_type)
|
||||||
.Increment(translator_->num_cells());
|
.Increment(translator_->num_cells());
|
||||||
|
internal::cache_memory_overhead_bytes(translator_->meta()->storage_type)
|
||||||
|
.Increment(memory_overhead());
|
||||||
}
|
}
|
||||||
|
|
||||||
CacheSlot(const CacheSlot&) = delete;
|
CacheSlot(const CacheSlot&) = delete;
|
||||||
@ -207,6 +209,8 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
|
|||||||
.Decrement();
|
.Decrement();
|
||||||
internal::cache_cell_count(translator_->meta()->storage_type)
|
internal::cache_cell_count(translator_->meta()->storage_type)
|
||||||
.Decrement(translator_->num_cells());
|
.Decrement(translator_->num_cells());
|
||||||
|
internal::cache_memory_overhead_bytes(translator_->meta()->storage_type)
|
||||||
|
.Decrement(memory_overhead());
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -365,6 +369,12 @@ class CacheSlot final : public std::enable_shared_from_this<CacheSlot<CellT>> {
|
|||||||
std::unique_ptr<CellT> cell_{nullptr};
|
std::unique_ptr<CellT> cell_{nullptr};
|
||||||
std::chrono::steady_clock::time_point life_start_{};
|
std::chrono::steady_clock::time_point life_start_{};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
size_t
|
||||||
|
memory_overhead() const {
|
||||||
|
return sizeof(*this) + cells_.size() * sizeof(CacheCell);
|
||||||
|
}
|
||||||
|
|
||||||
const std::unique_ptr<Translator<CellT>> translator_;
|
const std::unique_ptr<Translator<CellT>> translator_;
|
||||||
// Each CacheCell's cid_t is its index in vector
|
// Each CacheCell's cid_t is its index in vector
|
||||||
// Once initialized, cells_ should never be resized.
|
// Once initialized, cells_ should never be resized.
|
||||||
|
|||||||
@ -79,7 +79,7 @@ class ListNode {
|
|||||||
bool
|
bool
|
||||||
manual_evict();
|
manual_evict();
|
||||||
|
|
||||||
// TODO(tiered storage 1): pin on ERROR should re-trigger loading.
|
// TODO(tiered storage 2): pin on ERROR should re-trigger loading.
|
||||||
// NOT_LOADED ---> LOADING ---> ERROR
|
// NOT_LOADED ---> LOADING ---> ERROR
|
||||||
// ^ |
|
// ^ |
|
||||||
// | v
|
// | v
|
||||||
|
|||||||
@ -222,7 +222,6 @@ DECLARE_PROMETHEUS_COUNTER(internal_cache_load_count_fail_memory);
|
|||||||
DECLARE_PROMETHEUS_COUNTER(internal_cache_load_count_fail_disk);
|
DECLARE_PROMETHEUS_COUNTER(internal_cache_load_count_fail_disk);
|
||||||
DECLARE_PROMETHEUS_COUNTER(internal_cache_load_count_fail_mixed);
|
DECLARE_PROMETHEUS_COUNTER(internal_cache_load_count_fail_mixed);
|
||||||
|
|
||||||
// TODO(tiered storage 1): not added
|
|
||||||
DECLARE_PROMETHEUS_GAUGE_FAMILY(internal_cache_memory_overhead_bytes);
|
DECLARE_PROMETHEUS_GAUGE_FAMILY(internal_cache_memory_overhead_bytes);
|
||||||
DECLARE_PROMETHEUS_GAUGE(internal_cache_memory_overhead_bytes_memory);
|
DECLARE_PROMETHEUS_GAUGE(internal_cache_memory_overhead_bytes_memory);
|
||||||
DECLARE_PROMETHEUS_GAUGE(internal_cache_memory_overhead_bytes_disk);
|
DECLARE_PROMETHEUS_GAUGE(internal_cache_memory_overhead_bytes_disk);
|
||||||
|
|||||||
@ -155,8 +155,6 @@ GroupChunkTranslator::get_cells(const std::vector<cachinglayer::cid_t>& cids) {
|
|||||||
std::make_unique<ParallelDegreeSplitStrategy>(parallel_degree);
|
std::make_unique<ParallelDegreeSplitStrategy>(parallel_degree);
|
||||||
|
|
||||||
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
|
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
|
||||||
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
|
|
||||||
.GetArrowFileSystem();
|
|
||||||
|
|
||||||
auto load_future = pool.Submit([&]() {
|
auto load_future = pool.Submit([&]() {
|
||||||
return LoadWithStrategy(insert_files_,
|
return LoadWithStrategy(insert_files_,
|
||||||
|
|||||||
@ -163,7 +163,6 @@ TEST_P(TaskTest, CallExprEmpty) {
|
|||||||
EXPECT_EQ(num_rows, num_rows_);
|
EXPECT_EQ(num_rows, num_rows_);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(tiered storage 1): this is slower due to the overhead of RawAt.
|
|
||||||
TEST_P(TaskTest, UnaryExpr) {
|
TEST_P(TaskTest, UnaryExpr) {
|
||||||
::milvus::proto::plan::GenericValue value;
|
::milvus::proto::plan::GenericValue value;
|
||||||
value.set_int64_val(-1);
|
value.set_int64_val(-1);
|
||||||
|
|||||||
@ -951,9 +951,8 @@ TEST(TextMatch, SealedJieBaNullable) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(tiered storage 1): this also fails on master branch.
|
|
||||||
// Test that growing segment loading flushed binlogs will build text match index.
|
// Test that growing segment loading flushed binlogs will build text match index.
|
||||||
TEST(TextMatch, DISABLED_GrowingLoadData) {
|
TEST(TextMatch, GrowingLoadData) {
|
||||||
int64_t N = 7;
|
int64_t N = 7;
|
||||||
auto schema = GenTestSchema({}, true);
|
auto schema = GenTestSchema({}, true);
|
||||||
schema->AddField(
|
schema->AddField(
|
||||||
|
|||||||
@ -3233,7 +3233,7 @@ This defaults to true, indicating that Milvus creates temporary index for growin
|
|||||||
p.MmapVectorField = ParamItem{
|
p.MmapVectorField = ParamItem{
|
||||||
Key: "queryNode.mmap.vectorField",
|
Key: "queryNode.mmap.vectorField",
|
||||||
Version: "2.4.7",
|
Version: "2.4.7",
|
||||||
DefaultValue: "false",
|
DefaultValue: "true",
|
||||||
Formatter: func(originValue string) string {
|
Formatter: func(originValue string) string {
|
||||||
if p.MmapEnabled.GetAsBool() {
|
if p.MmapEnabled.GetAsBool() {
|
||||||
return "true"
|
return "true"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user