enhance: support result cache for expr (#43923)

issue: #43878

Signed-off-by: luzhang <luzhang@zilliz.com>
Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
zhagnlu 2025-08-26 10:55:52 +08:00 committed by GitHub
parent f1ce84996d
commit 8934c18792
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
35 changed files with 769 additions and 120 deletions

View File

@ -556,6 +556,9 @@ queryNode:
levelZeroForwardPolicy: FilterByBF # delegator level zero deletion forward policy, possible option["FilterByBF", "RemoteLoad"]
streamingDeltaForwardPolicy: FilterByBF # delegator streaming deletion forward policy, possible option["FilterByBF", "Direct"]
forwardBatchSize: 4194304 # the batch size delegator uses for forwarding stream delete in loading procedure
exprCache:
enabled: false # enable expression result cache
capacityBytes: 268435456 # max capacity in bytes for expression result cache
dataSync:
flowGraph:
maxQueueLength: 16 # The maximum size of task queue cache in flow graph in query node.

View File

@ -20,6 +20,7 @@
#include <algorithm>

#include "common/Tracer.h"
#include "storage/ThreadPool.h"
#include "log/Log.h"
#include "exec/expression/ExprCache.h"
std::once_flag traceFlag;
std::once_flag cpuNumFlag;
@ -79,6 +80,17 @@ SetLogLevel(const char* level) {
milvus::SetLogLevel(level);
}
// C wrapper: toggle the process-wide expression result cache on or off.
void
SetExprResCacheEnable(bool val) {
    milvus::exec::ExprResCacheManager::SetEnabled(val);
}
// C wrapper: set the expression result cache's capacity in bytes.
//
// A negative `bytes` (misconfiguration upstream) would wrap to a huge
// size_t under static_cast and effectively disable eviction, so clamp
// to zero first.
void
SetExprResCacheCapacityBytes(int64_t bytes) {
    const int64_t clamped = std::max<int64_t>(bytes, 0);
    milvus::exec::ExprResCacheManager::Instance().SetCapacityBytes(
        static_cast<size_t>(clamped));
}
void
InitTrace(CTraceConfig* config) {
auto traceConfig = milvus::tracer::TraceConfig{config->exporter,

View File

@ -64,6 +64,13 @@ InitTrace(CTraceConfig* config);
void
SetTrace(CTraceConfig* config);
// Expr result cache
void
SetExprResCacheEnable(bool val);
void
SetExprResCacheCapacityBytes(int64_t bytes);
#ifdef __cplusplus
};
#endif

View File

@ -806,18 +806,17 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonForIndex() {
};
bool is_growing = segment_->type() == SegmentType::Growing;
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(
std::move(index->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)));
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
*cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));

View File

@ -63,21 +63,24 @@ PhyExistsFilterExpr::EvalJsonExistsForIndex() {
case JsonCastType::DataType::DOUBLE: {
auto* json_index =
dynamic_cast<index::JsonInvertedIndex<double>*>(index);
cached_index_chunk_res_ = json_index->Exists().clone();
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(
std::move(json_index->Exists()));
break;
}
case JsonCastType::DataType::VARCHAR: {
auto* json_index =
dynamic_cast<index::JsonInvertedIndex<std::string>*>(index);
cached_index_chunk_res_ = json_index->Exists().clone();
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(
std::move(json_index->Exists()));
break;
}
case JsonCastType::DataType::BOOL: {
auto* json_index =
dynamic_cast<index::JsonInvertedIndex<bool>*>(index);
cached_index_chunk_res_ = json_index->Exists().clone();
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(
std::move(json_index->Exists()));
break;
}
@ -86,7 +89,8 @@ PhyExistsFilterExpr::EvalJsonExistsForIndex() {
dynamic_cast<index::JsonFlatIndex*>(index);
auto executor =
json_flat_index->create_executor<double>(pointer);
cached_index_chunk_res_ = executor->IsNotNull().clone();
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(
std::move(executor->IsNotNull()));
break;
}
@ -98,7 +102,7 @@ PhyExistsFilterExpr::EvalJsonExistsForIndex() {
}
TargetBitmap res;
res.append(
cached_index_chunk_res_, current_index_chunk_pos_, real_batch_size);
*cached_index_chunk_res_, current_index_chunk_pos_, real_batch_size);
current_index_chunk_pos_ += real_batch_size;
return std::make_shared<ColumnVector>(std::move(res),
TargetBitmap(real_batch_size, true));
@ -209,17 +213,16 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegmentForIndex() {
};
bool is_growing = segment_->type() == SegmentType::Growing;
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(
std::move(index->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)));
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
*cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));

View File

@ -913,17 +913,19 @@ class SegmentExpr : public Expr {
i);
index_ptr = const_cast<Index*>(pw.get());
}
cached_index_chunk_res_ = std::move(func(index_ptr, values...));
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(
std::move(func(index_ptr, values...)));
auto valid_result = index_ptr->IsNotNull();
cached_index_chunk_valid_res_ = std::move(valid_result);
cached_index_chunk_valid_res_ =
std::make_shared<TargetBitmap>(std::move(valid_result));
cached_index_chunk_id_ = i;
}
auto size = ProcessIndexOneChunk(result,
valid_result,
i,
cached_index_chunk_res_,
cached_index_chunk_valid_res_,
*cached_index_chunk_res_,
*cached_index_chunk_valid_res_,
processed_rows);
if (processed_rows + size >= batch_size_) {
@ -1181,12 +1183,16 @@ class SegmentExpr : public Expr {
TargetBitmap res = index_ptr->IsNotNull();
return res;
};
cached_index_chunk_valid_res_ = execute_sub_batch(index_ptr);
cached_index_chunk_valid_res_ = std::make_shared<TargetBitmap>(
std::move(execute_sub_batch(index_ptr)));
cached_index_chunk_id_ = i;
}
auto size = ProcessIndexOneChunkForValid(
valid_result, i, cached_index_chunk_valid_res_, processed_rows);
auto size =
ProcessIndexOneChunkForValid(valid_result,
i,
*cached_index_chunk_valid_res_,
processed_rows);
if (processed_rows + size >= batch_size_) {
current_index_chunk_ = i;
@ -1326,9 +1332,9 @@ class SegmentExpr : public Expr {
// Cache for index scan to avoid search index every batch
int64_t cached_index_chunk_id_{-1};
TargetBitmap cached_index_chunk_res_{};
std::shared_ptr<TargetBitmap> cached_index_chunk_res_{nullptr};
// Cache for chunk valid res.
TargetBitmap cached_index_chunk_valid_res_{};
std::shared_ptr<TargetBitmap> cached_index_chunk_valid_res_{nullptr};
// Cache for text match.
std::shared_ptr<TargetBitmap> cached_match_res_{nullptr};

View File

@ -0,0 +1,193 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "exec/expression/ExprCache.h"
namespace milvus {
namespace exec {
std::atomic<bool> ExprResCacheManager::enabled_{false};
ExprResCacheManager&
ExprResCacheManager::Instance() {
    // Meyers singleton: initialization is thread-safe since C++11.
    static ExprResCacheManager singleton;
    return singleton;
}
void
ExprResCacheManager::SetEnabled(bool enabled) {
    // Atomic assignment (operator=) — same seq_cst store as store().
    enabled_ = enabled;
}
bool
ExprResCacheManager::IsEnabled() {
    // Implicit atomic load of the global enable flag.
    return enabled_;
}
void
ExprResCacheManager::SetCapacityBytes(size_t capacity_bytes) {
    // Record the new budget first, then evict until usage fits under it.
    capacity_bytes_ = capacity_bytes;
    EnsureCapacity();
}
size_t
ExprResCacheManager::GetCapacityBytes() const {
    // Implicit atomic load of the configured byte budget.
    return capacity_bytes_;
}
size_t
ExprResCacheManager::GetCurrentBytes() const {
    // Implicit atomic load of the current (approximate) usage counter.
    return current_bytes_;
}
// Number of live entries in the concurrent map (a snapshot; may be stale
// under concurrent Put/erase).
size_t
ExprResCacheManager::GetEntryCount() const {
    return concurrent_map_.size();
}
// Look up `key`; on a hit, copy the cached value into `out_value` and move
// the entry to the front (most-recently-used end) of the LRU list.
// Returns false when the cache is disabled or the key is absent.
//
// NOTE(review): the map lookup is lock-free (tbb) while the LRU touch takes
// lru_mutex_; an eviction between find() and the splice would leave `it`
// referring to an erased entry — confirm Get cannot race with
// EnsureCapacity()/EraseSegment() on the same key.
bool
ExprResCacheManager::Get(const Key& key, Value& out_value) {
    if (!IsEnabled()) {
        return false;
    }
    auto it = concurrent_map_.find(key);
    if (it == concurrent_map_.end()) {
        return false;
    }
    // Copies the shared_ptr handles; the bitmaps themselves are shared,
    // not duplicated.
    out_value = it->second.value;
    {
        std::lock_guard<std::mutex> lru_lock(lru_mutex_);
        // Relink this key to the hot end; splice does not invalidate
        // list iterators held in map entries.
        lru_list_.splice(lru_list_.begin(), lru_list_, it->second.lru_it);
    }
    LOG_DEBUG("get expr res cache, segment_id: {}, key: {}",
              key.segment_id,
              key.signature);
    return true;
}
// Insert or refresh the cache entry for `key`. The stored Value records its
// own approximate footprint (Value::bytes) so eviction can account for it.
// No-op when the cache is disabled. Triggers eviction afterwards if the
// byte budget is exceeded.
void
ExprResCacheManager::Put(const Key& key, const Value& value) {
    if (!IsEnabled()) {
        return;
    }
    size_t estimated_bytes = EstimateBytes(value);
    auto stored_value = value;
    stored_value.bytes = estimated_bytes;
    auto it = concurrent_map_.find(key);
    if (it != concurrent_map_.end()) {
        // Existing key: overwrite in place and adjust the byte accounting.
        auto old_bytes = it->second.value.bytes;
        it->second.value = stored_value;
        {
            std::lock_guard<std::mutex> lru_lock(lru_mutex_);
            lru_list_.splice(lru_list_.begin(), lru_list_, it->second.lru_it);
            // Unsigned subtraction wraps when the new value is smaller, but
            // the wrapped addend cancels modulo 2^64, so the net counter
            // stays correct.
            current_bytes_.fetch_add(estimated_bytes - old_bytes);
        }
    } else {
        // New key: create the LRU node first so the map entry can point
        // at its list iterator.
        ListIt list_it;
        {
            std::lock_guard<std::mutex> lru_lock(lru_mutex_);
            lru_list_.push_front(key);
            list_it = lru_list_.begin();
            current_bytes_.fetch_add(estimated_bytes);
        }
        Entry entry(stored_value, list_it);
        // NOTE(review): two threads Put()-ing the same new key can both
        // reach this emplace; the loser's LRU node and byte count would
        // leak — confirm concurrent Put on one key cannot happen, or
        // handle emplace() returning inserted == false.
        concurrent_map_.emplace(key, std::move(entry));
    }
    if (current_bytes_.load() > capacity_bytes_.load()) {
        EnsureCapacity();
    }
    LOG_DEBUG("put expr res cache, segment_id: {}, key: {}",
              key.segment_id,
              key.signature);
}
void
ExprResCacheManager::Clear() {
    // Drop every entry and reset the byte accounting under the LRU lock.
    std::lock_guard<std::mutex> guard(lru_mutex_);
    lru_list_.clear();
    concurrent_map_.clear();
    current_bytes_.store(0);
}
size_t
ExprResCacheManager::EstimateBytes(const Value& v) const {
size_t bytes = sizeof(Value);
if (v.result) {
bytes += (v.result->size() + 7) / 8;
}
if (v.valid_result) {
bytes += (v.valid_result->size() + 7) / 8;
}
return bytes;
}
void
ExprResCacheManager::EnsureCapacity() {
std::lock_guard<std::mutex> lru_lock(lru_mutex_);
while (current_bytes_.load() > capacity_bytes_.load() &&
!lru_list_.empty()) {
const auto& back_key = lru_list_.back();
auto it = concurrent_map_.find(back_key);
if (it != concurrent_map_.end()) {
current_bytes_.fetch_sub(it->second.value.bytes);
concurrent_map_.unsafe_erase(it);
}
lru_list_.pop_back();
}
}
size_t
ExprResCacheManager::EraseSegment(int64_t segment_id) {
    std::lock_guard<std::mutex> guard(lru_mutex_);
    size_t removed = 0;
    // Walk the LRU list and drop every key belonging to this segment,
    // keeping the byte counter in sync with the map erasures.
    auto it = lru_list_.begin();
    while (it != lru_list_.end()) {
        if (it->segment_id != segment_id) {
            ++it;
            continue;
        }
        auto map_it = concurrent_map_.find(*it);
        if (map_it != concurrent_map_.end()) {
            current_bytes_.fetch_sub(map_it->second.value.bytes);
            concurrent_map_.unsafe_erase(map_it);
        }
        it = lru_list_.erase(it);
        ++removed;
    }
    LOG_INFO("erase segment cache, segment_id: {}, erased: {} entries",
             segment_id,
             removed);
    return removed;
}
void
ExprResCacheManager::Init(size_t capacity_bytes, bool enabled) {
    // Flip the global switch, then size the singleton's byte budget.
    auto& mgr = Instance();
    SetEnabled(enabled);
    mgr.SetCapacityBytes(capacity_bytes);
}
} // namespace exec
} // namespace milvus

View File

@ -0,0 +1,140 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <cstddef>
#include <cstdint>
#include <atomic>
#include <list>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <tbb/concurrent_unordered_map.h>
#include "common/Types.h"
#include "log/Log.h"
namespace milvus {
namespace exec {
// Process-level LRU cache for expression result bitsets.
//
// Keyed by (segment id, expression signature). Stores the filter-result and
// valid-bit bitmaps produced while evaluating an expression so a repeated
// query on the same segment can skip re-evaluation. Capacity is bounded in
// bytes; least-recently-used entries are evicted first.
//
// Thread-safety: the map is a tbb::concurrent_unordered_map; the LRU list is
// guarded by lru_mutex_. NOTE(review): unsafe_erase is not concurrency-safe
// against concurrent find/emplace — confirm the intended usage pattern.
class ExprResCacheManager {
 public:
    // Cache key: one entry per (segment, expression) pair.
    struct Key {
        int64_t segment_id{0};
        std::string signature;  // expr signature including parameters
        bool
        operator==(const Key& other) const {
            return segment_id == other.segment_id &&
                   signature == other.signature;
        }
    };
    // Hash for Key: segment id mixed with the signature's string hash.
    struct KeyHasher {
        size_t
        operator()(const Key& k) const noexcept {
            std::hash<int64_t> h1;
            std::hash<std::string> h2;
            return (h1(k.segment_id) * 1315423911u) ^ h2(k.signature);
        }
    };
    // Cached payload: shared bitmaps plus bookkeeping for eviction.
    struct Value {
        std::shared_ptr<TargetBitmap> result;        // filter result bits
        std::shared_ptr<TargetBitmap> valid_result;  // valid bits
        int64_t active_count{0};  // active count when cached
        size_t bytes{0};          // approximate size in bytes
    };

 public:
    // Process-wide singleton accessor.
    static ExprResCacheManager&
    Instance();
    // Globally enable/disable the cache (affects Get and Put).
    static void
    SetEnabled(bool enabled);
    static bool
    IsEnabled();
    // One-shot convenience: set the enable flag and the capacity together.
    static void
    Init(size_t capacity_bytes, bool enabled);
    // Set the byte budget; evicts immediately if over the new limit.
    void
    SetCapacityBytes(size_t capacity_bytes);
    size_t
    GetCapacityBytes() const;
    size_t
    GetCurrentBytes() const;
    size_t
    GetEntryCount() const;
    // Try to get cached value. If found, returns true and fills out_value.
    bool
    Get(const Key& key, Value& out_value);
    // Insert or update cache entry. The provided value.result must be non-null.
    void
    Put(const Key& key, const Value& value);
    // Remove all entries and reset the byte counter.
    void
    Clear();
    // Erase all cache entries of a specific segment. Returns number of erased entries.
    size_t
    EraseSegment(int64_t segment_id);

 private:
    ExprResCacheManager() = default;
    // Approximate footprint of a Value (struct + both bitmaps, bits→bytes).
    size_t
    EstimateBytes(const Value& v) const;
    // Evict LRU entries until current_bytes_ <= capacity_bytes_.
    void
    EnsureCapacity();

 private:
    static std::atomic<bool> enabled_;
    std::atomic<size_t> capacity_bytes_{256ull * 1024ull *
                                        1024ull};  // default 256MB
    std::atomic<size_t> current_bytes_{0};
    // Guards lru_list_ (and the byte-counter updates paired with list edits).
    mutable std::mutex lru_mutex_;
    // Front = most recently used; back = eviction candidate.
    std::list<Key> lru_list_;
    using ListIt = std::list<Key>::iterator;
    // Map entry: the cached value plus its position in the LRU list.
    struct Entry {
        Value value;
        ListIt lru_it;
        Entry() = default;
        Entry(const Value& v, ListIt it) : value(v), lru_it(it) {
        }
    };
    tbb::concurrent_unordered_map<Key, Entry, KeyHasher> concurrent_map_;
};
// Helper API: erase all cache for a given segment id, returns erased entry count
inline size_t
EraseSegmentCache(int64_t segment_id) {
    if (ExprResCacheManager::IsEnabled()) {
        return ExprResCacheManager::Instance().EraseSegment(segment_id);
    }
    LOG_INFO("expr res cache is disabled, skip erase segment cache");
    return 0;
}
} // namespace exec
} // namespace milvus

View File

@ -489,18 +489,17 @@ PhyJsonContainsFilterExpr::ExecJsonContainsByKeyIndex() {
};
bool is_growing = segment_->type() == SegmentType::Growing;
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(
std::move(index->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)));
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
*cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
@ -708,18 +707,17 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArrayByKeyIndex() {
};
bool is_growing = segment_->type() == SegmentType::Growing;
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(
std::move(index->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)));
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
*cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
@ -1020,18 +1018,17 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByKeyIndex() {
};
bool is_growing = segment_->type() == SegmentType::Growing;
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(
std::move(index->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)));
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
*cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
@ -1361,18 +1358,17 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByKeyIndex() {
};
bool is_growing = segment_->type() == SegmentType::Growing;
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(
std::move(index->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)));
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
*cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
@ -1590,18 +1586,17 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByKeyIndex() {
};
bool is_growing = segment_->type() == SegmentType::Growing;
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(
std::move(index->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)));
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
*cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
@ -1907,18 +1902,17 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByKeyIndex() {
};
bool is_growing = segment_->type() == SegmentType::Growing;
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(
std::move(index->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)));
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
*cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));

View File

@ -693,19 +693,18 @@ PhyTermFilterExpr::ExecJsonInVariableByKeyIndex() {
};
bool is_growing = segment_->type() == SegmentType::Growing;
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(
std::move(index->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)));
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
*cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));

View File

@ -19,6 +19,7 @@
#include "common/EasyAssert.h"
#include "common/Json.h"
#include "common/Types.h"
#include "exec/expression/ExprCache.h"
#include "common/type_c.h"
#include "log/Log.h"
@ -1473,17 +1474,16 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonForIndex() {
};
bool is_growing = segment_->type() == SegmentType::Growing;
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(
std::move(index->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)));
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
*cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
@ -1546,8 +1546,8 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForPk(EvalCtx& context) {
if (cached_index_chunk_id_ != 0) {
cached_index_chunk_id_ = 0;
cached_index_chunk_res_.resize(active_count_);
auto cache_view = cached_index_chunk_res_.view();
cached_index_chunk_res_ = std::make_shared<TargetBitmap>(active_count_);
auto cache_view = cached_index_chunk_res_->view();
auto op_type = expr_->op_type_;
PkType pk = value_arg_.GetValue<IndexInnerType>();
@ -1578,7 +1578,7 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForPk(EvalCtx& context) {
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
*cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
@ -2005,6 +2005,25 @@ PhyUnaryRangeFilterExpr::ExecTextMatch() {
}
}
auto op_type = expr_->op_type_;
// Process-level LRU cache lookup by (segment_id, expr signature)
if (cached_match_res_ == nullptr &&
exec::ExprResCacheManager::IsEnabled() &&
segment_->type() == SegmentType::Sealed) {
exec::ExprResCacheManager::Key key{segment_->get_segment_id(),
this->ToString()};
exec::ExprResCacheManager::Value v;
if (exec::ExprResCacheManager::Instance().Get(key, v)) {
cached_match_res_ = v.result;
cached_index_chunk_valid_res_ = v.valid_result;
AssertInfo(cached_match_res_->size() == active_count_,
"internal error: expr res cache size {} not equal "
"expect active count {}",
cached_match_res_->size(),
active_count_);
}
}
auto func = [op_type, slop](Index* index,
const std::string& query) -> TargetBitmap {
if (op_type == proto::plan::OpType::TextMatch) {
@ -2028,13 +2047,26 @@ PhyUnaryRangeFilterExpr::ExecTextMatch() {
auto res = std::move(func(index, query));
auto valid_res = index->IsNotNull();
cached_match_res_ = std::make_shared<TargetBitmap>(std::move(res));
cached_index_chunk_valid_res_ = std::move(valid_res);
cached_index_chunk_valid_res_ =
std::make_shared<TargetBitmap>(std::move(valid_res));
if (cached_match_res_->size() < active_count_) {
// some entities are not visible in inverted index.
// only happened on growing segment.
TargetBitmap tail(active_count_ - cached_match_res_->size());
cached_match_res_->append(tail);
cached_index_chunk_valid_res_.append(tail);
cached_index_chunk_valid_res_->append(tail);
}
// Insert into process-level cache
if (exec::ExprResCacheManager::IsEnabled() &&
segment_->type() == SegmentType::Sealed) {
exec::ExprResCacheManager::Key key{segment_->get_segment_id(),
this->ToString()};
exec::ExprResCacheManager::Value v;
v.result = cached_match_res_;
v.valid_result = cached_index_chunk_valid_res_;
v.active_count = active_count_;
exec::ExprResCacheManager::Instance().Put(key, v);
}
}
@ -2042,7 +2074,7 @@ PhyUnaryRangeFilterExpr::ExecTextMatch() {
TargetBitmap valid_result;
result.append(
*cached_match_res_, current_data_global_pos_, real_batch_size);
valid_result.append(cached_index_chunk_valid_res_,
valid_result.append(*cached_index_chunk_valid_res_,
current_data_global_pos_,
real_batch_size);
MoveCursor();
@ -2100,17 +2132,17 @@ PhyUnaryRangeFilterExpr::ExecNgramMatch() {
if (!res_opt.has_value()) {
return std::nullopt;
}
auto valid_res = index->IsNotNull();
cached_ngram_match_res_ =
std::make_shared<TargetBitmap>(std::move(res_opt.value()));
cached_index_chunk_valid_res_ = std::move(valid_res);
cached_index_chunk_valid_res_ =
std::make_shared<TargetBitmap>(std::move(index->IsNotNull()));
}
TargetBitmap result;
TargetBitmap valid_result;
result.append(
*cached_ngram_match_res_, current_data_global_pos_, real_batch_size);
valid_result.append(cached_index_chunk_valid_res_,
valid_result.append(*cached_index_chunk_valid_res_,
current_data_global_pos_,
real_batch_size);
MoveCursor();

View File

@ -690,7 +690,7 @@ BitmapIndex<T>::IsNull() {
}
template <typename T>
const TargetBitmap
TargetBitmap
BitmapIndex<T>::IsNotNull() {
AssertInfo(is_built_, "index has not been built");
TargetBitmap res(total_num_rows_, true);

View File

@ -94,7 +94,7 @@ class BitmapIndex : public ScalarIndex<T> {
const TargetBitmap
IsNull() override;
const TargetBitmap
TargetBitmap
IsNotNull() override;
const TargetBitmap

View File

@ -94,7 +94,7 @@ class HybridScalarIndex : public ScalarIndex<T> {
return internal_index_->IsNull();
}
const TargetBitmap
TargetBitmap
IsNotNull() override {
return internal_index_->IsNotNull();
}

View File

@ -303,7 +303,7 @@ InvertedIndexTantivy<T>::IsNull() {
}
template <typename T>
const TargetBitmap
TargetBitmap
InvertedIndexTantivy<T>::IsNotNull() {
int64_t count = Count();
TargetBitmap bitset(count, true);

View File

@ -147,7 +147,7 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
const TargetBitmap
IsNull() override;
const TargetBitmap
TargetBitmap
IsNotNull() override;
const TargetBitmap

View File

@ -53,7 +53,7 @@ class JsonFlatIndexQueryExecutor : public InvertedIndexTantivy<T> {
return bitset;
}
const TargetBitmap
TargetBitmap
IsNotNull() override {
TargetBitmap bitset(this->Count());
this->wrapper_->json_exist_query(json_path_, &bitset);

View File

@ -70,7 +70,7 @@ JsonInvertedIndex<T>::build_index_for_json(
}
template <typename T>
const TargetBitmap
TargetBitmap
JsonInvertedIndex<T>::Exists() {
int64_t count = this->Count();
TargetBitmap bitset(count, true);
@ -178,4 +178,4 @@ template class JsonInvertedIndex<int64_t>;
template class JsonInvertedIndex<double>;
template class JsonInvertedIndex<std::string>;
} // namespace milvus::index
} // namespace milvus::index

View File

@ -167,7 +167,7 @@ class JsonInvertedIndex : public index::InvertedIndexTantivy<T> {
}
// Returns a bitmap indicating which rows have values that are indexed.
const TargetBitmap
TargetBitmap
Exists();
protected:

View File

@ -58,7 +58,7 @@ class JsonKeyStatsInvertedIndex : public InvertedIndexTantivy<std::string> {
void
BuildWithFieldData(const std::vector<FieldDataPtr>& datas, bool nullable);
const TargetBitmap
TargetBitmap
FilterByPath(const std::string& path,
int32_t row,
bool is_growing,

View File

@ -89,7 +89,7 @@ class ScalarIndex : public IndexBase {
virtual const TargetBitmap
IsNull() = 0;
virtual const TargetBitmap
virtual TargetBitmap
IsNotNull() = 0;
virtual const TargetBitmap

View File

@ -372,7 +372,7 @@ ScalarIndexSort<T>::IsNull() {
}
template <typename T>
const TargetBitmap
TargetBitmap
ScalarIndexSort<T>::IsNotNull() {
AssertInfo(is_built_, "index has not been built");
TargetBitmap bitset(total_num_rows_, true);

View File

@ -91,7 +91,7 @@ class ScalarIndexSort : public ScalarIndex<T> {
const TargetBitmap
IsNull() override;
const TargetBitmap
TargetBitmap
IsNotNull() override;
const TargetBitmap

View File

@ -303,7 +303,7 @@ StringIndexMarisa::ResetNull(TargetBitmap& bitset) {
}
}
const TargetBitmap
TargetBitmap
StringIndexMarisa::IsNotNull() {
TargetBitmap bitset(str_ids_.size());
for (size_t i = 0; i < bitset.size(); i++) {

View File

@ -74,7 +74,7 @@ class StringIndexMarisa : public StringIndex {
const TargetBitmap
IsNull() override;
const TargetBitmap
TargetBitmap
IsNotNull() override;
const TargetBitmap

View File

@ -38,6 +38,7 @@
#include "segcore/ChunkedSegmentSealedImpl.h"
#include "mmap/Types.h"
#include "storage/RemoteChunkManagerSingleton.h"
#include "exec/expression/ExprCache.h"
////////////////////////////// common interfaces //////////////////////////////
CStatus
@ -648,4 +649,16 @@ FinishLoad(CSegmentInterface c_segment) {
} catch (std::exception& e) {
return milvus::FailureCStatus(milvus::UnexpectedError, e.what());
}
}
// C API: drop every cached expression result belonging to `segment_id`.
// Intended to be called when a segment is released so stale bitmaps
// cannot be served for a reused segment id.
CStatus
ExprResCacheEraseSegment(int64_t segment_id) {
    SCOPE_CGO_CALL_METRIC();
    try {
        milvus::exec::ExprResCacheManager::Instance().EraseSegment(segment_id);
        return milvus::SuccessCStatus();
    } catch (std::exception& e) {
        return milvus::FailureCStatus(milvus::UnexpectedError, e.what());
    }
}

View File

@ -167,6 +167,9 @@ CreateTextIndex(CSegmentInterface c_segment, int64_t field_id);
CStatus
FinishLoad(CSegmentInterface c_segment);
CStatus
ExprResCacheEraseSegment(int64_t segment_id);
#ifdef __cplusplus
}
#endif

View File

@ -105,6 +105,7 @@ set(MILVUS_TEST_FILES
test_chunked_column_group.cpp
test_group_chunk_translator.cpp
test_chunked_segment_storage_v2.cpp
test_expr_cache.cpp
test_thread_pool.cpp
test_json_flat_index.cpp
test_vector_array.cpp

View File

@ -0,0 +1,130 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <string>
#include <vector>
#include "exec/expression/ExprCache.h"
#include "common/Types.h"
using milvus::exec::ExprResCacheManager;
namespace {
// Build an n-bit bitmap with every bit set to `v`.
milvus::TargetBitmap
MakeBits(size_t n, bool v = true) {
    milvus::TargetBitmap bits(n);
    if (v) {
        bits.set();
    } else {
        bits.reset();
    }
    return bits;
}
} // namespace
// Smoke test: a stored (result, valid_result) pair is retrievable and the
// returned bitmaps keep their sizes.
TEST(ExprResCacheManagerTest, PutGetBasic) {
    auto& mgr = ExprResCacheManager::Instance();
    ExprResCacheManager::SetEnabled(true);
    mgr.Clear();
    mgr.SetCapacityBytes(1ULL << 20);  // 1MB
    ExprResCacheManager::Key k{123, "expr:A"};
    ExprResCacheManager::Value v;
    v.result = std::make_shared<milvus::TargetBitmap>(MakeBits(128));
    v.valid_result = std::make_shared<milvus::TargetBitmap>(MakeBits(128));
    v.active_count = 128;
    mgr.Put(k, v);
    ExprResCacheManager::Value got;
    ASSERT_TRUE(mgr.Get(k, got));
    ASSERT_TRUE(got.result);
    ASSERT_EQ(got.result->size(), 128);
    ASSERT_TRUE(got.valid_result);
    ASSERT_EQ(got.valid_result->size(), 128);
}
// Capacity is sized so only two entries fit; inserting a third must evict
// the least-recently-used one ("expr:0").
TEST(ExprResCacheManagerTest, LruEvictionByCapacity) {
    auto& mgr = ExprResCacheManager::Instance();
    ExprResCacheManager::SetEnabled(true);
    mgr.Clear();
    // roughly allow 2 entries
    // (8192 / 8 * 2 + 32) * 2 = 4160
    mgr.SetCapacityBytes(4300);
    const size_t N = 8192;  // bits
    for (int i = 0; i < 3; ++i) {
        ExprResCacheManager::Key k{1, "expr:" + std::to_string(i)};
        ExprResCacheManager::Value v;
        v.result = std::make_shared<milvus::TargetBitmap>(MakeBits(N));
        v.valid_result = std::make_shared<milvus::TargetBitmap>(MakeBits(N));
        mgr.Put(k, v);
    }
    // The first one should be evicted by LRU
    ExprResCacheManager::Value out;
    ASSERT_FALSE(mgr.Get({1, "expr:0"}, out));
    ASSERT_TRUE(mgr.Get({1, "expr:1"}, out));
    ASSERT_TRUE(mgr.Get({1, "expr:2"}, out));
}
// EraseSegment must drop every entry belonging to the given segment id
// and leave entries of other segments untouched.
TEST(ExprResCacheManagerTest, EraseSegment) {
    auto& cache = ExprResCacheManager::Instance();
    ExprResCacheManager::SetEnabled(true);
    cache.Clear();
    cache.SetCapacityBytes(1ULL << 20);

    // One shared payload is enough; keys decide the bucketing.
    ExprResCacheManager::Value value;
    value.result = std::make_shared<milvus::TargetBitmap>(MakeBits(64));
    value.valid_result = std::make_shared<milvus::TargetBitmap>(MakeBits(64));

    ExprResCacheManager::Key key_a{10, "sig1"};
    ExprResCacheManager::Key key_b{10, "sig2"};
    ExprResCacheManager::Key key_c{11, "sig3"};
    cache.Put(key_a, value);
    cache.Put(key_b, value);
    cache.Put(key_c, value);

    // Segment 10 owns exactly two entries.
    size_t removed = cache.EraseSegment(10);
    ASSERT_EQ(removed, 2);

    ExprResCacheManager::Value fetched;
    ASSERT_FALSE(cache.Get(key_a, fetched));
    ASSERT_FALSE(cache.Get(key_b, fetched));
    ASSERT_TRUE(cache.Get(key_c, fetched));
}
// While disabled the cache is inert (a Put followed by a Get misses);
// once re-enabled, the same Put/Get pair works again.
TEST(ExprResCacheManagerTest, EnableDisable) {
    auto& cache = ExprResCacheManager::Instance();
    cache.Clear();
    cache.SetCapacityBytes(1ULL << 20);

    ExprResCacheManager::Key key{7, "x"};
    ExprResCacheManager::Value value;
    value.result = std::make_shared<milvus::TargetBitmap>(MakeBits(32));
    value.valid_result = std::make_shared<milvus::TargetBitmap>(MakeBits(32));

    ExprResCacheManager::SetEnabled(false);
    cache.Put(key, value);
    ExprResCacheManager::Value fetched;
    // When disabled, Get should not hit.
    ASSERT_FALSE(cache.Get(key, fetched));

    ExprResCacheManager::SetEnabled(true);
    cache.Put(key, value);
    ASSERT_TRUE(cache.Get(key, fetched));
}

View File

@ -23,6 +23,7 @@
#include "expr/ITypeExpr.h"
#include "segcore/segment_c.h"
#include "test_utils/storage_test_utils.h"
#include "exec/expression/ExprCache.h"
using namespace milvus;
using namespace milvus::query;
@ -1068,4 +1069,56 @@ TEST(TextMatch, ConcurrentReadWriteWithNull) {
writer.join();
reader.join();
}
}
// End-to-end check of the expression result cache on a sealed segment:
// the first TextMatch query inserts exactly one cache entry, and an
// identical second query is answered without growing the cache.
TEST(TextMatch, ExprResCacheSealed) {
    using milvus::exec::ExprResCacheManager;
    auto& cache = ExprResCacheManager::Instance();
    ExprResCacheManager::SetEnabled(true);
    cache.Clear();
    cache.SetCapacityBytes(1ULL << 20);

    auto schema = GenTestSchema();
    std::vector<std::string> texts = {"football, basketball, pingpang",
                                      "swimming, football"};
    int64_t row_count = 2;
    uint64_t seed = 19190504;
    auto dataset = DataGen(schema, row_count, seed);
    // Overwrite the generated string column with deterministic text so
    // the TextMatch results below are known.
    auto str_field = dataset.raw_->mutable_fields_data()
                         ->at(1)
                         .mutable_scalars()
                         ->mutable_string_data()
                         ->mutable_data();
    for (int64_t row = 0; row < row_count; ++row) {
        str_field->at(row) = texts[row];
    }
    auto segment = CreateSealedWithFieldDataLoaded(schema, dataset);
    segment->CreateTextIndex(FieldId(101));

    ASSERT_EQ(cache.GetEntryCount(), 0);

    BitsetType final;
    auto expr = GetMatchExpr(schema, "football", OpType::TextMatch);

    // First run computes the result and populates one cache entry.
    final = ExecuteQueryExpr(expr, segment.get(), row_count, MAX_TIMESTAMP);
    ASSERT_EQ(final.size(), row_count);
    ASSERT_TRUE(final[0]);
    ASSERT_TRUE(final[1]);
    ASSERT_EQ(cache.GetEntryCount(), 1);

    // Second run should hit the cache and not add entries.
    final = ExecuteQueryExpr(expr, segment.get(), row_count, MAX_TIMESTAMP);
    ASSERT_EQ(final.size(), row_count);
    ASSERT_TRUE(final[0]);
    ASSERT_TRUE(final[1]);
    ASSERT_EQ(cache.GetEntryCount(), 1);

    // Restore global cache state for subsequent tests.
    cache.Clear();
    ExprResCacheManager::SetEnabled(false);
}

View File

@ -23,6 +23,7 @@ package segments
#include "segcore/collection_c.h"
#include "segcore/plan_c.h"
#include "segcore/reduce_c.h"
#include "common/init_c.h"
*/
import "C"
@ -1319,7 +1320,10 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
return
}
usage := s.ResourceUsageEstimate()
if paramtable.Get().QueryNodeCfg.ExprResCacheEnabled.GetAsBool() {
// erase expr-cache for this segment before deleting C segment
C.ExprResCacheEraseSegment(C.int64_t(s.ID()))
}
GetDynamicPool().Submit(func() (any, error) {
C.DeleteSegment(ptr)
@ -1327,6 +1331,7 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
}).Await()
// release reserved resource after the segment resource is really released.
usage := s.ResourceUsageEstimate()
s.manager.SubLogicalResource(usage)
log.Info("delete segment from memory")

View File

@ -312,6 +312,12 @@ func (node *QueryNode) InitSegcore() error {
cEnableConfigParamTypeCheck := C.bool(paramtable.Get().CommonCfg.EnableConfigParamTypeCheck.GetAsBool())
C.SetDefaultConfigParamTypeCheck(cEnableConfigParamTypeCheck)
cExprResCacheEnabled := C.bool(paramtable.Get().QueryNodeCfg.ExprResCacheEnabled.GetAsBool())
C.SetExprResCacheEnable(cExprResCacheEnabled)
cExprResCacheCapacityBytes := C.int64_t(paramtable.Get().QueryNodeCfg.ExprResCacheCapacityBytes.GetAsInt64())
C.SetExprResCacheCapacityBytes(cExprResCacheCapacityBytes)
localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole)
initcore.InitLocalChunkManager(localDataRootPath)

View File

@ -392,6 +392,24 @@ func SetupCoreConfigChangelCallback() {
UpdateDefaultJSONKeyStatsCommitInterval(interval)
return nil
})
paramtable.Get().QueryNodeCfg.ExprResCacheEnabled.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error {
enable, err := strconv.ParseBool(newValue)
if err != nil {
return err
}
UpdateExprResCacheEnable(enable)
return nil
})
paramtable.Get().QueryNodeCfg.ExprResCacheCapacityBytes.RegisterCallback(func(ctx context.Context, key, oldValue, newValue string) error {
capacity, err := strconv.Atoi(newValue)
if err != nil {
return err
}
UpdateExprResCacheCapacityBytes(capacity)
return nil
})
})
}

View File

@ -64,6 +64,14 @@ func UpdateDefaultOptimizeExprEnable(enable bool) {
C.SetDefaultOptimizeExprEnable(C.bool(enable))
}
// UpdateExprResCacheEnable enables or disables the segcore expression
// result cache at runtime through the C bridge (SetExprResCacheEnable).
func UpdateExprResCacheEnable(enable bool) {
	C.SetExprResCacheEnable(C.bool(enable))
}
// UpdateExprResCacheCapacityBytes sets the maximum capacity, in bytes, of
// the segcore expression result cache through the C bridge
// (SetExprResCacheCapacityBytes).
func UpdateExprResCacheCapacityBytes(capacity int) {
	C.SetExprResCacheCapacityBytes(C.int64_t(capacity))
}
// UpdateDefaultJSONKeyStatsCommitInterval updates the JSON key stats
// commit interval in segcore through the C bridge.
func UpdateDefaultJSONKeyStatsCommitInterval(interval int) {
	C.SetDefaultJSONKeyStatsCommitInterval(C.int64_t(interval))
}

View File

@ -2988,6 +2988,9 @@ type queryNodeConfig struct {
EnableWorkerSQCostMetrics ParamItem `refreshable:"true"`
ExprEvalBatchSize ParamItem `refreshable:"false"`
// expr cache
ExprResCacheEnabled ParamItem `refreshable:"false"`
ExprResCacheCapacityBytes ParamItem `refreshable:"false"`
// pipeline
CleanExcludeSegInterval ParamItem `refreshable:"false"`
@ -3975,6 +3978,27 @@ user-task-polling:
}
p.ExprEvalBatchSize.Init(base.mgr)
// expr cache
p.ExprResCacheEnabled = ParamItem{
Key: "queryNode.exprCache.enabled",
FallbackKeys: []string{"enable_expr_cache"},
Version: "2.6.0",
DefaultValue: "false",
Doc: "enable expression result cache",
Export: true,
}
p.ExprResCacheEnabled.Init(base.mgr)
p.ExprResCacheCapacityBytes = ParamItem{
Key: "queryNode.exprCache.capacityBytes",
FallbackKeys: []string{"max_expr_cache_size"},
Version: "2.6.0",
DefaultValue: "268435456", // 256MB
Doc: "max capacity in bytes for expression result cache",
Export: true,
}
p.ExprResCacheCapacityBytes.Init(base.mgr)
p.JSONKeyStatsCommitInterval = ParamItem{
Key: "queryNode.segcore.jsonKeyStatsCommitInterval",
Version: "2.5.0",