diff --git a/configs/milvus.yaml b/configs/milvus.yaml index ae5db437be..aebd730a26 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -425,6 +425,7 @@ queryNode: memExpansionRate: 1.15 # extra memory needed by building interim index buildParallelRate: 0.5 # the ratio of building interim index parallel matched with cpu num multipleChunkedEnable: true # Enable multiple chunked search + enableGeometryCache: false # Enable geometry cache for geometry data deleteDumpBatchSize: 10000 # Batch size for delete snapshot dump in segcore. knowhereScoreConsistency: false # Enable knowhere strong consistency score computation logic jsonKeyStatsCommitInterval: 200 # the commit interval for the JSON key Stats to commit diff --git a/internal/core/src/common/GeometryCache.h b/internal/core/src/common/GeometryCache.h index f5498f74a7..df4e74f821 100644 --- a/internal/core/src/common/GeometryCache.h +++ b/internal/core/src/common/GeometryCache.h @@ -114,7 +114,7 @@ class SimpleGeometryCacheManager { SimpleGeometryCacheManager() = default; SimpleGeometryCache& - GetCache(int64_t segment_id, FieldId field_id) { + GetOrCreateCache(int64_t segment_id, FieldId field_id) { std::lock_guard lock(mutex_); auto key = MakeCacheKey(segment_id, field_id); auto it = caches_.find(key); @@ -128,6 +128,17 @@ class SimpleGeometryCacheManager { return *cache_ptr; } + SimpleGeometryCache* + GetCache(int64_t segment_id, FieldId field_id) { + std::lock_guard lock(mutex_); + auto key = MakeCacheKey(segment_id, field_id); + auto it = caches_.find(key); + if (it != caches_.end()) { + return it->second.get(); + } + return nullptr; + } + void RemoveCache(GEOSContextHandle_t ctx, int64_t segment_id, FieldId field_id) { std::lock_guard lock(mutex_); @@ -184,26 +195,4 @@ class SimpleGeometryCacheManager { } // namespace exec -// Convenient global functions for direct access to geometry cache -inline const Geometry* -GetGeometryByOffset(int64_t segment_id, FieldId field_id, size_t offset) { - auto& cache = exec::SimpleGeometryCacheManager::Instance().GetCache( - segment_id, field_id); - return cache.GetByOffset(offset); -} - -inline void -RemoveGeometryCache(GEOSContextHandle_t ctx, - int64_t segment_id, - FieldId field_id) { - exec::SimpleGeometryCacheManager::Instance().RemoveCache( - ctx, segment_id, field_id); -} - -inline void -RemoveSegmentGeometryCaches(GEOSContextHandle_t ctx, int64_t segment_id) { - exec::SimpleGeometryCacheManager::Instance().RemoveSegmentCaches( - ctx, segment_id); -} - } // namespace milvus diff --git a/internal/core/src/exec/expression/GISFunctionFilterExpr.cpp b/internal/core/src/exec/expression/GISFunctionFilterExpr.cpp index 2b09b1a163..bce68ff2bc 100644 --- a/internal/core/src/exec/expression/GISFunctionFilterExpr.cpp +++ b/internal/core/src/exec/expression/GISFunctionFilterExpr.cpp @@ -20,44 +20,55 @@ namespace milvus { namespace exec { -#define GEOMETRY_EXECUTE_SUB_BATCH_WITH_COMPARISON(_DataType, method) \ - auto execute_sub_batch = [this](const _DataType* data, \ - const bool* valid_data, \ - const int32_t* offsets, \ - const int32_t* segment_offsets, \ - const int size, \ - TargetBitmapView res, \ - TargetBitmapView valid_res, \ - const Geometry& right_source) { \ - AssertInfo(segment_offsets != nullptr, \ - "segment_offsets should not be nullptr"); \ - /* Unified path using simple WKB-content-based cache for both sealed and growing segments. */ \ - auto& geometry_cache = \ - SimpleGeometryCacheManager::Instance().GetCache( \ - this->segment_->get_segment_id(), field_id_); \ - auto cache_lock = geometry_cache.AcquireReadLock(); \ - for (int i = 0; i < size; ++i) { \ - if (valid_data != nullptr && !valid_data[i]) { \ - res[i] = valid_res[i] = false; \ - continue; \ - } \ - auto absolute_offset = segment_offsets[i]; \ - auto cached_geometry = \ - geometry_cache.GetByOffsetUnsafe(absolute_offset); \ - AssertInfo(cached_geometry != nullptr, \ - "cached geometry is nullptr"); \ - res[i] = cached_geometry->method(right_source); \ - } \ - }; \ - int64_t processed_size = ProcessDataChunks<_DataType, true>( \ - execute_sub_batch, std::nullptr_t{}, res, valid_res, right_source); \ - AssertInfo(processed_size == real_batch_size, \ - "internal error: expr processed rows {} not equal " \ - "expect batch size {}", \ - processed_size, \ - real_batch_size); \ +#define GEOMETRY_EXECUTE_SUB_BATCH_WITH_COMPARISON(_DataType, method) \ + auto execute_sub_batch = [this](const _DataType* data, \ + const bool* valid_data, \ + const int32_t* offsets, \ + const int32_t* segment_offsets, \ + const int size, \ + TargetBitmapView res, \ + TargetBitmapView valid_res, \ + const Geometry& right_source) { \ + AssertInfo(segment_offsets != nullptr, \ + "segment_offsets should not be nullptr"); \ + auto* geometry_cache = \ + SimpleGeometryCacheManager::Instance().GetCache( \ + this->segment_->get_segment_id(), field_id_); \ + if (geometry_cache) { \ + auto cache_lock = geometry_cache->AcquireReadLock(); \ + for (int i = 0; i < size; ++i) { \ + if (valid_data != nullptr && !valid_data[i]) { \ + res[i] = valid_res[i] = false; \ + continue; \ + } \ + auto absolute_offset = segment_offsets[i]; \ + auto cached_geometry = \ + geometry_cache->GetByOffsetUnsafe(absolute_offset); \ + AssertInfo(cached_geometry != nullptr, \ + "cached geometry is nullptr"); \ + res[i] = cached_geometry->method(right_source); \ + } \ + } else { \ + GEOSContextHandle_t ctx_ = GEOS_init_r(); \ + for (int i = 0; i < size; ++i) { \ + if (valid_data != nullptr && !valid_data[i]) { \ + res[i] = valid_res[i] = false; \ + continue; \ + } \ + res[i] = Geometry(ctx_, data[i].data(), data[i].size()) \ + .method(right_source); \ + } \ + GEOS_finish_r(ctx_); \ + } \ + }; \ + int64_t processed_size = ProcessDataChunks<_DataType, true>( \ + execute_sub_batch, std::nullptr_t{}, res, valid_res, right_source); \ + AssertInfo(processed_size == real_batch_size, \ + "internal error: expr processed rows {} not equal " \ + "expect batch size {}", \ + processed_size, \ + real_batch_size); \ return res_vec; - // Specialized macro for distance-based operations (ST_DWITHIN) #define GEOMETRY_EXECUTE_SUB_BATCH_WITH_COMPARISON_DISTANCE(_DataType, method) \ auto execute_sub_batch = [this](const _DataType* data, \ @@ -70,21 +81,35 @@ namespace exec { const Geometry& right_source) { \ AssertInfo(segment_offsets != nullptr, \ "segment_offsets should not be nullptr"); \ - auto& geometry_cache = \ + auto* geometry_cache = \ SimpleGeometryCacheManager::Instance().GetCache( \ this->segment_->get_segment_id(), field_id_); \ - auto cache_lock = geometry_cache.AcquireReadLock(); \ - for (int i = 0; i < size; ++i) { \ - if (valid_data != nullptr && !valid_data[i]) { \ - res[i] = valid_res[i] = false; \ - continue; \ + if (geometry_cache) { \ + auto cache_lock = geometry_cache->AcquireReadLock(); \ + for (int i = 0; i < size; ++i) { \ + if (valid_data != nullptr && !valid_data[i]) { \ + res[i] = valid_res[i] = false; \ + continue; \ + } \ + auto absolute_offset = segment_offsets[i]; \ + auto cached_geometry = \ + geometry_cache->GetByOffsetUnsafe(absolute_offset); \ + AssertInfo(cached_geometry != nullptr, \ + "cached geometry is nullptr"); \ + res[i] = \ + cached_geometry->method(right_source, expr_->distance_); \ } \ - auto absolute_offset = segment_offsets[i]; \ - auto cached_geometry = \ - geometry_cache.GetByOffsetUnsafe(absolute_offset); \ - AssertInfo(cached_geometry != nullptr, \ - "cached geometry is nullptr"); \ - res[i] = cached_geometry->method(right_source, expr_->distance_); \ + } else { \ + GEOSContextHandle_t ctx_ = GEOS_init_r(); \ + for (int i = 0; i < size; ++i) { \ + if (valid_data != nullptr && !valid_data[i]) { \ + res[i] = valid_res[i] = false; \ + continue; \ + } \ + res[i] = Geometry(ctx_, data[i].data(), data[i].size()) \ + .method(right_source, expr_->distance_); \ + } \ + GEOS_finish_r(ctx_); \ } \ }; \ int64_t processed_size = ProcessDataChunks<_DataType, true>( \ @@ -358,21 +383,22 @@ PhyGISFunctionFilterExpr::EvalForIndexSegment() { }; // Lambda: Process sealed segment data using bulk_subscript with SimpleGeometryCache - auto process_sealed_data = - [&](const std::vector& hit_offsets) { - if (hit_offsets.empty()) - return; + auto process_sealed_data = [&](const std::vector& + hit_offsets) { + if (hit_offsets.empty()) + return; - // Get simple geometry cache for this segment+field - auto& geometry_cache = - SimpleGeometryCacheManager::Instance().GetCache( - segment_->get_segment_id(), field_id_); - auto cache_lock = geometry_cache.AcquireReadLock(); + // Get simple geometry cache for this segment+field + auto* geometry_cache = + SimpleGeometryCacheManager::Instance().GetCache( + segment_->get_segment_id(), field_id_); + if (geometry_cache) { + auto cache_lock = geometry_cache->AcquireReadLock(); for (size_t i = 0; i < hit_offsets.size(); ++i) { const auto pos = hit_offsets[i]; auto cached_geometry = - geometry_cache.GetByOffsetUnsafe(pos); + geometry_cache->GetByOffsetUnsafe(pos); // skip invalid geometry if (cached_geometry == nullptr) { continue; @@ -384,7 +410,35 @@ PhyGISFunctionFilterExpr::EvalForIndexSegment() { refined.set(pos); } } - }; + } else { + auto data_array = segment_->bulk_subscript( + field_id_, hit_offsets.data(), hit_offsets.size()); + + auto geometry_array = + static_cast( + &data_array->scalars().geometry_data()); + const auto& valid_data = data_array->valid_data(); + + GEOSContextHandle_t ctx = GEOS_init_r(); + for (size_t i = 0; i < hit_offsets.size(); ++i) { + const auto pos = hit_offsets[i]; + + // Skip invalid data + if (!valid_data.empty() && !valid_data[i]) { + continue; + } + + const auto& wkb_data = geometry_array->data(i); + Geometry left(ctx, wkb_data.data(), wkb_data.size()); + bool result = evaluate_geometry(left, query_geometry); + + if (result) { + refined.set(pos); + } + } + GEOS_finish_r(ctx); + } + }; auto hit_offsets = collect_hits(); process_sealed_data(hit_offsets); diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 5fa73f6702..c8f6b6cf17 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -2238,8 +2238,8 @@ ChunkedSegmentSealedImpl::LoadGeometryCache( try { // Get geometry cache for this segment+field auto& geometry_cache = - milvus::exec::SimpleGeometryCacheManager::Instance().GetCache( - get_segment_id(), field_id); + milvus::exec::SimpleGeometryCacheManager::Instance() + .GetOrCreateCache(get_segment_id(), field_id); // Iterate through all chunks and collect WKB data auto num_chunks = var_column.num_chunks(); diff --git a/internal/core/src/segcore/SegcoreConfig.h b/internal/core/src/segcore/SegcoreConfig.h index 919ec82a4c..f3e24f551a 100644 --- a/internal/core/src/segcore/SegcoreConfig.h +++ b/internal/core/src/segcore/SegcoreConfig.h @@ -147,6 +147,16 @@ class SegcoreConfig { return refine_with_quant_flag_; } + void + set_enable_geometry_cache(bool enable_geometry_cache) { + enable_geometry_cache_ = enable_geometry_cache; + } + + bool + get_enable_geometry_cache() const { + return enable_geometry_cache_; + } + private: inline static const std::unordered_set valid_dense_vector_index_type = { @@ -165,6 +175,7 @@ class SegcoreConfig { inline static knowhere::RefineType refine_type_ = knowhere::RefineType::DATA_VIEW; inline static bool refine_with_quant_flag_ = false; + inline static bool enable_geometry_cache_ = false; }; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 3ad2a847a1..7d5dac243e 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -179,7 +179,8 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, } // Build geometry cache for GEOMETRY fields - if (field_meta.get_data_type() == DataType::GEOMETRY) { + if (field_meta.get_data_type() == DataType::GEOMETRY && + segcore_config_.get_enable_geometry_cache()) { BuildGeometryCacheForInsert( field_id, &insert_record_proto->fields_data(data_offset), @@ -324,7 +325,8 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) { } // Build geometry cache for GEOMETRY fields - if (field_meta.get_data_type() == DataType::GEOMETRY) { + if (field_meta.get_data_type() == DataType::GEOMETRY && + segcore_config_.get_enable_geometry_cache()) { BuildGeometryCacheForLoad(field_id, field_data); } @@ -1028,8 +1030,8 @@ SegmentGrowingImpl::BuildGeometryCacheForInsert(FieldId field_id, try { // Get geometry cache for this segment+field auto& geometry_cache = - milvus::exec::SimpleGeometryCacheManager::Instance().GetCache( - get_segment_id(), field_id); + milvus::exec::SimpleGeometryCacheManager::Instance() + .GetOrCreateCache(get_segment_id(), field_id); // Process geometry data from DataArray const auto& geometry_data = data_array->scalars().geometry_data(); @@ -1071,8 +1073,8 @@ SegmentGrowingImpl::BuildGeometryCacheForLoad( try { // Get geometry cache for this segment+field auto& geometry_cache = - milvus::exec::SimpleGeometryCacheManager::Instance().GetCache( - get_segment_id(), field_id); + milvus::exec::SimpleGeometryCacheManager::Instance() + .GetOrCreateCache(get_segment_id(), field_id); // Process each field data chunk for (const auto& data : field_data) { diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 4b43557529..2d6d7102b6 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -447,7 +447,9 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) { field_data_size = var_column->DataByteSize(); // Construct GeometryCache for the entire field - LoadGeometryCache(field_id, *var_column); + if (segcore_config_.get_enable_geometry_cache()) { + LoadGeometryCache(field_id, *var_column); + } column = std::move(var_column); break; @@ -623,7 +625,9 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { var_column->Seal(std::move(indices)); // Construct GeometryCache for the entire field (mmap mode) - LoadGeometryCache(field_id, *var_column); + if (segcore_config_.get_enable_geometry_cache()) { + LoadGeometryCache(field_id, *var_column); + } column = std::move(var_column); break; @@ -2275,8 +2279,8 @@ SegmentSealedImpl::LoadGeometryCache( try { // Get geometry cache for this segment+field auto& geometry_cache = - milvus::exec::SimpleGeometryCacheManager::Instance().GetCache( - get_segment_id(), field_id); + milvus::exec::SimpleGeometryCacheManager::Instance() + .GetOrCreateCache(get_segment_id(), field_id); // Get all string views from the single chunk auto [string_views, valid_data] = var_column.StringViews(); diff --git a/internal/core/src/segcore/segcore_init_c.cpp b/internal/core/src/segcore/segcore_init_c.cpp index 65424cb16b..8d6dac62c2 100644 --- a/internal/core/src/segcore/segcore_init_c.cpp +++ b/internal/core/src/segcore/segcore_init_c.cpp @@ -40,6 +40,13 @@ SegcoreSetEnableInterminSegmentIndex(const bool value) { config.set_enable_interim_segment_index(value); } +extern "C" void +SegcoreSetEnableGeometryCache(const bool value) { + milvus::segcore::SegcoreConfig& config = + milvus::segcore::SegcoreConfig::default_config(); + config.set_enable_geometry_cache(value); +} + extern "C" void SegcoreSetNlist(const int64_t value) { milvus::segcore::SegcoreConfig& config = diff --git a/internal/core/src/segcore/segcore_init_c.h b/internal/core/src/segcore/segcore_init_c.h index 348a95236e..457cbf84c2 100644 --- a/internal/core/src/segcore/segcore_init_c.h +++ b/internal/core/src/segcore/segcore_init_c.h @@ -28,6 +28,9 @@ SegcoreSetChunkRows(const int64_t); void SegcoreSetEnableInterminSegmentIndex(const bool); +void +SegcoreSetEnableGeometryCache(const bool); + void SegcoreSetNlist(const int64_t); diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 34651f1d51..79be33b72c 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -351,6 +351,11 @@ func (node *QueryNode) InitSegcore() error { return err } + err = initcore.InitGeometryCache(paramtable.Get()) + if err != nil { + return err + } + initcore.InitTraceConfig(paramtable.Get()) C.InitExecExpressionFunctionFactory() return nil diff --git a/internal/util/initcore/init_core.go b/internal/util/initcore/init_core.go index 83497329e3..d7b4235ae4 100644 --- a/internal/util/initcore/init_core.go +++ b/internal/util/initcore/init_core.go @@ -266,6 +266,12 @@ func InitInterminIndexConfig(params *paramtable.ComponentParam) error { return HandleCStatus(&status, "InitInterminIndexConfig failed") } +func InitGeometryCache(params *paramtable.ComponentParam) error { + enableGeometryCache := C.bool(params.QueryNodeCfg.EnableGeometryCache.GetAsBool()) + C.SegcoreSetEnableGeometryCache(enableGeometryCache) + return nil +} + func CleanRemoteChunkManager() { C.CleanRemoteChunkManagerSingleton() } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index e3b1c82fcd..e868e58df3 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2790,6 +2790,7 @@ type queryNodeConfig struct { InterimIndexMemExpandRate ParamItem `refreshable:"false"` InterimIndexBuildParallelRate ParamItem `refreshable:"false"` MultipleChunkedEnable ParamItem `refreshable:"false"` + EnableGeometryCache ParamItem `refreshable:"false"` // delete snapshot dump DeleteDumpBatchSize ParamItem `refreshable:"false"` @@ -3075,6 +3076,15 @@ This defaults to true, indicating that Milvus creates temporary index for growin } p.MultipleChunkedEnable.Init(base.mgr) + p.EnableGeometryCache = ParamItem{ + Key: "queryNode.segcore.enableGeometryCache", + Version: "2.5.21", + DefaultValue: "false", + Doc: "Enable geometry cache for geometry data", + Export: true, + } + p.EnableGeometryCache.Init(base.mgr) + p.InterimIndexNProbe = ParamItem{ Key: "queryNode.segcore.interimIndex.nprobe", Version: "2.0.0",