mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
enhance: Make GeometryCache an optional configuration (#45192)
issue: #45187 Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
ae03dee116
commit
ed8ba4a28c
@ -465,6 +465,7 @@ queryNode:
|
||||
memExpansionRate: 1.15 # extra memory needed by building interim index
|
||||
buildParallelRate: 0.5 # the ratio of building interim index parallel matched with cpu num
|
||||
multipleChunkedEnable: true # Deprecated. Enable multiple chunked search
|
||||
enableGeometryCache: false # Enable geometry cache for geometry data
|
||||
tieredStorage:
|
||||
warmup:
|
||||
# options: sync, disable.
|
||||
|
||||
@ -114,7 +114,7 @@ class SimpleGeometryCacheManager {
|
||||
SimpleGeometryCacheManager() = default;
|
||||
|
||||
SimpleGeometryCache&
|
||||
GetCache(int64_t segment_id, FieldId field_id) {
|
||||
GetOrCreateCache(int64_t segment_id, FieldId field_id) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
auto key = MakeCacheKey(segment_id, field_id);
|
||||
auto it = caches_.find(key);
|
||||
@ -128,6 +128,17 @@ class SimpleGeometryCacheManager {
|
||||
return *cache_ptr;
|
||||
}
|
||||
|
||||
// Look up the geometry cache stored for (segment_id, field_id) without
// creating one. Returns a pointer to the existing cache, or nullptr when
// caches_ holds no entry for that key. The lookup itself runs under mutex_.
// NOTE(review): the raw pointer is handed out after mutex_ is released;
// whether it stays valid depends on when RemoveCache runs - confirm callers.
SimpleGeometryCache*
|
||||
GetCache(int64_t segment_id, FieldId field_id) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
auto key = MakeCacheKey(segment_id, field_id);
|
||||
auto it = caches_.find(key);
|
||||
if (it != caches_.end()) {
|
||||
return it->second.get();
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void
|
||||
RemoveCache(GEOSContextHandle_t ctx, int64_t segment_id, FieldId field_id) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
@ -184,26 +195,4 @@ class SimpleGeometryCacheManager {
|
||||
|
||||
} // namespace exec
|
||||
|
||||
// Convenient global functions for direct access to geometry cache
|
||||
// Fetch the cache for (segment_id, field_id) from the
// SimpleGeometryCacheManager singleton and return whatever its
// GetByOffset(offset) yields. Binds the result of GetCache to a reference,
// so it relies on GetCache returning a reference rather than a pointer.
inline const Geometry*
|
||||
GetGeometryByOffset(int64_t segment_id, FieldId field_id, size_t offset) {
|
||||
auto& cache = exec::SimpleGeometryCacheManager::Instance().GetCache(
|
||||
segment_id, field_id);
|
||||
return cache.GetByOffset(offset);
|
||||
}
|
||||
|
||||
// Forward to SimpleGeometryCacheManager::RemoveCache on the singleton,
// passing the GEOS context, segment id and field id through unchanged.
inline void
|
||||
RemoveGeometryCache(GEOSContextHandle_t ctx,
|
||||
int64_t segment_id,
|
||||
FieldId field_id) {
|
||||
exec::SimpleGeometryCacheManager::Instance().RemoveCache(
|
||||
ctx, segment_id, field_id);
|
||||
}
|
||||
|
||||
// Forward to SimpleGeometryCacheManager::RemoveSegmentCaches on the
// singleton, passing the GEOS context and segment id through unchanged.
inline void
|
||||
RemoveSegmentGeometryCaches(GEOSContextHandle_t ctx, int64_t segment_id) {
|
||||
exec::SimpleGeometryCacheManager::Instance().RemoveSegmentCaches(
|
||||
ctx, segment_id);
|
||||
}
|
||||
|
||||
} // namespace milvus
|
||||
|
||||
@ -31,11 +31,11 @@ namespace exec {
|
||||
const Geometry& right_source) { \
|
||||
AssertInfo(segment_offsets != nullptr, \
|
||||
"segment_offsets should not be nullptr"); \
|
||||
/* Use the per-segment geometry cache when one exists; otherwise parse each row's WKB directly. */ \
|
||||
auto& geometry_cache = \
|
||||
auto* geometry_cache = \
|
||||
SimpleGeometryCacheManager::Instance().GetCache( \
|
||||
this->segment_->get_segment_id(), field_id_); \
|
||||
auto cache_lock = geometry_cache.AcquireReadLock(); \
|
||||
if (geometry_cache) { \
|
||||
auto cache_lock = geometry_cache->AcquireReadLock(); \
|
||||
for (int i = 0; i < size; ++i) { \
|
||||
if (valid_data != nullptr && !valid_data[i]) { \
|
||||
res[i] = valid_res[i] = false; \
|
||||
@ -43,11 +43,23 @@ namespace exec {
|
||||
} \
|
||||
auto absolute_offset = segment_offsets[i]; \
|
||||
auto cached_geometry = \
|
||||
geometry_cache.GetByOffsetUnsafe(absolute_offset); \
|
||||
geometry_cache->GetByOffsetUnsafe(absolute_offset); \
|
||||
AssertInfo(cached_geometry != nullptr, \
|
||||
"cached geometry is nullptr"); \
|
||||
res[i] = cached_geometry->method(right_source); \
|
||||
} \
|
||||
} else { \
|
||||
GEOSContextHandle_t ctx_ = GEOS_init_r(); \
|
||||
for (int i = 0; i < size; ++i) { \
|
||||
if (valid_data != nullptr && !valid_data[i]) { \
|
||||
res[i] = valid_res[i] = false; \
|
||||
continue; \
|
||||
} \
|
||||
res[i] = Geometry(ctx_, data[i].data(), data[i].size()) \
|
||||
.method(right_source); \
|
||||
} \
|
||||
GEOS_finish_r(ctx_); \
|
||||
} \
|
||||
}; \
|
||||
int64_t processed_size = ProcessDataChunks<_DataType, true>( \
|
||||
execute_sub_batch, std::nullptr_t{}, res, valid_res, right_source); \
|
||||
@ -57,7 +69,6 @@ namespace exec {
|
||||
processed_size, \
|
||||
real_batch_size); \
|
||||
return res_vec;
|
||||
|
||||
// Specialized macro for distance-based operations (ST_DWITHIN)
|
||||
#define GEOMETRY_EXECUTE_SUB_BATCH_WITH_COMPARISON_DISTANCE(_DataType, method) \
|
||||
auto execute_sub_batch = [this](const _DataType* data, \
|
||||
@ -70,10 +81,11 @@ namespace exec {
|
||||
const Geometry& right_source) { \
|
||||
AssertInfo(segment_offsets != nullptr, \
|
||||
"segment_offsets should not be nullptr"); \
|
||||
auto& geometry_cache = \
|
||||
auto* geometry_cache = \
|
||||
SimpleGeometryCacheManager::Instance().GetCache( \
|
||||
this->segment_->get_segment_id(), field_id_); \
|
||||
auto cache_lock = geometry_cache.AcquireReadLock(); \
|
||||
if (geometry_cache) { \
|
||||
auto cache_lock = geometry_cache->AcquireReadLock(); \
|
||||
for (int i = 0; i < size; ++i) { \
|
||||
if (valid_data != nullptr && !valid_data[i]) { \
|
||||
res[i] = valid_res[i] = false; \
|
||||
@ -81,10 +93,23 @@ namespace exec {
|
||||
} \
|
||||
auto absolute_offset = segment_offsets[i]; \
|
||||
auto cached_geometry = \
|
||||
geometry_cache.GetByOffsetUnsafe(absolute_offset); \
|
||||
geometry_cache->GetByOffsetUnsafe(absolute_offset); \
|
||||
AssertInfo(cached_geometry != nullptr, \
|
||||
"cached geometry is nullptr"); \
|
||||
res[i] = cached_geometry->method(right_source, expr_->distance_); \
|
||||
res[i] = \
|
||||
cached_geometry->method(right_source, expr_->distance_); \
|
||||
} \
|
||||
} else { \
|
||||
GEOSContextHandle_t ctx_ = GEOS_init_r(); \
|
||||
for (int i = 0; i < size; ++i) { \
|
||||
if (valid_data != nullptr && !valid_data[i]) { \
|
||||
res[i] = valid_res[i] = false; \
|
||||
continue; \
|
||||
} \
|
||||
res[i] = Geometry(ctx_, data[i].data(), data[i].size()) \
|
||||
.method(right_source, expr_->distance_); \
|
||||
} \
|
||||
GEOS_finish_r(ctx_); \
|
||||
} \
|
||||
}; \
|
||||
int64_t processed_size = ProcessDataChunks<_DataType, true>( \
|
||||
@ -357,21 +382,22 @@ PhyGISFunctionFilterExpr::EvalForIndexSegment() {
|
||||
};
|
||||
|
||||
// Lambda: Process sealed segment data via SimpleGeometryCache when one exists, otherwise via bulk_subscript
|
||||
auto process_sealed_data =
|
||||
[&](const std::vector<int64_t>& hit_offsets) {
|
||||
auto process_sealed_data = [&](const std::vector<int64_t>&
|
||||
hit_offsets) {
|
||||
if (hit_offsets.empty())
|
||||
return;
|
||||
|
||||
// Get simple geometry cache for this segment+field
|
||||
auto& geometry_cache =
|
||||
auto* geometry_cache =
|
||||
SimpleGeometryCacheManager::Instance().GetCache(
|
||||
segment_->get_segment_id(), field_id_);
|
||||
auto cache_lock = geometry_cache.AcquireReadLock();
|
||||
if (geometry_cache) {
|
||||
auto cache_lock = geometry_cache->AcquireReadLock();
|
||||
for (size_t i = 0; i < hit_offsets.size(); ++i) {
|
||||
const auto pos = hit_offsets[i];
|
||||
|
||||
auto cached_geometry =
|
||||
geometry_cache.GetByOffsetUnsafe(pos);
|
||||
geometry_cache->GetByOffsetUnsafe(pos);
|
||||
// skip invalid geometry
|
||||
if (cached_geometry == nullptr) {
|
||||
continue;
|
||||
@ -383,6 +409,35 @@ PhyGISFunctionFilterExpr::EvalForIndexSegment() {
|
||||
refined.set(pos);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
milvus::OpContext op_ctx;
|
||||
auto data_array = segment_->bulk_subscript(
|
||||
&op_ctx, field_id_, hit_offsets.data(), hit_offsets.size());
|
||||
|
||||
auto geometry_array =
|
||||
static_cast<const milvus::proto::schema::GeometryArray*>(
|
||||
&data_array->scalars().geometry_data());
|
||||
const auto& valid_data = data_array->valid_data();
|
||||
|
||||
GEOSContextHandle_t ctx = GEOS_init_r();
|
||||
for (size_t i = 0; i < hit_offsets.size(); ++i) {
|
||||
const auto pos = hit_offsets[i];
|
||||
|
||||
// Skip invalid data
|
||||
if (!valid_data.empty() && !valid_data[i]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const auto& wkb_data = geometry_array->data(i);
|
||||
Geometry left(ctx, wkb_data.data(), wkb_data.size());
|
||||
bool result = evaluate_geometry(left, query_geometry);
|
||||
|
||||
if (result) {
|
||||
refined.set(pos);
|
||||
}
|
||||
}
|
||||
GEOS_finish_r(ctx);
|
||||
}
|
||||
};
|
||||
|
||||
auto hit_offsets = collect_hits();
|
||||
|
||||
@ -2654,7 +2654,8 @@ ChunkedSegmentSealedImpl::load_field_data_common(
|
||||
column->ManualEvictCache();
|
||||
}
|
||||
}
|
||||
if (data_type == DataType::GEOMETRY) {
|
||||
if (data_type == DataType::GEOMETRY &&
|
||||
segcore_config_.get_enable_geometry_cache()) {
|
||||
// Construct GeometryCache for the entire field
|
||||
LoadGeometryCache(field_id, column);
|
||||
}
|
||||
@ -2787,8 +2788,8 @@ ChunkedSegmentSealedImpl::LoadGeometryCache(
|
||||
try {
|
||||
// Get geometry cache for this segment+field
|
||||
auto& geometry_cache =
|
||||
milvus::exec::SimpleGeometryCacheManager::Instance().GetCache(
|
||||
get_segment_id(), field_id);
|
||||
milvus::exec::SimpleGeometryCacheManager::Instance()
|
||||
.GetOrCreateCache(get_segment_id(), field_id);
|
||||
|
||||
// Iterate through all chunks and collect WKB data
|
||||
auto num_chunks = column->num_chunks();
|
||||
|
||||
@ -147,6 +147,16 @@ class SegcoreConfig {
|
||||
return refine_with_quant_flag_;
|
||||
}
|
||||
|
||||
// Store the geometry-cache switch. enable_geometry_cache_ is declared
// `inline static` in this class, so the value is shared by every
// SegcoreConfig instance rather than being per-object state.
void
|
||||
set_enable_geometry_cache(bool enable_geometry_cache) {
|
||||
enable_geometry_cache_ = enable_geometry_cache;
|
||||
}
|
||||
|
||||
// Return the current value of the class-wide enable_geometry_cache_ flag
// (initialized to false where the member is declared).
bool
|
||||
get_enable_geometry_cache() const {
|
||||
return enable_geometry_cache_;
|
||||
}
|
||||
|
||||
private:
|
||||
inline static const std::unordered_set<std::string>
|
||||
valid_dense_vector_index_type = {
|
||||
@ -165,6 +175,7 @@ class SegcoreConfig {
|
||||
inline static knowhere::RefineType refine_type_ =
|
||||
knowhere::RefineType::DATA_VIEW;
|
||||
inline static bool refine_with_quant_flag_ = false;
|
||||
inline static bool enable_geometry_cache_ = false;
|
||||
};
|
||||
|
||||
} // namespace milvus::segcore
|
||||
|
||||
@ -226,7 +226,8 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
|
||||
}
|
||||
|
||||
// Build geometry cache for GEOMETRY fields
|
||||
if (field_meta.get_data_type() == DataType::GEOMETRY) {
|
||||
if (field_meta.get_data_type() == DataType::GEOMETRY &&
|
||||
segcore_config_.get_enable_geometry_cache()) {
|
||||
BuildGeometryCacheForInsert(
|
||||
field_id,
|
||||
&insert_record_proto->fields_data(data_offset),
|
||||
@ -527,7 +528,8 @@ SegmentGrowingImpl::load_column_group_data_internal(
|
||||
num_rows);
|
||||
// Build geometry cache for GEOMETRY fields
|
||||
if (schema_->operator[](field_id).get_data_type() ==
|
||||
DataType::GEOMETRY) {
|
||||
DataType::GEOMETRY &&
|
||||
segcore_config_.get_enable_geometry_cache()) {
|
||||
BuildGeometryCacheForLoad(field_id, field_data);
|
||||
}
|
||||
}
|
||||
@ -1379,8 +1381,8 @@ SegmentGrowingImpl::BuildGeometryCacheForInsert(FieldId field_id,
|
||||
try {
|
||||
// Get geometry cache for this segment+field
|
||||
auto& geometry_cache =
|
||||
milvus::exec::SimpleGeometryCacheManager::Instance().GetCache(
|
||||
get_segment_id(), field_id);
|
||||
milvus::exec::SimpleGeometryCacheManager::Instance()
|
||||
.GetOrCreateCache(get_segment_id(), field_id);
|
||||
|
||||
// Process geometry data from DataArray
|
||||
const auto& geometry_data = data_array->scalars().geometry_data();
|
||||
@ -1423,8 +1425,8 @@ SegmentGrowingImpl::BuildGeometryCacheForLoad(
|
||||
try {
|
||||
// Get geometry cache for this segment+field
|
||||
auto& geometry_cache =
|
||||
milvus::exec::SimpleGeometryCacheManager::Instance().GetCache(
|
||||
get_segment_id(), field_id);
|
||||
milvus::exec::SimpleGeometryCacheManager::Instance()
|
||||
.GetOrCreateCache(get_segment_id(), field_id);
|
||||
|
||||
// Process each field data chunk
|
||||
for (const auto& data : field_data) {
|
||||
|
||||
@ -41,6 +41,13 @@ SegcoreSetEnableInterminSegmentIndex(const bool value) {
|
||||
config.set_enable_interim_segment_index(value);
|
||||
}
|
||||
|
||||
// C-linkage entry point that sets the geometry-cache switch: obtains
// SegcoreConfig::default_config() and forwards `value` to
// set_enable_geometry_cache.
extern "C" void
|
||||
SegcoreSetEnableGeometryCache(const bool value) {
|
||||
milvus::segcore::SegcoreConfig& config =
|
||||
milvus::segcore::SegcoreConfig::default_config();
|
||||
config.set_enable_geometry_cache(value);
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
SegcoreSetNlist(const int64_t value) {
|
||||
milvus::segcore::SegcoreConfig& config =
|
||||
|
||||
@ -30,6 +30,9 @@ SegcoreSetChunkRows(const int64_t);
|
||||
void
|
||||
SegcoreSetEnableInterminSegmentIndex(const bool);
|
||||
|
||||
void
|
||||
SegcoreSetEnableGeometryCache(const bool);
|
||||
|
||||
void
|
||||
SegcoreSetNlist(const int64_t);
|
||||
|
||||
|
||||
@ -562,6 +562,12 @@ func InitInterminIndexConfig(params *paramtable.ComponentParam) error {
|
||||
return HandleCStatus(&status, "InitInterminIndexConfig failed")
|
||||
}
|
||||
|
||||
// InitGeometryCache reads QueryNodeCfg.EnableGeometryCache as a bool and
// passes it to segcore through C.SegcoreSetEnableGeometryCache.
// It always returns nil; the error return is never populated here.
func InitGeometryCache(params *paramtable.ComponentParam) error {
|
||||
enableGeometryCache := C.bool(params.QueryNodeCfg.EnableGeometryCache.GetAsBool())
|
||||
C.SegcoreSetEnableGeometryCache(enableGeometryCache)
|
||||
return nil
|
||||
}
|
||||
|
||||
// CleanRemoteChunkManager calls the C function
// CleanRemoteChunkManagerSingleton and returns nothing.
func CleanRemoteChunkManager() {
|
||||
C.CleanRemoteChunkManagerSingleton()
|
||||
}
|
||||
|
||||
@ -174,6 +174,11 @@ func doInitQueryNodeOnce(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
err = InitGeometryCache(paramtable.Get())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
InitTraceConfig(paramtable.Get())
|
||||
C.InitExecExpressionFunctionFactory()
|
||||
|
||||
|
||||
@ -3052,6 +3052,7 @@ type queryNodeConfig struct {
|
||||
InterimIndexMemExpandRate ParamItem `refreshable:"false"`
|
||||
InterimIndexBuildParallelRate ParamItem `refreshable:"false"`
|
||||
MultipleChunkedEnable ParamItem `refreshable:"false"` // Deprecated
|
||||
EnableGeometryCache ParamItem `refreshable:"false"`
|
||||
|
||||
// TODO(tiered storage 2) this should be refreshable?
|
||||
TieredWarmupScalarField ParamItem `refreshable:"false"`
|
||||
@ -3627,6 +3628,15 @@ This defaults to true, indicating that Milvus creates temporary index for growin
|
||||
}
|
||||
p.MultipleChunkedEnable.Init(base.mgr)
|
||||
|
||||
p.EnableGeometryCache = ParamItem{
|
||||
Key: "queryNode.segcore.enableGeometryCache",
|
||||
Version: "2.6.5",
|
||||
DefaultValue: "false",
|
||||
Doc: "Enable geometry cache for geometry data",
|
||||
Export: true,
|
||||
}
|
||||
p.EnableGeometryCache.Init(base.mgr)
|
||||
|
||||
p.InterimIndexNProbe = ParamItem{
|
||||
Key: "queryNode.segcore.interimIndex.nprobe",
|
||||
Version: "2.0.0",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user