mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 09:08:43 +08:00
fix: Fix geometry bugs and add cache for create Geometry (#44376)
issue: #44102, #44079, #44075 --------- Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
d44bdcd76c
commit
124a1b3ce4
2
go.mod
2
go.mod
@ -75,7 +75,7 @@ require (
|
||||
github.com/remeh/sizedwaitgroup v1.0.0
|
||||
github.com/shirou/gopsutil/v4 v4.24.10
|
||||
github.com/tidwall/gjson v1.17.1
|
||||
github.com/twpayne/go-geom v1.5.7
|
||||
github.com/twpayne/go-geom v1.6.1
|
||||
github.com/valyala/fastjson v1.6.4
|
||||
github.com/zeebo/xxh3 v1.0.2
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0
|
||||
|
||||
4
go.sum
4
go.sum
@ -931,8 +931,8 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
|
||||
github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg=
|
||||
github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
|
||||
github.com/twpayne/go-geom v1.5.7 h1:7fdceDUr03/MP7rAKOaTV6x9njMiQdxB/D0PDzMTCDc=
|
||||
github.com/twpayne/go-geom v1.5.7/go.mod h1:y4fTAQtLedXW8eG2Yo4tYrIGN1yIwwKkmA+K3iSHKBA=
|
||||
github.com/twpayne/go-geom v1.6.1 h1:iLE+Opv0Ihm/ABIcvQFGIiFBXd76oBIar9drAwHFhR4=
|
||||
github.com/twpayne/go-geom v1.6.1/go.mod h1:Kr+Nly6BswFsKM5sd31YaoWS5PeDDH2NftJTK7Gd028=
|
||||
github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o=
|
||||
github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
|
||||
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
|
||||
|
||||
171
internal/core/src/common/GeometryCache.h
Normal file
171
internal/core/src/common/GeometryCache.h
Normal file
@ -0,0 +1,171 @@
|
||||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <shared_mutex>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "common/Geometry.h"
|
||||
#include "common/Types.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace exec {
|
||||
|
||||
// Custom hash function for segment id + field id pair
|
||||
struct SegmentFieldHash {
|
||||
std::size_t
|
||||
operator()(const std::pair<int64_t, int64_t>& p) const {
|
||||
return std::hash<int64_t>{}(p.first) ^
|
||||
(std::hash<int64_t>{}(p.second) << 1);
|
||||
}
|
||||
};
|
||||
|
||||
// Simple WKB-based Geometry cache for avoiding repeated WKB->Geometry conversions
|
||||
class SimpleGeometryCache {
|
||||
public:
|
||||
// Get or create Geometry from WKB data
|
||||
std::shared_ptr<const Geometry>
|
||||
GetOrCreate(const std::string_view& wkb_data) {
|
||||
// Use WKB data content as key (could be optimized with hash later)
|
||||
std::string key(wkb_data);
|
||||
|
||||
// Try read-only access first (most common case)
|
||||
{
|
||||
std::shared_lock<std::shared_mutex> lock(mutex_);
|
||||
auto it = cache_.find(key);
|
||||
if (it != cache_.end()) {
|
||||
return it->second;
|
||||
}
|
||||
}
|
||||
|
||||
// Cache miss: create new Geometry with write lock
|
||||
std::lock_guard<std::shared_mutex> lock(mutex_);
|
||||
|
||||
// Double-check after acquiring write lock
|
||||
auto it = cache_.find(key);
|
||||
if (it != cache_.end()) {
|
||||
return it->second;
|
||||
}
|
||||
|
||||
// Construct new Geometry
|
||||
try {
|
||||
auto geometry = std::make_shared<const Geometry>(wkb_data.data(),
|
||||
wkb_data.size());
|
||||
cache_.emplace(key, geometry);
|
||||
return geometry;
|
||||
} catch (...) {
|
||||
// Return nullptr on construction failure
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// Clear all cached geometries
|
||||
void
|
||||
Clear() {
|
||||
std::lock_guard<std::shared_mutex> lock(mutex_);
|
||||
cache_.clear();
|
||||
}
|
||||
|
||||
// Get cache statistics
|
||||
size_t
|
||||
Size() const {
|
||||
std::shared_lock<std::shared_mutex> lock(mutex_);
|
||||
return cache_.size();
|
||||
}
|
||||
|
||||
private:
|
||||
mutable std::shared_mutex mutex_;
|
||||
std::unordered_map<std::string, std::shared_ptr<const Geometry>> cache_;
|
||||
};
|
||||
|
||||
// Global cache instance per segment+field
|
||||
class SimpleGeometryCacheManager {
|
||||
public:
|
||||
static SimpleGeometryCacheManager&
|
||||
Instance() {
|
||||
static SimpleGeometryCacheManager instance;
|
||||
return instance;
|
||||
}
|
||||
|
||||
SimpleGeometryCacheManager() = default;
|
||||
|
||||
SimpleGeometryCache&
|
||||
GetCache(int64_t segment_id, FieldId field_id) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
auto key = std::make_pair(segment_id, field_id.get());
|
||||
auto it = caches_.find(key);
|
||||
if (it != caches_.end()) {
|
||||
return *(it->second);
|
||||
}
|
||||
|
||||
auto cache = std::make_unique<SimpleGeometryCache>();
|
||||
auto* cache_ptr = cache.get();
|
||||
caches_.emplace(key, std::move(cache));
|
||||
return *cache_ptr;
|
||||
}
|
||||
|
||||
void
|
||||
RemoveCache(int64_t segment_id, FieldId field_id) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
auto key = std::make_pair(segment_id, field_id.get());
|
||||
caches_.erase(key);
|
||||
}
|
||||
|
||||
// Remove all caches for a segment (useful when segment is destroyed)
|
||||
void
|
||||
RemoveSegmentCaches(int64_t segment_id) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
auto it = caches_.begin();
|
||||
while (it != caches_.end()) {
|
||||
if (it->first.first == segment_id) {
|
||||
it = caches_.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get cache statistics for monitoring
|
||||
struct CacheStats {
|
||||
size_t total_caches = 0;
|
||||
size_t total_geometries = 0;
|
||||
};
|
||||
|
||||
CacheStats
|
||||
GetStats() const {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
CacheStats stats;
|
||||
stats.total_caches = caches_.size();
|
||||
for (const auto& [key, cache] : caches_) {
|
||||
stats.total_geometries += cache->Size();
|
||||
}
|
||||
return stats;
|
||||
}
|
||||
|
||||
private:
|
||||
SimpleGeometryCacheManager(const SimpleGeometryCacheManager&) = delete;
|
||||
SimpleGeometryCacheManager&
|
||||
operator=(const SimpleGeometryCacheManager&) = delete;
|
||||
|
||||
mutable std::mutex mutex_;
|
||||
std::unordered_map<std::pair<int64_t, int64_t>,
|
||||
std::unique_ptr<SimpleGeometryCache>,
|
||||
SegmentFieldHash>
|
||||
caches_;
|
||||
};
|
||||
|
||||
} // namespace exec
|
||||
} // namespace milvus
|
||||
@ -10,6 +10,7 @@
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include "GISFunctionFilterExpr.h"
|
||||
#include <cstdlib>
|
||||
#include "common/EasyAssert.h"
|
||||
#include "common/Geometry.h"
|
||||
#include "common/Types.h"
|
||||
@ -20,30 +21,46 @@
|
||||
namespace milvus {
|
||||
namespace exec {
|
||||
|
||||
#define GEOMETRY_EXECUTE_SUB_BATCH_WITH_COMPARISON(_DataType, method) \
|
||||
auto execute_sub_batch = [](const _DataType* data, \
|
||||
const bool* valid_data, \
|
||||
const int32_t* offsets, \
|
||||
const int size, \
|
||||
TargetBitmapView res, \
|
||||
TargetBitmapView valid_res, \
|
||||
const Geometry& right_source) { \
|
||||
for (int i = 0; i < size; ++i) { \
|
||||
if (valid_data != nullptr && !valid_data[i]) { \
|
||||
res[i] = valid_res[i] = false; \
|
||||
continue; \
|
||||
} \
|
||||
res[i] = \
|
||||
Geometry(data[i].data(), data[i].size()).method(right_source); \
|
||||
} \
|
||||
}; \
|
||||
int64_t processed_size = ProcessDataChunks<_DataType>( \
|
||||
execute_sub_batch, std::nullptr_t{}, res, valid_res, right_source); \
|
||||
AssertInfo(processed_size == real_batch_size, \
|
||||
"internal error: expr processed rows {} not equal " \
|
||||
"expect batch size {}", \
|
||||
processed_size, \
|
||||
real_batch_size); \
|
||||
#define GEOMETRY_EXECUTE_SUB_BATCH_WITH_COMPARISON(_DataType, method) \
|
||||
auto execute_sub_batch = [this](const _DataType* data, \
|
||||
const bool* valid_data, \
|
||||
const int32_t* offsets, \
|
||||
const int size, \
|
||||
TargetBitmapView res, \
|
||||
TargetBitmapView valid_res, \
|
||||
const Geometry& right_source) { \
|
||||
/* Unified path using simple WKB-content-based cache for both sealed and growing segments. */ \
|
||||
auto& geometry_cache = \
|
||||
SimpleGeometryCacheManager::Instance().GetCache( \
|
||||
this->segment_->get_segment_id(), field_id_); \
|
||||
for (int i = 0; i < size; ++i) { \
|
||||
if (valid_data != nullptr && !valid_data[i]) { \
|
||||
res[i] = valid_res[i] = false; \
|
||||
continue; \
|
||||
} \
|
||||
/* Create string_view from WKB data for cache lookup */ \
|
||||
std::string_view wkb_view(data[i].data(), data[i].size()); \
|
||||
auto cached_geometry = geometry_cache.GetOrCreate(wkb_view); \
|
||||
\
|
||||
bool result = false; \
|
||||
if (cached_geometry != nullptr) { \
|
||||
/* Use cached geometry for operation */ \
|
||||
result = cached_geometry->method(right_source); \
|
||||
} else { \
|
||||
/* Fallback: construct temporary geometry (cache construction failed) */ \
|
||||
Geometry tmp(data[i].data(), data[i].size()); \
|
||||
result = tmp.method(right_source); \
|
||||
} \
|
||||
res[i] = result; \
|
||||
} \
|
||||
}; \
|
||||
int64_t processed_size = ProcessDataChunks<_DataType>( \
|
||||
execute_sub_batch, std::nullptr_t{}, res, valid_res, right_source); \
|
||||
AssertInfo(processed_size == real_batch_size, \
|
||||
"internal error: expr processed rows {} not equal " \
|
||||
"expect batch size {}", \
|
||||
processed_size, \
|
||||
real_batch_size); \
|
||||
return res_vec;
|
||||
|
||||
// Specialized macro for distance-based operations (ST_DWITHIN)
|
||||
@ -325,12 +342,17 @@ PhyGISFunctionFilterExpr::EvalForIndexSegment() {
|
||||
return hit_offsets;
|
||||
};
|
||||
|
||||
// Lambda: Process sealed segment data using bulk_subscript
|
||||
// Lambda: Process sealed segment data using bulk_subscript with SimpleGeometryCache
|
||||
auto process_sealed_data =
|
||||
[&](const std::vector<int64_t>& hit_offsets) {
|
||||
if (hit_offsets.empty())
|
||||
return;
|
||||
|
||||
// Get simple geometry cache for this segment+field
|
||||
auto& geometry_cache =
|
||||
SimpleGeometryCacheManager::Instance().GetCache(
|
||||
segment_->get_segment_id(), field_id_);
|
||||
|
||||
auto data_array = segment_->bulk_subscript(
|
||||
field_id_, hit_offsets.data(), hit_offsets.size());
|
||||
|
||||
@ -348,9 +370,23 @@ PhyGISFunctionFilterExpr::EvalForIndexSegment() {
|
||||
}
|
||||
|
||||
const auto& wkb_data = geometry_array->data(i);
|
||||
Geometry left(wkb_data.data(), wkb_data.size());
|
||||
|
||||
if (evaluate_geometry(left)) {
|
||||
// Get or create cached Geometry from simple cache
|
||||
std::string_view wkb_view(wkb_data.data(), wkb_data.size());
|
||||
auto cached_geometry = geometry_cache.GetOrCreate(wkb_view);
|
||||
|
||||
// Evaluate geometry: use cached if available, otherwise construct temporarily
|
||||
bool result = false;
|
||||
if (cached_geometry != nullptr) {
|
||||
result = evaluate_geometry(*cached_geometry);
|
||||
} else {
|
||||
// Fallback: construct temporary geometry (cache construction failed)
|
||||
Geometry temp_geometry(wkb_data.data(),
|
||||
wkb_data.size());
|
||||
result = evaluate_geometry(temp_geometry);
|
||||
}
|
||||
|
||||
if (result) {
|
||||
refined.set(pos);
|
||||
}
|
||||
}
|
||||
|
||||
@ -12,12 +12,14 @@
|
||||
#pragma once
|
||||
|
||||
#include <fmt/core.h>
|
||||
#include <memory>
|
||||
|
||||
#include "common/FieldDataInterface.h"
|
||||
#include "common/Vector.h"
|
||||
#include "exec/expression/Expr.h"
|
||||
#include "expr/ITypeExpr.h"
|
||||
#include "segcore/SegmentInterface.h"
|
||||
#include "common/GeometryCache.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace exec {
|
||||
@ -47,6 +49,11 @@ class PhyGISFunctionFilterExpr : public SegmentExpr {
|
||||
void
|
||||
Eval(EvalCtx& context, VectorPtr& result) override;
|
||||
|
||||
std::optional<milvus::expr::ColumnInfo>
|
||||
GetColumnInfo() const override {
|
||||
return expr_->column_;
|
||||
}
|
||||
|
||||
private:
|
||||
VectorPtr
|
||||
EvalForIndexSegment();
|
||||
|
||||
@ -140,16 +140,25 @@ RTreeIndexWrapper::finish() {
|
||||
|
||||
// Write meta json
|
||||
nlohmann::json meta;
|
||||
// index/leaf capacities are not used in Boost implementation
|
||||
meta["dimension"] = dimension_;
|
||||
meta["count"] = static_cast<uint64_t>(values_.size());
|
||||
|
||||
std::ofstream ofs(index_path_ + ".meta.json", std::ios::trunc);
|
||||
ofs << meta.dump();
|
||||
if (ofs.fail()) {
|
||||
PanicInfo(ErrorCode::FileOpenFailed,
|
||||
"Failed to open R-Tree meta file: {}.meta.json",
|
||||
index_path_);
|
||||
}
|
||||
if (!(ofs << meta.dump())) {
|
||||
PanicInfo(ErrorCode::FileWriteFailed,
|
||||
"Failed to write R-Tree meta file: {}.meta.json",
|
||||
index_path_);
|
||||
}
|
||||
ofs.close();
|
||||
LOG_INFO("R-Tree meta written: {}.meta.json", index_path_);
|
||||
} catch (const std::exception& e) {
|
||||
LOG_WARN("Failed to write R-Tree files: {}", e.what());
|
||||
PanicInfo(ErrorCode::UnexpectedError,
|
||||
fmt::format("Failed to write R-Tree files: {}", e.what()));
|
||||
}
|
||||
|
||||
finished_ = true;
|
||||
|
||||
@ -27,6 +27,7 @@
|
||||
|
||||
#include "Utils.h"
|
||||
#include "Types.h"
|
||||
#include "common/GeometryCache.h"
|
||||
#include "common/Array.h"
|
||||
#include "common/Chunk.h"
|
||||
#include "common/ChunkWriter.h"
|
||||
@ -1309,6 +1310,10 @@ ChunkedSegmentSealedImpl::ChunkedSegmentSealedImpl(
|
||||
}
|
||||
|
||||
ChunkedSegmentSealedImpl::~ChunkedSegmentSealedImpl() {
|
||||
// Clean up geometry cache for all fields in this segment
|
||||
auto& cache_manager = milvus::exec::SimpleGeometryCacheManager::Instance();
|
||||
cache_manager.RemoveSegmentCaches(get_segment_id());
|
||||
|
||||
auto cc = storage::MmapManager::GetInstance().GetChunkCache();
|
||||
if (cc == nullptr) {
|
||||
return;
|
||||
|
||||
@ -32,6 +32,7 @@
|
||||
#include "common/IndexMeta.h"
|
||||
#include "common/Types.h"
|
||||
#include "query/PlanNode.h"
|
||||
#include "common/GeometryCache.h"
|
||||
|
||||
namespace milvus::segcore {
|
||||
|
||||
@ -259,6 +260,12 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
}
|
||||
|
||||
~SegmentGrowingImpl() {
|
||||
// Clean up geometry cache for all fields in this segment
|
||||
auto& cache_manager =
|
||||
milvus::exec::SimpleGeometryCacheManager::Instance();
|
||||
cache_manager.RemoveSegmentCaches(get_segment_id());
|
||||
|
||||
// Original mmap cleanup logic
|
||||
if (mmap_descriptor_ != nullptr) {
|
||||
auto mcm =
|
||||
storage::MmapManager::GetInstance().GetMmapChunkManager();
|
||||
|
||||
@ -28,6 +28,7 @@
|
||||
|
||||
#include "Utils.h"
|
||||
#include "Types.h"
|
||||
#include "common/GeometryCache.h"
|
||||
#include "common/Array.h"
|
||||
#include "common/Consts.h"
|
||||
#include "common/EasyAssert.h"
|
||||
@ -1279,6 +1280,10 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
|
||||
}
|
||||
|
||||
SegmentSealedImpl::~SegmentSealedImpl() {
|
||||
// Clean up geometry cache for all fields in this segment
|
||||
auto& cache_manager = milvus::exec::SimpleGeometryCacheManager::Instance();
|
||||
cache_manager.RemoveSegmentCaches(get_segment_id());
|
||||
|
||||
auto cc = storage::MmapManager::GetInstance().GetChunkCache();
|
||||
if (cc == nullptr) {
|
||||
return;
|
||||
|
||||
@ -35,6 +35,7 @@
|
||||
#include "futures/Future.h"
|
||||
#include "futures/Executor.h"
|
||||
#include "exec/expression/ExprCache.h"
|
||||
#include "common/GeometryCache.h"
|
||||
|
||||
////////////////////////////// common interfaces //////////////////////////////
|
||||
CStatus
|
||||
@ -90,6 +91,11 @@ NewSegment(CCollection collection,
|
||||
void
|
||||
DeleteSegment(CSegmentInterface c_segment) {
|
||||
auto s = static_cast<milvus::segcore::SegmentInterface*>(c_segment);
|
||||
|
||||
// Clean up geometry cache for all fields in this segment
|
||||
auto& cache_manager = milvus::exec::SimpleGeometryCacheManager::Instance();
|
||||
cache_manager.RemoveSegmentCaches(s->get_segment_id());
|
||||
|
||||
delete s;
|
||||
}
|
||||
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
"reflect"
|
||||
|
||||
"github.com/twpayne/go-geom/encoding/wkb"
|
||||
"github.com/twpayne/go-geom/encoding/wkbcommon"
|
||||
"github.com/twpayne/go-geom/encoding/wkt"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -727,7 +728,7 @@ func (v *validateUtil) checkGeometryFieldData(field *schemapb.FieldData, fieldSc
|
||||
log.Warn("insert invalid Geometry data!! The wkt data has errors", zap.Error(err))
|
||||
return merr.WrapErrIoFailedReason(err.Error())
|
||||
}
|
||||
wkbArray[index], err = wkb.Marshal(geomT, wkb.NDR)
|
||||
wkbArray[index], err = wkb.Marshal(geomT, wkb.NDR, wkbcommon.WKBOptionEmptyPointHandling(wkbcommon.EmptyPointHandlingNaN))
|
||||
if err != nil {
|
||||
log.Warn("insert invalid Geometry data!! Transform to wkb failed, has errors", zap.Error(err))
|
||||
return merr.WrapErrIoFailedReason(err.Error())
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user