diff --git a/go.mod b/go.mod index 4ade37d262..a28616bd4c 100644 --- a/go.mod +++ b/go.mod @@ -75,7 +75,7 @@ require ( github.com/remeh/sizedwaitgroup v1.0.0 github.com/shirou/gopsutil/v4 v4.24.10 github.com/tidwall/gjson v1.17.1 - github.com/twpayne/go-geom v1.5.7 + github.com/twpayne/go-geom v1.6.1 github.com/valyala/fastjson v1.6.4 github.com/zeebo/xxh3 v1.0.2 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 diff --git a/go.sum b/go.sum index be32808960..32fc647436 100644 --- a/go.sum +++ b/go.sum @@ -931,8 +931,8 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg= github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= -github.com/twpayne/go-geom v1.5.7 h1:7fdceDUr03/MP7rAKOaTV6x9njMiQdxB/D0PDzMTCDc= -github.com/twpayne/go-geom v1.5.7/go.mod h1:y4fTAQtLedXW8eG2Yo4tYrIGN1yIwwKkmA+K3iSHKBA= +github.com/twpayne/go-geom v1.6.1 h1:iLE+Opv0Ihm/ABIcvQFGIiFBXd76oBIar9drAwHFhR4= +github.com/twpayne/go-geom v1.6.1/go.mod h1:Kr+Nly6BswFsKM5sd31YaoWS5PeDDH2NftJTK7Gd028= github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o= github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= diff --git a/internal/core/src/common/GeometryCache.h b/internal/core/src/common/GeometryCache.h new file mode 100644 index 0000000000..44510bc6fe --- /dev/null +++ b/internal/core/src/common/GeometryCache.h @@ -0,0 +1,171 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "common/Geometry.h" +#include "common/Types.h" + +namespace milvus { +namespace exec { + +// Custom hash function for segment id + field id pair +struct SegmentFieldHash { + std::size_t + operator()(const std::pair& p) const { + return std::hash{}(p.first) ^ + (std::hash{}(p.second) << 1); + } +}; + +// Simple WKB-based Geometry cache for avoiding repeated WKB->Geometry conversions +class SimpleGeometryCache { + public: + // Get or create Geometry from WKB data + std::shared_ptr + GetOrCreate(const std::string_view& wkb_data) { + // Use WKB data content as key (could be optimized with hash later) + std::string key(wkb_data); + + // Try read-only access first (most common case) + { + std::shared_lock lock(mutex_); + auto it = cache_.find(key); + if (it != cache_.end()) { + return it->second; + } + } + + // Cache miss: create new Geometry with write lock + std::lock_guard lock(mutex_); + + // Double-check after acquiring write lock + auto it = cache_.find(key); + if (it != cache_.end()) { + return it->second; + } + + // Construct new Geometry + try { + auto geometry = std::make_shared(wkb_data.data(), + wkb_data.size()); + cache_.emplace(key, geometry); + return geometry; + } catch (...) { + // Return nullptr on construction failure + return nullptr; + } + } + + // Clear all cached geometries + void + Clear() { + std::lock_guard lock(mutex_); + cache_.clear(); + } + + // Get cache statistics + size_t + Size() const { + std::shared_lock lock(mutex_); + return cache_.size(); + } + + private: + mutable std::shared_mutex mutex_; + std::unordered_map> cache_; +}; + +// Global cache instance per segment+field +class SimpleGeometryCacheManager { + public: + static SimpleGeometryCacheManager& + Instance() { + static SimpleGeometryCacheManager instance; + return instance; + } + + SimpleGeometryCacheManager() = default; + + SimpleGeometryCache& + GetCache(int64_t segment_id, FieldId field_id) { + std::lock_guard lock(mutex_); + auto key = std::make_pair(segment_id, field_id.get()); + auto it = caches_.find(key); + if (it != caches_.end()) { + return *(it->second); + } + + auto cache = std::make_unique(); + auto* cache_ptr = cache.get(); + caches_.emplace(key, std::move(cache)); + return *cache_ptr; + } + + void + RemoveCache(int64_t segment_id, FieldId field_id) { + std::lock_guard lock(mutex_); + auto key = std::make_pair(segment_id, field_id.get()); + caches_.erase(key); + } + + // Remove all caches for a segment (useful when segment is destroyed) + void + RemoveSegmentCaches(int64_t segment_id) { + std::lock_guard lock(mutex_); + auto it = caches_.begin(); + while (it != caches_.end()) { + if (it->first.first == segment_id) { + it = caches_.erase(it); + } else { + ++it; + } + } + } + + // Get cache statistics for monitoring + struct CacheStats { + size_t total_caches = 0; + size_t total_geometries = 0; + }; + + CacheStats + GetStats() const { + std::lock_guard lock(mutex_); + CacheStats stats; + stats.total_caches = caches_.size(); + for (const auto& [key, cache] : caches_) { + stats.total_geometries += cache->Size(); + } + return stats; + } + + private: + SimpleGeometryCacheManager(const SimpleGeometryCacheManager&) = delete; + SimpleGeometryCacheManager& + operator=(const SimpleGeometryCacheManager&) = delete; + + mutable std::mutex mutex_; + std::unordered_map, + std::unique_ptr, + SegmentFieldHash> + caches_; +}; + +} // namespace exec +} // namespace milvus diff --git a/internal/core/src/exec/expression/GISFunctionFilterExpr.cpp b/internal/core/src/exec/expression/GISFunctionFilterExpr.cpp index f52b001c1e..6bd44ddd22 100644 --- a/internal/core/src/exec/expression/GISFunctionFilterExpr.cpp +++ b/internal/core/src/exec/expression/GISFunctionFilterExpr.cpp @@ -10,6 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations under the License #include "GISFunctionFilterExpr.h" +#include #include "common/EasyAssert.h" #include "common/Geometry.h" #include "common/Types.h" @@ -20,30 +21,46 @@ namespace milvus { namespace exec { -#define GEOMETRY_EXECUTE_SUB_BATCH_WITH_COMPARISON(_DataType, method) \ - auto execute_sub_batch = [](const _DataType* data, \ - const bool* valid_data, \ - const int32_t* offsets, \ - const int size, \ - TargetBitmapView res, \ - TargetBitmapView valid_res, \ - const Geometry& right_source) { \ - for (int i = 0; i < size; ++i) { \ - if (valid_data != nullptr && !valid_data[i]) { \ - res[i] = valid_res[i] = false; \ - continue; \ - } \ - res[i] = \ - Geometry(data[i].data(), data[i].size()).method(right_source); \ - } \ - }; \ - int64_t processed_size = ProcessDataChunks<_DataType>( \ - execute_sub_batch, std::nullptr_t{}, res, valid_res, right_source); \ - AssertInfo(processed_size == real_batch_size, \ - "internal error: expr processed rows {} not equal " \ - "expect batch size {}", \ - processed_size, \ - real_batch_size); \ +#define GEOMETRY_EXECUTE_SUB_BATCH_WITH_COMPARISON(_DataType, method) \ + auto execute_sub_batch = [this](const _DataType* data, \ + const bool* valid_data, \ + const int32_t* offsets, \ + const int size, \ + TargetBitmapView res, \ + TargetBitmapView valid_res, \ + const Geometry& right_source) { \ + /* Unified path using simple WKB-content-based cache for both sealed and growing segments. */ \ + auto& geometry_cache = \ + SimpleGeometryCacheManager::Instance().GetCache( \ + this->segment_->get_segment_id(), field_id_); \ + for (int i = 0; i < size; ++i) { \ + if (valid_data != nullptr && !valid_data[i]) { \ + res[i] = valid_res[i] = false; \ + continue; \ + } \ + /* Create string_view from WKB data for cache lookup */ \ + std::string_view wkb_view(data[i].data(), data[i].size()); \ + auto cached_geometry = geometry_cache.GetOrCreate(wkb_view); \ + \ + bool result = false; \ + if (cached_geometry != nullptr) { \ + /* Use cached geometry for operation */ \ + result = cached_geometry->method(right_source); \ + } else { \ + /* Fallback: construct temporary geometry (cache construction failed) */ \ + Geometry tmp(data[i].data(), data[i].size()); \ + result = tmp.method(right_source); \ + } \ + res[i] = result; \ + } \ + }; \ + int64_t processed_size = ProcessDataChunks<_DataType>( \ + execute_sub_batch, std::nullptr_t{}, res, valid_res, right_source); \ + AssertInfo(processed_size == real_batch_size, \ + "internal error: expr processed rows {} not equal " \ + "expect batch size {}", \ + processed_size, \ + real_batch_size); \ return res_vec; // Specialized macro for distance-based operations (ST_DWITHIN) @@ -325,12 +342,17 @@ PhyGISFunctionFilterExpr::EvalForIndexSegment() { return hit_offsets; }; - // Lambda: Process sealed segment data using bulk_subscript + // Lambda: Process sealed segment data using bulk_subscript with SimpleGeometryCache auto process_sealed_data = [&](const std::vector& hit_offsets) { if (hit_offsets.empty()) return; + // Get simple geometry cache for this segment+field + auto& geometry_cache = + SimpleGeometryCacheManager::Instance().GetCache( + segment_->get_segment_id(), field_id_); + auto data_array = segment_->bulk_subscript( field_id_, hit_offsets.data(), hit_offsets.size()); @@ -348,9 +370,23 @@ PhyGISFunctionFilterExpr::EvalForIndexSegment() { } const auto& wkb_data = geometry_array->data(i); - Geometry left(wkb_data.data(), wkb_data.size()); - if (evaluate_geometry(left)) { + // Get or create cached Geometry from simple cache + std::string_view wkb_view(wkb_data.data(), wkb_data.size()); + auto cached_geometry = geometry_cache.GetOrCreate(wkb_view); + + // Evaluate geometry: use cached if available, otherwise construct temporarily + bool result = false; + if (cached_geometry != nullptr) { + result = evaluate_geometry(*cached_geometry); + } else { + // Fallback: construct temporary geometry (cache construction failed) + Geometry temp_geometry(wkb_data.data(), + wkb_data.size()); + result = evaluate_geometry(temp_geometry); + } + + if (result) { refined.set(pos); } } diff --git a/internal/core/src/exec/expression/GISFunctionFilterExpr.h b/internal/core/src/exec/expression/GISFunctionFilterExpr.h index 5fbdd97006..61007eb030 100644 --- a/internal/core/src/exec/expression/GISFunctionFilterExpr.h +++ b/internal/core/src/exec/expression/GISFunctionFilterExpr.h @@ -12,12 +12,14 @@ #pragma once #include +#include #include "common/FieldDataInterface.h" #include "common/Vector.h" #include "exec/expression/Expr.h" #include "expr/ITypeExpr.h" #include "segcore/SegmentInterface.h" +#include "common/GeometryCache.h" namespace milvus { namespace exec { @@ -47,6 +49,11 @@ class PhyGISFunctionFilterExpr : public SegmentExpr { void Eval(EvalCtx& context, VectorPtr& result) override; + std::optional + GetColumnInfo() const override { + return expr_->column_; + } + private: VectorPtr EvalForIndexSegment(); diff --git a/internal/core/src/index/RTreeIndexWrapper.cpp b/internal/core/src/index/RTreeIndexWrapper.cpp index eac62fc34f..ea793f3093 100644 --- a/internal/core/src/index/RTreeIndexWrapper.cpp +++ b/internal/core/src/index/RTreeIndexWrapper.cpp @@ -140,16 +140,25 @@ RTreeIndexWrapper::finish() { // Write meta json nlohmann::json meta; - // index/leaf capacities are not used in Boost implementation meta["dimension"] = dimension_; meta["count"] = static_cast(values_.size()); std::ofstream ofs(index_path_ + ".meta.json", std::ios::trunc); - ofs << meta.dump(); + if (ofs.fail()) { + PanicInfo(ErrorCode::FileOpenFailed, + "Failed to open R-Tree meta file: {}.meta.json", + index_path_); + } + if (!(ofs << meta.dump())) { + PanicInfo(ErrorCode::FileWriteFailed, + "Failed to write R-Tree meta file: {}.meta.json", + index_path_); + } ofs.close(); LOG_INFO("R-Tree meta written: {}.meta.json", index_path_); } catch (const std::exception& e) { - LOG_WARN("Failed to write R-Tree files: {}", e.what()); + PanicInfo(ErrorCode::UnexpectedError, + fmt::format("Failed to write R-Tree files: {}", e.what())); } finished_ = true; diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index f8873c54ea..1a85bd9ac2 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -27,6 +27,7 @@ #include "Utils.h" #include "Types.h" +#include "common/GeometryCache.h" #include "common/Array.h" #include "common/Chunk.h" #include "common/ChunkWriter.h" @@ -1309,6 +1310,10 @@ ChunkedSegmentSealedImpl::ChunkedSegmentSealedImpl( } ChunkedSegmentSealedImpl::~ChunkedSegmentSealedImpl() { + // Clean up geometry cache for all fields in this segment + auto& cache_manager = milvus::exec::SimpleGeometryCacheManager::Instance(); + cache_manager.RemoveSegmentCaches(get_segment_id()); + auto cc = storage::MmapManager::GetInstance().GetChunkCache(); if (cc == nullptr) { return; diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index c17c7d9d91..deba87f3e8 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -32,6 +32,7 @@ #include "common/IndexMeta.h" #include "common/Types.h" #include "query/PlanNode.h" +#include "common/GeometryCache.h" namespace milvus::segcore { @@ -259,6 +260,12 @@ class SegmentGrowingImpl : public SegmentGrowing { } ~SegmentGrowingImpl() { + // Clean up geometry cache for all fields in this segment + auto& cache_manager = + milvus::exec::SimpleGeometryCacheManager::Instance(); + cache_manager.RemoveSegmentCaches(get_segment_id()); + + // Original mmap cleanup logic if (mmap_descriptor_ != nullptr) { auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager(); diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index cbefb72f67..0a38b02bfa 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -28,6 +28,7 @@ #include "Utils.h" #include "Types.h" +#include "common/GeometryCache.h" #include "common/Array.h" #include "common/Consts.h" #include "common/EasyAssert.h" @@ -1279,6 +1280,10 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema, } SegmentSealedImpl::~SegmentSealedImpl() { + // Clean up geometry cache for all fields in this segment + auto& cache_manager = milvus::exec::SimpleGeometryCacheManager::Instance(); + cache_manager.RemoveSegmentCaches(get_segment_id()); + auto cc = storage::MmapManager::GetInstance().GetChunkCache(); if (cc == nullptr) { return; diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index 14396b1bb5..39fd5f5907 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -35,6 +35,7 @@ #include "futures/Future.h" #include "futures/Executor.h" #include "exec/expression/ExprCache.h" +#include "common/GeometryCache.h" ////////////////////////////// common interfaces ////////////////////////////// CStatus @@ -90,6 +91,11 @@ NewSegment(CCollection collection, void DeleteSegment(CSegmentInterface c_segment) { auto s = static_cast(c_segment); + + // Clean up geometry cache for all fields in this segment + auto& cache_manager = milvus::exec::SimpleGeometryCacheManager::Instance(); + cache_manager.RemoveSegmentCaches(s->get_segment_id()); + delete s; } diff --git a/internal/proxy/validate_util.go b/internal/proxy/validate_util.go index 99fc2ad9db..6bbaea5fc7 100644 --- a/internal/proxy/validate_util.go +++ b/internal/proxy/validate_util.go @@ -6,6 +6,7 @@ import ( "reflect" "github.com/twpayne/go-geom/encoding/wkb" + "github.com/twpayne/go-geom/encoding/wkbcommon" "github.com/twpayne/go-geom/encoding/wkt" "go.uber.org/zap" @@ -727,7 +728,7 @@ func (v *validateUtil) checkGeometryFieldData(field *schemapb.FieldData, fieldSc log.Warn("insert invalid Geometry data!! The wkt data has errors", zap.Error(err)) return merr.WrapErrIoFailedReason(err.Error()) } - wkbArray[index], err = wkb.Marshal(geomT, wkb.NDR) + wkbArray[index], err = wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN)) if err != nil { log.Warn("insert invalid Geometry data!! Transform to wkb failed, has errors", zap.Error(err)) return merr.WrapErrIoFailedReason(err.Error())