enhance: Refine geometry cache with offsets (#44432)

issue: #43427

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2025-09-18 20:24:02 +08:00 committed by GitHub
parent 94b1d66535
commit 5b8288a0ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 467 additions and 142 deletions

View File

@ -15,80 +15,93 @@
#include <mutex> #include <mutex>
#include <shared_mutex> #include <shared_mutex>
#include <string> #include <string>
#include <string_view>
#include <unordered_map> #include <unordered_map>
#include <vector>
#include "common/EasyAssert.h"
#include "common/Geometry.h" #include "common/Geometry.h"
#include "common/Types.h" #include "common/Types.h"
namespace milvus { namespace milvus {
namespace exec { namespace exec {
// Custom hash function for segment id + field id pair // Helper function to create cache key from segment_id and field_id
struct SegmentFieldHash { inline std::string
std::size_t MakeCacheKey(int64_t segment_id, FieldId field_id) {
operator()(const std::pair<int64_t, int64_t>& p) const { return std::to_string(segment_id) + "_" + std::to_string(field_id.get());
return std::hash<int64_t>{}(p.first) ^ }
(std::hash<int64_t>{}(p.second) << 1);
}
};
// Simple WKB-based Geometry cache for avoiding repeated WKB->Geometry conversions // Vector-based Geometry cache that maintains original field data order
class SimpleGeometryCache { class SimpleGeometryCache {
public: public:
// Get or create Geometry from WKB data // Append WKB data during field loading
std::shared_ptr<const Geometry> void
GetOrCreate(const std::string_view& wkb_data) { AppendData(const char* wkb_data, size_t size) {
// Use WKB data content as key (could be optimized with hash later)
std::string key(wkb_data);
// Try read-only access first (most common case)
{
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = cache_.find(key);
if (it != cache_.end()) {
return it->second;
}
}
// Cache miss: create new Geometry with write lock
std::lock_guard<std::shared_mutex> lock(mutex_); std::lock_guard<std::shared_mutex> lock(mutex_);
// Double-check after acquiring write lock if (size == 0 || wkb_data == nullptr) {
auto it = cache_.find(key); // Handle null/empty geometry - add invalid geometry
if (it != cache_.end()) { geometries_.emplace_back();
return it->second; } else {
try {
// Create geometry directly in the vector
geometries_.emplace_back(wkb_data, size);
} catch (const std::exception& e) {
PanicInfo(UnexpectedError,
"Failed to construct geometry from WKB data: {}",
e.what());
}
} }
}
// Construct new Geometry // Get shared lock for batch operations (RAII)
try { std::shared_lock<std::shared_mutex>
auto geometry = std::make_shared<const Geometry>(wkb_data.data(), AcquireReadLock() const {
wkb_data.size()); return std::shared_lock<std::shared_mutex>(mutex_);
cache_.emplace(key, geometry); }
return geometry;
} catch (...) { // Get Geometry by offset without locking (use with AcquireReadLock)
// Return nullptr on construction failure const Geometry*
GetByOffsetUnsafe(size_t offset) const {
if (offset >= geometries_.size()) {
return nullptr; return nullptr;
} }
const auto& geometry = geometries_[offset];
return geometry.IsValid() ? &geometry : nullptr;
}
// Get Geometry by offset (thread-safe read for filtering)
const Geometry*
GetByOffset(size_t offset) const {
std::shared_lock<std::shared_mutex> lock(mutex_);
return GetByOffsetUnsafe(offset);
}
// Get total number of loaded geometries
size_t
Size() const {
std::shared_lock<std::shared_mutex> lock(mutex_);
return geometries_.size();
} }
// Clear all cached geometries // Clear all cached geometries
void void
Clear() { Clear() {
std::lock_guard<std::shared_mutex> lock(mutex_); std::lock_guard<std::shared_mutex> lock(mutex_);
cache_.clear(); geometries_.clear();
} }
// Get cache statistics // Check if cache is loaded
size_t bool
Size() const { IsLoaded() const {
std::shared_lock<std::shared_mutex> lock(mutex_); std::shared_lock<std::shared_mutex> lock(mutex_);
return cache_.size(); return !geometries_.empty();
} }
private: private:
mutable std::shared_mutex mutex_; mutable std::shared_mutex mutex_; // For read/write operations
std::unordered_map<std::string, std::shared_ptr<const Geometry>> cache_; std::vector<Geometry> geometries_; // Direct storage of Geometry objects
}; };
// Global cache instance per segment+field // Global cache instance per segment+field
@ -105,7 +118,7 @@ class SimpleGeometryCacheManager {
SimpleGeometryCache& SimpleGeometryCache&
GetCache(int64_t segment_id, FieldId field_id) { GetCache(int64_t segment_id, FieldId field_id) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
auto key = std::make_pair(segment_id, field_id.get()); auto key = MakeCacheKey(segment_id, field_id);
auto it = caches_.find(key); auto it = caches_.find(key);
if (it != caches_.end()) { if (it != caches_.end()) {
return *(it->second); return *(it->second);
@ -120,7 +133,7 @@ class SimpleGeometryCacheManager {
void void
RemoveCache(int64_t segment_id, FieldId field_id) { RemoveCache(int64_t segment_id, FieldId field_id) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
auto key = std::make_pair(segment_id, field_id.get()); auto key = MakeCacheKey(segment_id, field_id);
caches_.erase(key); caches_.erase(key);
} }
@ -128,9 +141,11 @@ class SimpleGeometryCacheManager {
void void
RemoveSegmentCaches(int64_t segment_id) { RemoveSegmentCaches(int64_t segment_id) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
auto segment_prefix = std::to_string(segment_id) + "_";
auto it = caches_.begin(); auto it = caches_.begin();
while (it != caches_.end()) { while (it != caches_.end()) {
if (it->first.first == segment_id) { if (it->first.substr(0, segment_prefix.length()) ==
segment_prefix) {
it = caches_.erase(it); it = caches_.erase(it);
} else { } else {
++it; ++it;
@ -141,6 +156,7 @@ class SimpleGeometryCacheManager {
// Get cache statistics for monitoring // Get cache statistics for monitoring
struct CacheStats { struct CacheStats {
size_t total_caches = 0; size_t total_caches = 0;
size_t loaded_caches = 0;
size_t total_geometries = 0; size_t total_geometries = 0;
}; };
@ -150,7 +166,10 @@ class SimpleGeometryCacheManager {
CacheStats stats; CacheStats stats;
stats.total_caches = caches_.size(); stats.total_caches = caches_.size();
for (const auto& [key, cache] : caches_) { for (const auto& [key, cache] : caches_) {
stats.total_geometries += cache->Size(); if (cache->IsLoaded()) {
stats.loaded_caches++;
stats.total_geometries += cache->Size();
}
} }
return stats; return stats;
} }
@ -161,11 +180,30 @@ class SimpleGeometryCacheManager {
operator=(const SimpleGeometryCacheManager&) = delete; operator=(const SimpleGeometryCacheManager&) = delete;
mutable std::mutex mutex_; mutable std::mutex mutex_;
std::unordered_map<std::pair<int64_t, int64_t>, std::unordered_map<std::string, std::unique_ptr<SimpleGeometryCache>>
std::unique_ptr<SimpleGeometryCache>,
SegmentFieldHash>
caches_; caches_;
}; };
} // namespace exec } // namespace exec
// Convenient global functions for direct access to geometry cache

// Fetches the cached Geometry stored at `offset` for the given segment/field.
// Returns nullptr when `offset` is past the end of the cache or when the
// entry at that position is not a valid geometry (e.g. a null row).
// NOTE(review): the read lock is held only for the duration of the lookup and
// the returned pointer refers to an element of the cache's vector; it looks
// like callers must not keep it across later appends/clears -- confirm.
inline const Geometry*
GetGeometryByOffset(int64_t segment_id, FieldId field_id, size_t offset) {
    return exec::SimpleGeometryCacheManager::Instance()
        .GetCache(segment_id, field_id)
        .GetByOffset(offset);
}
// Drops the geometry cache registered for one (segment, field) pair.
// Forwards to SimpleGeometryCacheManager::RemoveCache on the singleton.
inline void
RemoveGeometryCache(int64_t segment_id, FieldId field_id) {
    auto& manager = exec::SimpleGeometryCacheManager::Instance();
    manager.RemoveCache(segment_id, field_id);
}
// Drops every geometry cache that belongs to `segment_id`, whatever the field.
// Forwards to SimpleGeometryCacheManager::RemoveSegmentCaches, which erases
// all entries whose key starts with "<segment_id>_".
inline void
RemoveSegmentGeometryCaches(int64_t segment_id) {
    auto& manager = exec::SimpleGeometryCacheManager::Instance();
    manager.RemoveSegmentCaches(segment_id);
}
} // namespace milvus } // namespace milvus

View File

@ -335,7 +335,10 @@ class SegmentExpr : public Expr {
// used for processing raw data expr for sealed segments. // used for processing raw data expr for sealed segments.
// now only used for std::string_view && json // now only used for std::string_view && json
// TODO: support more types // TODO: support more types
template <typename T, typename FUNC, typename... ValTypes> template <typename T,
bool NeedSegmentOffsets = false,
typename FUNC,
typename... ValTypes>
int64_t int64_t
ProcessChunkForSealedSeg( ProcessChunkForSealedSeg(
FUNC func, FUNC func,
@ -354,13 +357,30 @@ class SegmentExpr : public Expr {
if (!skip_func || !skip_func(skip_index, field_id_, 0)) { if (!skip_func || !skip_func(skip_index, field_id_, 0)) {
// first is the raw data, second is valid_data // first is the raw data, second is valid_data
// use valid_data to see if raw data is null // use valid_data to see if raw data is null
func(views_info.first.data(), if constexpr (NeedSegmentOffsets) {
views_info.second.data(), // For GIS functions: construct segment offsets array
nullptr, std::vector<int32_t> segment_offsets_array(need_size);
need_size, for (int64_t j = 0; j < need_size; ++j) {
res, segment_offsets_array[j] =
valid_res, static_cast<int32_t>(current_data_chunk_pos_ + j);
values...); }
func(views_info.first.data(),
views_info.second.data(),
nullptr,
segment_offsets_array.data(),
need_size,
res,
valid_res,
values...);
} else {
func(views_info.first.data(),
views_info.second.data(),
nullptr,
need_size,
res,
valid_res,
values...);
}
} else { } else {
ApplyValidData(views_info.second.data(), res, valid_res, need_size); ApplyValidData(views_info.second.data(), res, valid_res, need_size);
} }
@ -616,7 +636,11 @@ class SegmentExpr : public Expr {
return input->size(); return input->size();
} }
template <typename T, typename FUNC, typename... ValTypes> // Template parameter to control whether segment offsets are needed (for GIS functions)
template <typename T,
bool NeedSegmentOffsets = false,
typename FUNC,
typename... ValTypes>
int64_t int64_t
ProcessDataChunksForSingleChunk( ProcessDataChunksForSingleChunk(
FUNC func, FUNC func,
@ -628,7 +652,7 @@ class SegmentExpr : public Expr {
if constexpr (std::is_same_v<T, std::string_view> || if constexpr (std::is_same_v<T, std::string_view> ||
std::is_same_v<T, Json>) { std::is_same_v<T, Json>) {
if (segment_->type() == SegmentType::Sealed) { if (segment_->type() == SegmentType::Sealed) {
return ProcessChunkForSealedSeg<T>( return ProcessChunkForSealedSeg<T, NeedSegmentOffsets>(
func, skip_func, res, valid_res, values...); func, skip_func, res, valid_res, values...);
} }
} }
@ -653,15 +677,34 @@ class SegmentExpr : public Expr {
if (valid_data != nullptr) { if (valid_data != nullptr) {
valid_data += data_pos; valid_data += data_pos;
} }
if (!skip_func || !skip_func(skip_index, field_id_, i)) { if (!skip_func || !skip_func(skip_index, field_id_, i)) {
const T* data = chunk.data() + data_pos; const T* data = chunk.data() + data_pos;
func(data,
valid_data, if constexpr (NeedSegmentOffsets) {
nullptr, // For GIS functions: construct segment offsets array
size, std::vector<int32_t> segment_offsets_array(size);
res + processed_size, for (int64_t j = 0; j < size; ++j) {
valid_res + processed_size, segment_offsets_array[j] = static_cast<int32_t>(
values...); size_per_chunk_ * i + data_pos + j);
}
func(data,
valid_data,
nullptr,
segment_offsets_array.data(),
size,
res + processed_size,
valid_res + processed_size,
values...);
} else {
func(data,
valid_data,
nullptr,
size,
res + processed_size,
valid_res + processed_size,
values...);
}
} else { } else {
ApplyValidData(valid_data, ApplyValidData(valid_data,
res + processed_size, res + processed_size,
@ -679,7 +722,11 @@ class SegmentExpr : public Expr {
return processed_size; return processed_size;
} }
template <typename T, typename FUNC, typename... ValTypes>
template <typename T,
bool NeedSegmentOffsets = false,
typename FUNC,
typename... ValTypes>
int64_t int64_t
ProcessDataChunksForMultipleChunk( ProcessDataChunksForMultipleChunk(
FUNC func, FUNC func,
@ -706,7 +753,13 @@ class SegmentExpr : public Expr {
size = std::min(size, batch_size_ - processed_size); size = std::min(size, batch_size_ - processed_size);
if (size == 0) if (size == 0)
continue; //do not go empty-loop at the bound of the chunk continue; //do not go empty-loop at the bound of the chunk
std::vector<int32_t> segment_offsets_array(size);
auto start_offset =
segment_->num_rows_until_chunk(field_id_, i) + data_pos;
for (int64_t j = 0; j < size; ++j) {
int64_t offset = start_offset + j;
segment_offsets_array[j] = static_cast<int32_t>(offset);
}
auto& skip_index = segment_->GetSkipIndex(); auto& skip_index = segment_->GetSkipIndex();
if (!skip_func || !skip_func(skip_index, field_id_, i)) { if (!skip_func || !skip_func(skip_index, field_id_, i)) {
bool is_seal = false; bool is_seal = false;
@ -719,13 +772,26 @@ class SegmentExpr : public Expr {
auto [data_vec, valid_data] = auto [data_vec, valid_data] =
segment_->get_batch_views<T>( segment_->get_batch_views<T>(
field_id_, i, data_pos, size); field_id_, i, data_pos, size);
func(data_vec.data(), if constexpr (NeedSegmentOffsets) {
valid_data.data(), // For GIS functions: construct segment offsets array
nullptr, func(data_vec.data(),
size, valid_data.data(),
res + processed_size, nullptr,
valid_res + processed_size, segment_offsets_array.data(),
values...); size,
res + processed_size,
valid_res + processed_size,
values...);
} else {
// For regular functions: no segment offsets
func(data_vec.data(),
valid_data.data(),
nullptr,
size,
res + processed_size,
valid_res + processed_size,
values...);
}
is_seal = true; is_seal = true;
} }
} }
@ -736,13 +802,26 @@ class SegmentExpr : public Expr {
if (valid_data != nullptr) { if (valid_data != nullptr) {
valid_data += data_pos; valid_data += data_pos;
} }
func(data,
valid_data, if constexpr (NeedSegmentOffsets) {
nullptr, // For GIS functions: construct segment offsets array
size, func(data,
res + processed_size, valid_data,
valid_res + processed_size, nullptr,
values...); segment_offsets_array.data(),
size,
res + processed_size,
valid_res + processed_size,
values...);
} else {
func(data,
valid_data,
nullptr,
size,
res + processed_size,
valid_res + processed_size,
values...);
}
} }
} else { } else {
const bool* valid_data; const bool* valid_data;
@ -782,7 +861,10 @@ class SegmentExpr : public Expr {
return processed_size; return processed_size;
} }
template <typename T, typename FUNC, typename... ValTypes> template <typename T,
bool NeedSegmentOffsets = false,
typename FUNC,
typename... ValTypes>
int64_t int64_t
ProcessDataChunks( ProcessDataChunks(
FUNC func, FUNC func,
@ -791,10 +873,10 @@ class SegmentExpr : public Expr {
TargetBitmapView valid_res, TargetBitmapView valid_res,
ValTypes... values) { ValTypes... values) {
if (segment_->is_chunked()) { if (segment_->is_chunked()) {
return ProcessDataChunksForMultipleChunk<T>( return ProcessDataChunksForMultipleChunk<T, NeedSegmentOffsets>(
func, skip_func, res, valid_res, values...); func, skip_func, res, valid_res, values...);
} else { } else {
return ProcessDataChunksForSingleChunk<T>( return ProcessDataChunksForSingleChunk<T, NeedSegmentOffsets>(
func, skip_func, res, valid_res, values...); func, skip_func, res, valid_res, values...);
} }
} }

View File

@ -15,7 +15,6 @@
#include "common/Geometry.h" #include "common/Geometry.h"
#include "common/Types.h" #include "common/Types.h"
#include "pb/plan.pb.h" #include "pb/plan.pb.h"
#include "pb/schema.pb.h"
#include <cmath> #include <cmath>
#include <fmt/core.h> #include <fmt/core.h>
namespace milvus { namespace milvus {
@ -25,36 +24,32 @@ namespace exec {
auto execute_sub_batch = [this](const _DataType* data, \ auto execute_sub_batch = [this](const _DataType* data, \
const bool* valid_data, \ const bool* valid_data, \
const int32_t* offsets, \ const int32_t* offsets, \
const int32_t* segment_offsets, \
const int size, \ const int size, \
TargetBitmapView res, \ TargetBitmapView res, \
TargetBitmapView valid_res, \ TargetBitmapView valid_res, \
const Geometry& right_source) { \ const Geometry& right_source) { \
AssertInfo(segment_offsets != nullptr, \
"segment_offsets should not be nullptr"); \
/* Unified path using simple WKB-content-based cache for both sealed and growing segments. */ \ /* Unified path using simple WKB-content-based cache for both sealed and growing segments. */ \
auto& geometry_cache = \ auto& geometry_cache = \
SimpleGeometryCacheManager::Instance().GetCache( \ SimpleGeometryCacheManager::Instance().GetCache( \
this->segment_->get_segment_id(), field_id_); \ this->segment_->get_segment_id(), field_id_); \
auto cache_lock = geometry_cache.AcquireReadLock(); \
for (int i = 0; i < size; ++i) { \ for (int i = 0; i < size; ++i) { \
if (valid_data != nullptr && !valid_data[i]) { \ if (valid_data != nullptr && !valid_data[i]) { \
res[i] = valid_res[i] = false; \ res[i] = valid_res[i] = false; \
continue; \ continue; \
} \ } \
/* Create string_view from WKB data for cache lookup */ \ auto absolute_offset = segment_offsets[i]; \
std::string_view wkb_view(data[i].data(), data[i].size()); \ auto cached_geometry = \
auto cached_geometry = geometry_cache.GetOrCreate(wkb_view); \ geometry_cache.GetByOffsetUnsafe(absolute_offset); \
\ AssertInfo(cached_geometry != nullptr, \
bool result = false; \ "cached geometry is nullptr"); \
if (cached_geometry != nullptr) { \ res[i] = cached_geometry->method(right_source); \
/* Use cached geometry for operation */ \
result = cached_geometry->method(right_source); \
} else { \
/* Fallback: construct temporary geometry (cache construction failed) */ \
Geometry tmp(data[i].data(), data[i].size()); \
result = tmp.method(right_source); \
} \
res[i] = result; \
} \ } \
}; \ }; \
int64_t processed_size = ProcessDataChunks<_DataType>( \ int64_t processed_size = ProcessDataChunks<_DataType, true>( \
execute_sub_batch, std::nullptr_t{}, res, valid_res, right_source); \ execute_sub_batch, std::nullptr_t{}, res, valid_res, right_source); \
AssertInfo(processed_size == real_batch_size, \ AssertInfo(processed_size == real_batch_size, \
"internal error: expr processed rows {} not equal " \ "internal error: expr processed rows {} not equal " \
@ -68,20 +63,31 @@ namespace exec {
auto execute_sub_batch = [this](const _DataType* data, \ auto execute_sub_batch = [this](const _DataType* data, \
const bool* valid_data, \ const bool* valid_data, \
const int32_t* offsets, \ const int32_t* offsets, \
const int32_t* segment_offsets, \
const int size, \ const int size, \
TargetBitmapView res, \ TargetBitmapView res, \
TargetBitmapView valid_res, \ TargetBitmapView valid_res, \
const Geometry& right_source) { \ const Geometry& right_source) { \
AssertInfo(segment_offsets != nullptr, \
"segment_offsets should not be nullptr"); \
auto& geometry_cache = \
SimpleGeometryCacheManager::Instance().GetCache( \
this->segment_->get_segment_id(), field_id_); \
auto cache_lock = geometry_cache.AcquireReadLock(); \
for (int i = 0; i < size; ++i) { \ for (int i = 0; i < size; ++i) { \
if (valid_data != nullptr && !valid_data[i]) { \ if (valid_data != nullptr && !valid_data[i]) { \
res[i] = valid_res[i] = false; \ res[i] = valid_res[i] = false; \
continue; \ continue; \
} \ } \
res[i] = Geometry(data[i].data(), data[i].size()) \ auto absolute_offset = segment_offsets[i]; \
.method(right_source, expr_->distance_); \ auto cached_geometry = \
geometry_cache.GetByOffsetUnsafe(absolute_offset); \
AssertInfo(cached_geometry != nullptr, \
"cached geometry is nullptr"); \
res[i] = cached_geometry->method(right_source, expr_->distance_); \
} \ } \
}; \ }; \
int64_t processed_size = ProcessDataChunks<_DataType>( \ int64_t processed_size = ProcessDataChunks<_DataType, true>( \
execute_sub_batch, std::nullptr_t{}, res, valid_res, right_source); \ execute_sub_batch, std::nullptr_t{}, res, valid_res, right_source); \
AssertInfo(processed_size == real_batch_size, \ AssertInfo(processed_size == real_batch_size, \
"internal error: expr processed rows {} not equal " \ "internal error: expr processed rows {} not equal " \
@ -89,7 +95,6 @@ namespace exec {
processed_size, \ processed_size, \
real_batch_size); \ real_batch_size); \
return res_vec; return res_vec;
void void
PhyGISFunctionFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { PhyGISFunctionFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
AssertInfo(expr_->column_.data_type_ == DataType::GEOMETRY, AssertInfo(expr_->column_.data_type_ == DataType::GEOMETRY,
@ -352,39 +357,17 @@ PhyGISFunctionFilterExpr::EvalForIndexSegment() {
auto& geometry_cache = auto& geometry_cache =
SimpleGeometryCacheManager::Instance().GetCache( SimpleGeometryCacheManager::Instance().GetCache(
segment_->get_segment_id(), field_id_); segment_->get_segment_id(), field_id_);
auto cache_lock = geometry_cache.AcquireReadLock();
auto data_array = segment_->bulk_subscript(
field_id_, hit_offsets.data(), hit_offsets.size());
auto geometry_array =
static_cast<const milvus::proto::schema::GeometryArray*>(
&data_array->scalars().geometry_data());
const auto& valid_data = data_array->valid_data();
for (size_t i = 0; i < hit_offsets.size(); ++i) { for (size_t i = 0; i < hit_offsets.size(); ++i) {
const auto pos = hit_offsets[i]; const auto pos = hit_offsets[i];
// Skip invalid data auto cached_geometry =
if (!valid_data.empty() && !valid_data[i]) { geometry_cache.GetByOffsetUnsafe(pos);
// skip invalid geometry
if (cached_geometry == nullptr) {
continue; continue;
} }
bool result = evaluate_geometry(*cached_geometry);
const auto& wkb_data = geometry_array->data(i);
// Get or create cached Geometry from simple cache
std::string_view wkb_view(wkb_data.data(), wkb_data.size());
auto cached_geometry = geometry_cache.GetOrCreate(wkb_view);
// Evaluate geometry: use cached if available, otherwise construct temporarily
bool result = false;
if (cached_geometry != nullptr) {
result = evaluate_geometry(*cached_geometry);
} else {
// Fallback: construct temporary geometry (cache construction failed)
Geometry temp_geometry(wkb_data.data(),
wkb_data.size());
result = evaluate_geometry(temp_geometry);
}
if (result) { if (result) {
refined.set(pos); refined.set(pos);

View File

@ -383,6 +383,10 @@ ChunkedSegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
// var_column->Seal(); // var_column->Seal();
stats_.mem_size += var_column->DataByteSize(); stats_.mem_size += var_column->DataByteSize();
field_data_size = var_column->DataByteSize(); field_data_size = var_column->DataByteSize();
// Construct GeometryCache for the entire field
LoadGeometryCache(field_id, *var_column);
column = std::move(var_column); column = std::move(var_column);
break; break;
} }
@ -1310,9 +1314,8 @@ ChunkedSegmentSealedImpl::ChunkedSegmentSealedImpl(
} }
ChunkedSegmentSealedImpl::~ChunkedSegmentSealedImpl() { ChunkedSegmentSealedImpl::~ChunkedSegmentSealedImpl() {
// Clean up geometry cache for all fields in this segment // Clean up geometry cache for all fields in this segment using global function
auto& cache_manager = milvus::exec::SimpleGeometryCacheManager::Instance(); milvus::RemoveSegmentGeometryCaches(get_segment_id());
cache_manager.RemoveSegmentCaches(get_segment_id());
auto cc = storage::MmapManager::GetInstance().GetChunkCache(); auto cc = storage::MmapManager::GetInstance().GetChunkCache();
if (cc == nullptr) { if (cc == nullptr) {
@ -2219,4 +2222,48 @@ ChunkedSegmentSealedImpl::RemoveFieldFile(const FieldId field_id) {
} }
} }
// Populates the per-(segment, field) geometry cache from a chunked variable
// column. Rows are appended chunk by chunk in column order, so the position
// of an entry in the cache matches the row's position across all chunks.
// Null rows are appended as (nullptr, 0), which the cache stores as an
// invalid placeholder, keeping later offsets aligned.
// Any std::exception raised while loading is re-reported through PanicInfo
// with the segment and field ids attached.
void
ChunkedSegmentSealedImpl::LoadGeometryCache(
    FieldId field_id, const ChunkedVariableColumn<std::string>& var_column) {
    try {
        auto& cache =
            milvus::exec::SimpleGeometryCacheManager::Instance().GetCache(
                get_segment_id(), field_id);

        const auto chunk_count = var_column.num_chunks();
        for (int64_t chunk = 0; chunk < chunk_count; ++chunk) {
            // WKB payloads plus the validity flags for this chunk
            auto [views, validity] = var_column.StringViews(chunk);

            // An empty validity vector means every row in the chunk is valid
            const bool all_valid = validity.empty();

            for (size_t row = 0; row < views.size(); ++row) {
                if (!all_valid && !validity[row]) {
                    // Null row: keep the slot so offsets stay aligned
                    cache.AppendData(nullptr, 0);
                    continue;
                }
                const auto& wkb = views[row];
                cache.AppendData(wkb.data(), wkb.size());
            }
        }

        LOG_INFO(
            "Successfully loaded geometry cache for segment {} field {} with "
            "{} geometries",
            get_segment_id(),
            field_id.get(),
            cache.Size());

    } catch (const std::exception& e) {
        PanicInfo(UnexpectedError,
                  "Failed to load geometry cache for segment {} field {}: {}",
                  get_segment_id(),
                  field_id.get(),
                  e.what());
    }
}
} // namespace milvus::segcore } // namespace milvus::segcore

View File

@ -284,6 +284,11 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
return insert_record_.timestamps_; return insert_record_.timestamps_;
} }
// Load Geometry cache for a field.
// Walks every chunk of `var_column` in order and appends each row's WKB
// payload to the geometry cache keyed by this segment's id and `field_id`;
// null rows are appended as empty entries so cache positions follow row order.
void
LoadGeometryCache(FieldId field_id,
const ChunkedVariableColumn<std::string>& var_column);
private: private:
template <typename S, typename T = S> template <typename S, typename T = S>
static void static void

View File

@ -178,6 +178,14 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
reserved_offset); reserved_offset);
} }
// Build geometry cache for GEOMETRY fields
if (field_meta.get_data_type() == DataType::GEOMETRY) {
BuildGeometryCacheForInsert(
field_id,
&insert_record_proto->fields_data(data_offset),
num_rows);
}
// update average row data size // update average row data size
auto field_data_size = GetRawDataSizeOfDataArray( auto field_data_size = GetRawDataSizeOfDataArray(
&insert_record_proto->fields_data(data_offset), &insert_record_proto->fields_data(data_offset),
@ -315,6 +323,11 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
index->Reload(); index->Reload();
} }
// Build geometry cache for GEOMETRY fields
if (field_meta.get_data_type() == DataType::GEOMETRY) {
BuildGeometryCacheForLoad(field_id, field_data);
}
// update the mem size // update the mem size
stats_.mem_size += storage::GetByteSizeOfFieldDatas(field_data); stats_.mem_size += storage::GetByteSizeOfFieldDatas(field_data);
@ -1008,4 +1021,96 @@ SegmentGrowingImpl::GetJsonData(FieldId field_id, size_t offset) const {
return std::make_pair(std::string_view(src[offset]), true); return std::make_pair(std::string_view(src[offset]), true);
} }
// Appends the geometries of one insert batch to the per-(segment, field)
// geometry cache of this growing segment.
//   field_id   - GEOMETRY field the batch belongs to
//   data_array - proto column holding the WKB payloads and validity flags
//   num_rows   - number of rows to take from `data_array`
// A row is treated as null when validity flags are present and the row is
// either flagged invalid or has no flag at all; null rows are appended as
// (nullptr, 0) so the cache keeps one slot per row.
// Any std::exception raised while appending is re-reported through PanicInfo.
void
SegmentGrowingImpl::BuildGeometryCacheForInsert(FieldId field_id,
                                                const DataArray* data_array,
                                                int64_t num_rows) {
    try {
        auto& cache =
            milvus::exec::SimpleGeometryCacheManager::Instance().GetCache(
                get_segment_id(), field_id);

        const auto& geometries = data_array->scalars().geometry_data();
        const auto& validity = data_array->valid_data();

        for (int64_t row = 0; row < num_rows; ++row) {
            // No validity flags at all means the whole batch is valid
            const bool is_null =
                !validity.empty() &&
                !(row < validity.size() && validity[row]);
            if (is_null) {
                cache.AppendData(nullptr, 0);
                continue;
            }
            const auto& wkb = geometries.data(row);
            cache.AppendData(wkb.data(), wkb.size());
        }

        LOG_INFO(
            "Successfully appended {} geometries to cache for growing segment "
            "{} field {}",
            num_rows,
            get_segment_id(),
            field_id.get());

    } catch (const std::exception& e) {
        PanicInfo(UnexpectedError,
                  "Failed to build geometry cache for growing segment {} field "
                  "{} insert: {}",
                  get_segment_id(),
                  field_id.get(),
                  e.what());
    }
}
// Appends the geometries of loaded field data to the per-(segment, field)
// geometry cache of this growing segment.
//   field_id   - GEOMETRY field being loaded
//   field_data - loaded chunks, processed in the order given
// Valid rows contribute their WKB bytes; invalid rows are appended as
// (nullptr, 0) so the cache keeps one slot per row.
// Any std::exception raised while appending is re-reported through PanicInfo.
void
SegmentGrowingImpl::BuildGeometryCacheForLoad(
    FieldId field_id, const std::vector<FieldDataPtr>& field_data) {
    try {
        auto& cache =
            milvus::exec::SimpleGeometryCacheManager::Instance().GetCache(
                get_segment_id(), field_id);

        // Tally rows while appending; the total is only used for logging
        size_t total_rows = 0;
        for (const auto& chunk : field_data) {
            const auto rows_in_chunk = chunk->get_num_rows();
            for (int64_t row = 0; row < rows_in_chunk; ++row) {
                if (!chunk->is_valid(row)) {
                    cache.AppendData(nullptr, 0);
                    continue;
                }
                // Raw value of a geometry row is read as a std::string of WKB
                const auto* wkb =
                    static_cast<const std::string*>(chunk->RawValue(row));
                cache.AppendData(wkb->data(), wkb->size());
            }
            total_rows += rows_in_chunk;
        }

        LOG_INFO(
            "Successfully loaded {} geometries to cache for growing segment {} "
            "field {}",
            total_rows,
            get_segment_id(),
            field_id.get());

    } catch (const std::exception& e) {
        PanicInfo(UnexpectedError,
                  "Failed to build geometry cache for growing segment {} field "
                  "{} load: {}",
                  get_segment_id(),
                  field_id.get(),
                  e.what());
    }
}
} // namespace milvus::segcore } // namespace milvus::segcore

View File

@ -85,6 +85,18 @@ class SegmentGrowingImpl : public SegmentGrowing {
void void
CreateTextIndex(FieldId field_id) override; CreateTextIndex(FieldId field_id) override;
private:
// Build geometry cache for inserted data.
// Appends `num_rows` entries taken from `data_array` (WKB payloads plus
// validity flags) to the geometry cache keyed by this segment's id and
// `field_id`; null rows are appended as empty entries.
void
BuildGeometryCacheForInsert(FieldId field_id,
const DataArray* data_array,
int64_t num_rows);
// Build geometry cache for loaded field data.
// Appends every row of every element of `field_data`, in order, to the
// geometry cache keyed by this segment's id and `field_id`; rows reported
// as invalid are appended as empty entries.
void
BuildGeometryCacheForLoad(FieldId field_id,
const std::vector<FieldDataPtr>& field_data);
public: public:
const InsertRecord<>& const InsertRecord<>&
get_insert_record() const { get_insert_record() const {

View File

@ -445,6 +445,10 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
var_column->Seal(); var_column->Seal();
stats_.mem_size += var_column->MemoryUsageBytes(); stats_.mem_size += var_column->MemoryUsageBytes();
field_data_size = var_column->DataByteSize(); field_data_size = var_column->DataByteSize();
// Construct GeometryCache for the entire field
LoadGeometryCache(field_id, *var_column);
column = std::move(var_column); column = std::move(var_column);
break; break;
} }
@ -617,6 +621,10 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
field_meta, field_meta,
DEFAULT_MMAP_VRCOL_BLOCK_SIZE); DEFAULT_MMAP_VRCOL_BLOCK_SIZE);
var_column->Seal(std::move(indices)); var_column->Seal(std::move(indices));
// Construct GeometryCache for the entire field (mmap mode)
LoadGeometryCache(field_id, *var_column);
column = std::move(var_column); column = std::move(var_column);
break; break;
} }
@ -1280,9 +1288,8 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
} }
SegmentSealedImpl::~SegmentSealedImpl() { SegmentSealedImpl::~SegmentSealedImpl() {
// Clean up geometry cache for all fields in this segment // Clean up geometry cache for all fields in this segment using global function
auto& cache_manager = milvus::exec::SimpleGeometryCacheManager::Instance(); milvus::RemoveSegmentGeometryCaches(get_segment_id());
cache_manager.RemoveSegmentCaches(get_segment_id());
auto cc = storage::MmapManager::GetInstance().GetChunkCache(); auto cc = storage::MmapManager::GetInstance().GetChunkCache();
if (cc == nullptr) { if (cc == nullptr) {
@ -2255,4 +2262,45 @@ SegmentSealedImpl::GetJsonData(FieldId field_id, size_t offset) const {
return std::make_pair(std::move(column->RawAt(offset)), is_valid); return std::make_pair(std::move(column->RawAt(offset)), is_valid);
} }
// Populates the per-(segment, field) geometry cache from a single-chunk
// variable column. Rows are appended in column order, so an entry's position
// in the cache matches the row's position in the column. Null rows are
// appended as (nullptr, 0), which the cache stores as an invalid placeholder.
// Any std::exception raised while loading is re-reported through PanicInfo
// with the segment and field ids attached.
void
SegmentSealedImpl::LoadGeometryCache(
    FieldId field_id,
    const SingleChunkVariableColumn<std::string>& var_column) {
    try {
        auto& cache =
            milvus::exec::SimpleGeometryCacheManager::Instance().GetCache(
                get_segment_id(), field_id);

        // WKB payloads plus the validity flags for the whole column
        auto [views, validity] = var_column.StringViews();

        // An empty validity vector means every row is valid
        const bool all_valid = validity.empty();

        for (size_t row = 0; row < views.size(); ++row) {
            if (!all_valid && !validity[row]) {
                // Null row: keep the slot so offsets stay aligned
                cache.AppendData(nullptr, 0);
                continue;
            }
            const auto& wkb = views[row];
            cache.AppendData(wkb.data(), wkb.size());
        }

        LOG_INFO(
            "Successfully loaded geometry cache for segment {} field {} with "
            "{} geometries",
            get_segment_id(),
            field_id.get(),
            cache.Size());

    } catch (const std::exception& e) {
        PanicInfo(UnexpectedError,
                  "Failed to load geometry cache for segment {} field {}: {}",
                  get_segment_id(),
                  field_id.get(),
                  e.what());
    }
}
} // namespace milvus::segcore } // namespace milvus::segcore

View File

@ -339,6 +339,11 @@ class SegmentSealedImpl : public SegmentSealed {
deleted_record_.set_sealed_row_count(row_count); deleted_record_.set_sealed_row_count(row_count);
} }
// Load Geometry cache for a field.
// Appends each row's WKB payload from `var_column`, in column order, to the
// geometry cache keyed by this segment's id and `field_id`; null rows are
// appended as empty entries so cache positions follow row order.
void
LoadGeometryCache(FieldId field_id,
const SingleChunkVariableColumn<std::string>& var_column);
void void
mask_with_timestamps(BitsetTypeView& bitset_chunk, mask_with_timestamps(BitsetTypeView& bitset_chunk,
Timestamp timestamp, Timestamp timestamp,