enhance: add caching layer resource management for streaming node (#46465)

issue: #41435

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
  * Added segment resource tracking and automatic memory/disk accounting during inserts, deletes, loads, and reopens.
  * Exposed a configuration to set the interim index memory expansion rate.
  * Added explicit loaded-resource charge/refund operations and Bloom filter resource lifecycle management.

* **Bug Fixes**
  * Ensured consistent memory-size vs. row-count calculations across segment operations.
  * Improved resource refunding and cleanup when segments are released or closed.

* **Tests**
  * Added comprehensive resource-tracking and concurrency tests, plus Bloom filter accounting tests.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
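
The core mechanism is a refund-then-charge checkpoint: each time a growing segment mutates (insert, delete, load, reopen), its estimated footprint is recomputed, the previously charged estimate is refunded from the caching layer, and the new estimate is charged. Below is a minimal, self-contained C++ sketch of that pattern; the `ResourceUsage` shape and the update flow mirror the diff, while `Charge`/`Refund` here are stubbed stand-ins for `cachinglayer::Manager::GetInstance().ChargeLoadedResource(...)` / `RefundLoadedResource(...)`.

```cpp
#include <cstdint>
#include <mutex>

// Stand-in for cachinglayer::ResourceUsage used throughout the diff.
struct ResourceUsage {
    int64_t memory_bytes = 0;
    int64_t file_bytes = 0;
    bool AnyGTZero() const { return memory_bytes > 0 || file_bytes > 0; }
};

// Stubbed caching-layer accounting; the real code calls
// Manager::GetInstance().ChargeLoadedResource() / RefundLoadedResource().
void Charge(const ResourceUsage&) {}
void Refund(const ResourceUsage&) {}

class TrackedSegment {
 public:
    // Called after every mutation: refund the previous estimate, then
    // charge the new one, under a lock so the pair stays atomic.
    void UpdateResourceTracking(const ResourceUsage& new_estimate) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (tracked_.AnyGTZero()) {
            Refund(tracked_);
        }
        if (new_estimate.AnyGTZero()) {
            Charge(new_estimate);
        }
        tracked_ = new_estimate;
    }

 private:
    ResourceUsage tracked_{};
    std::mutex mutex_;
};
```

The destructor-side refund in the diff follows the same idea: whatever is still tracked is refunded once before the segment goes away.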

---------

Signed-off-by: Shawn Wang <shawn.wang@zilliz.com>
sparknack 2026-01-12 15:47:27 +08:00 committed by GitHub
parent 670f2cc5e8
commit 2ea16b2f6d
23 changed files with 1093 additions and 34 deletions

View File

@ -144,7 +144,7 @@ class DeletedRecord {
deleted_mask_.set(row_id);
} else {
// need to add mask size firstly for growing segment
deleted_mask_.resize(insert_record_->size());
deleted_mask_.resize(insert_record_->row_count());
deleted_mask_.set(row_id);
}
removed_num++;
@ -170,6 +170,13 @@ class DeletedRecord {
}
estimated_memory_size_ = new_estimated_size;
}
} else {
// For a growing segment, the resource usage of DeletedRecord is already counted in
// SegmentGrowingImpl::EstimateSegmentResourceUsage(), so it is not counted here.
// The growing segment is treated as a single unit, and memory is not tracked
// separately for each field. If you intend to add per-field tracking here, first
// consider how to count the memory usage separately within the growing segment.
}
return max_timestamp;
@ -245,7 +252,7 @@ class DeletedRecord {
if constexpr (is_sealed) {
bitsize = sealed_row_count_;
} else {
bitsize = insert_record_->size();
bitsize = insert_record_->row_count();
}
BitsetType bitmap(bitsize, false);

View File

@ -75,7 +75,7 @@ class OffsetMap {
clear() = 0;
virtual size_t
size() const = 0;
memory_size() const = 0;
};
template <typename T>
@ -203,7 +203,7 @@ class OffsetOrderedMap : public OffsetMap {
}
size_t
size() const override {
memory_size() const override {
std::shared_lock<std::shared_mutex> lck(mtx_);
return map_.get_allocator().total_allocated();
}
@ -383,7 +383,7 @@ class OffsetOrderedArray : public OffsetMap {
}
size_t
size() const override {
memory_size() const override {
return sizeof(std::pair<T, int32_t>) * array_.capacity();
}
@ -592,8 +592,8 @@ class InsertRecordSealed {
pk2offset_->seal();
// update estimated memory size to caching layer
cachinglayer::Manager::GetInstance().ChargeLoadedResource(
{static_cast<int64_t>(pk2offset_->size()), 0});
estimated_memory_size_ += pk2offset_->size();
{static_cast<int64_t>(pk2offset_->memory_size()), 0});
estimated_memory_size_ += pk2offset_->memory_size();
}
void
@ -604,11 +604,11 @@ class InsertRecordSealed {
timestamp_index_ = std::move(timestamp_index);
AssertInfo(timestamps_.num_chunk() == 1,
"num chunk not equal to 1 for sealed segment");
size_t size =
timestamps.size() * sizeof(Timestamp) + timestamp_index_.size();
size_t memory_size = timestamps.size() * sizeof(Timestamp) +
timestamp_index_.memory_size();
cachinglayer::Manager::GetInstance().ChargeLoadedResource(
{static_cast<int64_t>(size), 0});
estimated_memory_size_ += size;
{static_cast<int64_t>(memory_size), 0});
estimated_memory_size_ += memory_size;
}
const ConcurrentVector<Timestamp>&
@ -1076,7 +1076,7 @@ class InsertRecordGrowing {
}
int64_t
size() const {
row_count() const {
return ack_responder_.GetAck();
}

View File

@ -20,7 +20,7 @@ inline int64_t
get_barrier(const RecordType& record, Timestamp timestamp) {
auto& vec = record.timestamps();
int64_t beg = 0;
int64_t end = record.size();
int64_t end = record.row_count();
while (beg < end) {
auto mid = (beg + end) / 2;
if (vec[mid] <= timestamp) {

View File

@ -157,6 +157,16 @@ class SegcoreConfig {
return enable_geometry_cache_;
}
void
set_interim_index_mem_expansion_rate(float rate) {
interim_index_mem_expansion_rate_ = rate;
}
float
get_interim_index_mem_expansion_rate() const {
return interim_index_mem_expansion_rate_;
}
private:
inline static const std::unordered_set<std::string>
valid_dense_vector_index_type = {
@ -176,6 +186,7 @@ class SegcoreConfig {
knowhere::RefineType::DATA_VIEW;
inline static bool refine_with_quant_flag_ = false;
inline static bool enable_geometry_cache_ = false;
inline static float interim_index_mem_expansion_rate_ = 1.15f;
};
} // namespace milvus::segcore

View File

@ -47,6 +47,7 @@
#include "storage/ThreadPools.h"
#include "storage/KeyRetriever.h"
#include "common/TypeTraits.h"
#include "knowhere/comp/index_param.h"
#include "milvus-storage/format/parquet/file_reader.h"
#include "milvus-storage/filesystem/fs.h"
@ -247,6 +248,212 @@ SegmentGrowingImpl::try_remove_chunks(FieldId fieldId) {
}
}
ResourceUsage
SegmentGrowingImpl::EstimateSegmentResourceUsage() const {
int64_t num_rows = get_row_count();
if (num_rows == 0) {
return ResourceUsage{0, 0};
}
bool growing_mmap_enabled = storage::MmapManager::GetInstance()
.GetMmapConfig()
.GetEnableGrowingMmap();
int64_t memory_bytes = 0;
int64_t disk_bytes = 0;
// 1. Timestamps: always in memory for now
memory_bytes += num_rows * sizeof(Timestamp);
// 2. pk2offset_ map: always in memory
// Use actual allocated memory from the tracking allocator
memory_bytes += insert_record_.pk2offset_->memory_size();
// 3. Field data and interim index
// For vector fields with interim index:
// - IVF_FLAT_CC: index stores raw data, so count index_size = raw_size * memExpansionRate (memory)
// - SCANN_DVR: index doesn't store raw data, so count raw_size (memory or mmap)
// For other fields: count raw_size based on mmap setting
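// Worked example (hypothetical numbers): a 1000-row, 128-dim float vector field with an
// IVF_FLAT_CC interim index holds roughly 1000 * 128 * 4 = 512000 bytes of raw data and is
// charged as 512000 * memExpansionRate (1.15 by default) = 588800 bytes of memory.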
bool interim_index_enabled =
segcore_config_.get_enable_interim_segment_index();
bool is_ivf_flat_cc =
segcore_config_.get_dense_vector_intermin_index_type() ==
knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC;
for (const auto& [field_id, field_meta] : schema_->get_fields()) {
if (field_id.get() < START_USER_FIELDID) {
continue;
}
int64_t field_bytes = 0;
auto data_type = field_meta.get_data_type();
if (field_meta.is_vector()) {
// Calculate raw vector size
// Note: get_dim() cannot be called on sparse vectors, so handle that case separately
if (data_type == DataType::VECTOR_SPARSE_U32_F32) {
field_bytes =
num_rows *
SegmentInternalInterface::get_field_avg_size(field_id);
} else {
int64_t dim = field_meta.get_dim();
switch (data_type) {
case DataType::VECTOR_FLOAT:
field_bytes = num_rows * dim * sizeof(float);
break;
case DataType::VECTOR_FLOAT16:
field_bytes = num_rows * dim * sizeof(float16);
break;
case DataType::VECTOR_BFLOAT16:
field_bytes = num_rows * dim * sizeof(bfloat16);
break;
case DataType::VECTOR_BINARY:
field_bytes = num_rows * (dim / 8);
break;
case DataType::VECTOR_INT8:
field_bytes = num_rows * dim * sizeof(int8_t);
break;
default:
break;
}
}
// Check if this field has interim index
bool has_interim_index =
interim_index_enabled && indexing_record_.is_in(field_id);
if (has_interim_index) {
if (data_type == DataType::VECTOR_SPARSE_U32_F32) {
// sparse vector interim index does not support file mmap
// index memory + raw data memory ~ 2x raw data memory
memory_bytes += field_bytes * 2;
} else {
// Dense vector interim index estimation based on index type
if (is_ivf_flat_cc) {
// IVF_FLAT_CC: index stores raw data
// Index memory = raw_size * memExpansionRate
memory_bytes += static_cast<int64_t>(
field_bytes *
segcore_config_
.get_interim_index_mem_expansion_rate());
} else {
// SCANN_DVR: the index does not store raw data, so count only the raw data
if (growing_mmap_enabled) {
disk_bytes += field_bytes;
} else {
memory_bytes += field_bytes;
}
}
}
} else {
if (growing_mmap_enabled) {
disk_bytes += field_bytes;
} else {
memory_bytes += field_bytes;
}
}
} else {
// Scalar fields
switch (data_type) {
case DataType::BOOL:
field_bytes = num_rows * sizeof(bool);
break;
case DataType::INT8:
field_bytes = num_rows * sizeof(int8_t);
break;
case DataType::INT16:
field_bytes = num_rows * sizeof(int16_t);
break;
case DataType::INT32:
field_bytes = num_rows * sizeof(int32_t);
break;
case DataType::INT64:
case DataType::TIMESTAMPTZ:
field_bytes = num_rows * sizeof(int64_t);
break;
case DataType::FLOAT:
field_bytes = num_rows * sizeof(float);
break;
case DataType::DOUBLE:
field_bytes = num_rows * sizeof(double);
break;
case DataType::VARCHAR:
case DataType::TEXT:
case DataType::GEOMETRY: {
auto avg_size =
SegmentInternalInterface::get_field_avg_size(field_id);
field_bytes = num_rows * avg_size;
break;
}
case DataType::JSON: {
auto avg_size =
SegmentInternalInterface::get_field_avg_size(field_id);
field_bytes = num_rows * avg_size;
break;
}
case DataType::ARRAY: {
auto avg_size =
SegmentInternalInterface::get_field_avg_size(field_id);
field_bytes = num_rows * avg_size;
break;
}
default:
break;
}
// Scalar fields: memory or disk based on mmap setting
if (growing_mmap_enabled) {
disk_bytes += field_bytes;
} else {
memory_bytes += field_bytes;
}
}
}
// 4. Text index (Tantivy)
{
std::shared_lock lock(mutex_);
for (const auto& [field_id, index_variant] : text_indexes_) {
if (auto* ptr = std::get_if<std::unique_ptr<index::TextMatchIndex>>(
&index_variant)) {
memory_bytes += (*ptr)->ByteSize();
}
}
}
// 5. Deleted records overhead
memory_bytes += deleted_record_.mem_size();
// Apply safety margin
constexpr double kResourceSafetyMargin = 1.2;
memory_bytes = static_cast<int64_t>(memory_bytes * kResourceSafetyMargin);
disk_bytes = static_cast<int64_t>(disk_bytes * kResourceSafetyMargin);
return ResourceUsage{memory_bytes, disk_bytes};
}
void
SegmentGrowingImpl::UpdateResourceTracking() {
auto new_resource = EstimateSegmentResourceUsage();
// Lock to ensure refund-then-charge is atomic
std::lock_guard<std::mutex> lock(resource_tracking_mutex_);
auto old_resource = tracked_resource_;
if (old_resource.AnyGTZero()) {
Manager::GetInstance().RefundLoadedResource(
old_resource, fmt::format("growing_segment_{}_refund", id_));
}
if (new_resource.AnyGTZero()) {
Manager::GetInstance().ChargeLoadedResource(
new_resource, fmt::format("growing_segment_{}_charge", id_));
}
tracked_resource_ = new_resource;
}
void
SegmentGrowingImpl::Insert(int64_t reserved_offset,
int64_t num_rows,
@ -263,13 +470,12 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
// step 1: check insert data if valid
std::unordered_map<FieldId, int64_t> field_id_to_offset;
int64_t field_offset = 0;
int64_t exist_rows = stats_.mem_size / (sizeof(Timestamp) + sizeof(idx_t));
for (const auto& field : insert_record_proto->fields_data()) {
auto field_id = FieldId(field.field_id());
AssertInfo(!field_id_to_offset.count(field_id), "duplicate field data");
field_id_to_offset.emplace(field_id, field_offset++);
AssertInfo(exist_rows == 0 || insert_record_.is_data_exist(field_id),
AssertInfo(insert_record_.is_data_exist(field_id),
"unexpected new field in growing segment {}, field id {}",
id_,
field.field_id());
@ -299,12 +505,11 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
// step 2: sort timestamp
// query node already guarantees that the timestamp is ordered, avoid field data copy in c++
// step 3: fill into Segment.ConcurrentVector
// step 3: fill into Segment.ConcurrentVector, no mmap_descriptor is used for timestamps
insert_record_.timestamps_.set_data_raw(
reserved_offset, timestamps_raw, num_rows);
stats_.mem_size += num_rows * sizeof(Timestamp);
// update the mem size of timestamps and row IDs
stats_.mem_size += num_rows * (sizeof(Timestamp) + sizeof(idx_t));
for (auto& [field_id, field_meta] : schema_->get_fields()) {
if (field_id.get() < START_USER_FIELDID) {
continue;
@ -325,6 +530,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
&insert_record_proto->fields_data(data_offset),
field_meta);
}
// insert vector data into index
if (segcore_config_.get_enable_interim_segment_index()) {
indexing_record_.AppendingIndex(
@ -413,7 +619,10 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
insert_record_.insert_pk(pks[i], reserved_offset + i);
}
// step 5: update small indexes
// step 5: update the resource usage
UpdateResourceTracking();
// step 6: update small indexes
insert_record_.ack_responder_.AddSegment(reserved_offset,
reserved_offset + num_rows);
}
@ -755,7 +964,6 @@ SegmentGrowingImpl::Delete(int64_t size,
}
// step 1: sort timestamp
std::sort(ordering.begin(), ordering.end());
std::vector<PkType> sort_pks(size);
std::vector<Timestamp> sort_timestamps(size);
@ -768,6 +976,10 @@ SegmentGrowingImpl::Delete(int64_t size,
// step 2: fill delete record
deleted_record_.StreamPush(sort_pks, sort_timestamps.data());
// step 3: update resource tracking
UpdateResourceTracking();
return SegcoreError::success();
}
@ -1678,6 +1890,8 @@ SegmentGrowingImpl::Reopen(SchemaPtr sch) {
schema_ = sch;
}
UpdateResourceTracking();
}
void
@ -1754,6 +1968,9 @@ SegmentGrowingImpl::FinishLoad() {
fill_empty_field(field_meta);
}
}
// Update resource tracking
UpdateResourceTracking();
}
void
@ -1921,7 +2138,7 @@ SegmentGrowingImpl::fill_empty_field(const FieldMeta& field_meta) {
field_id, field_meta, size_per_chunk(), mmap_descriptor_);
}
auto total_row_num = insert_record_.size();
auto total_row_num = insert_record_.row_count();
auto data = bulk_subscript_not_exist_field(field_meta, total_row_num);
insert_record_.get_valid_data(field_id)->set_data_raw(

View File

@ -13,6 +13,7 @@
#include <deque>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <tbb/concurrent_priority_queue.h>
@ -22,6 +23,7 @@
#include <utility>
#include "cachinglayer/CacheSlot.h"
#include "cachinglayer/Manager.h"
#include "AckResponder.h"
#include "ConcurrentVector.h"
#include "DeletedRecord.h"
@ -354,6 +356,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
segment_id) {
this->CreateTextIndexes();
this->InitializeArrayOffsets();
this->UpdateResourceTracking();
}
~SegmentGrowingImpl() {
@ -373,6 +376,14 @@ class SegmentGrowingImpl : public SegmentGrowing {
storage::MmapManager::GetInstance().GetMmapChunkManager();
mcm->UnRegister(mmap_descriptor_);
}
// Refund any tracked resources before destruction
// No lock needed - destructor implies exclusive access
if (tracked_resource_.AnyGTZero()) {
Manager::GetInstance().RefundLoadedResource(
tracked_resource_,
fmt::format("growing_segment_{}_destructor", id_));
}
}
void
@ -515,6 +526,22 @@ class SegmentGrowingImpl : public SegmentGrowing {
const int64_t* seg_offsets,
int64_t count) const;
/**
* @brief Estimate the current total resource usage of the growing segment
*
* This includes memory/disk usage for:
* - Field data (raw vectors and scalars)
* - Timestamps
* - PK-to-offset index
* - Interim vector indexes (if enabled)
* - Text match indexes (if enabled)
* - Deleted records
*
* @return ResourceUsage containing memory_bytes and file_bytes estimates
*/
ResourceUsage
EstimateSegmentResourceUsage() const;
protected:
int64_t
num_chunk(FieldId field_id) const override;
@ -572,6 +599,20 @@ class SegmentGrowingImpl : public SegmentGrowing {
void
fill_empty_field(const FieldMeta& field_meta);
/**
* @brief Update resource tracking by refunding old estimate and charging new
*
* This method:
* 1. Estimates current total resource usage of the growing segment
* 2. Refunds the previously tracked resource from the cache manager
* 3. Charges the new resource usage to the cache manager
* 4. Updates the tracked resource checkpoint
*
* Should be called after data modifications (Insert, Delete, etc.)
*/
void
UpdateResourceTracking();
private:
void
AddTexts(FieldId field_id,
@ -646,6 +687,12 @@ class SegmentGrowingImpl : public SegmentGrowing {
// Representative field_id for each struct (used to extract array lengths during Insert)
// One field_id per struct, since all fields in the same struct have identical array lengths
std::unordered_set<FieldId> struct_representative_fields_;
// Tracked resource usage for refund-then-charge pattern
// This stores the last estimated resource usage that was charged to the cache manager
ResourceUsage tracked_resource_{};
// Mutex to protect tracked_resource_ updates (refund-then-charge must be atomic)
mutable std::mutex resource_tracking_mutex_;
};
inline SegmentGrowingPtr

View File

@ -12,6 +12,9 @@
#include <gtest/gtest.h>
#include <numeric>
#include <thread>
#include <vector>
#include "common/Types.h"
#include "common/IndexMeta.h"
#include "knowhere/comp/index_param.h"
@ -1222,3 +1225,224 @@ TEST(Growing, TestMaskWithNullableTTLField) {
EXPECT_EQ(expired_count, test_data_count / 4);
}
// Resource tracking tests for growing segments
TEST(Growing, EmptySegmentResourceEstimation) {
auto schema = std::make_shared<Schema>();
auto dim = 128;
auto metric_type = knowhere::metric::L2;
auto vec_fid =
schema->AddDebugField("vec", DataType::VECTOR_FLOAT, dim, metric_type);
auto pk_fid = schema->AddDebugField("pk", DataType::INT64);
schema->set_primary_field_id(pk_fid);
auto segment = CreateGrowingSegment(schema, empty_index_meta);
auto* segment_impl = dynamic_cast<SegmentGrowingImpl*>(segment.get());
ASSERT_NE(segment_impl, nullptr);
// Empty segment should have zero resource usage
auto resource = segment_impl->EstimateSegmentResourceUsage();
EXPECT_EQ(resource.memory_bytes, 0);
EXPECT_EQ(resource.file_bytes, 0);
}
TEST(Growing, ResourceEstimationAfterInsert) {
auto schema = std::make_shared<Schema>();
auto dim = 128;
auto metric_type = knowhere::metric::L2;
auto vec_fid =
schema->AddDebugField("vec", DataType::VECTOR_FLOAT, dim, metric_type);
auto pk_fid = schema->AddDebugField("pk", DataType::INT64);
schema->set_primary_field_id(pk_fid);
auto segment = CreateGrowingSegment(schema, empty_index_meta);
auto* segment_impl = dynamic_cast<SegmentGrowingImpl*>(segment.get());
ASSERT_NE(segment_impl, nullptr);
// Insert some data
const int64_t N = 1000;
auto dataset = DataGen(schema, N);
segment->PreInsert(N);
segment->Insert(0,
N,
dataset.row_ids_.data(),
dataset.timestamps_.data(),
dataset.raw_);
// After insert, resource usage should be positive
auto resource = segment_impl->EstimateSegmentResourceUsage();
EXPECT_GT(resource.memory_bytes, 0);
// Memory should include at least:
// - Vector data: N * dim * sizeof(float) = 1000 * 128 * 4 = 512000 bytes
// - Timestamps: N * sizeof(Timestamp) = 1000 * 8 = 8000 bytes
// - PK field: N * sizeof(int64_t) = 1000 * 8 = 8000 bytes
// Plus safety margin of 1.2x
int64_t expected_min_size =
N * dim * sizeof(float) + N * sizeof(Timestamp) + N * sizeof(int64_t);
EXPECT_GE(resource.memory_bytes, expected_min_size);
}
TEST(Growing, ResourceIncrementsWithMoreInserts) {
auto schema = std::make_shared<Schema>();
auto dim = 128;
auto metric_type = knowhere::metric::L2;
auto vec_fid =
schema->AddDebugField("vec", DataType::VECTOR_FLOAT, dim, metric_type);
auto pk_fid = schema->AddDebugField("pk", DataType::INT64);
schema->set_primary_field_id(pk_fid);
auto segment = CreateGrowingSegment(schema, empty_index_meta);
auto* segment_impl = dynamic_cast<SegmentGrowingImpl*>(segment.get());
ASSERT_NE(segment_impl, nullptr);
// First insert
const int64_t N1 = 500;
auto dataset1 = DataGen(schema, N1, 42, 0);
segment->PreInsert(N1);
segment->Insert(0,
N1,
dataset1.row_ids_.data(),
dataset1.timestamps_.data(),
dataset1.raw_);
auto resource1 = segment_impl->EstimateSegmentResourceUsage();
// Second insert
const int64_t N2 = 500;
auto dataset2 = DataGen(schema, N2, 43, N1);
segment->PreInsert(N2);
segment->Insert(N1,
N2,
dataset2.row_ids_.data(),
dataset2.timestamps_.data(),
dataset2.raw_);
auto resource2 = segment_impl->EstimateSegmentResourceUsage();
// Resource should increase after second insert
EXPECT_GT(resource2.memory_bytes, resource1.memory_bytes);
}
TEST(Growing, ResourceTrackingAfterDelete) {
auto schema = std::make_shared<Schema>();
auto dim = 64;
auto metric_type = knowhere::metric::L2;
auto vec_fid =
schema->AddDebugField("vec", DataType::VECTOR_FLOAT, dim, metric_type);
auto pk_fid = schema->AddDebugField("pk", DataType::INT64);
schema->set_primary_field_id(pk_fid);
auto segment = CreateGrowingSegment(schema, empty_index_meta);
auto* segment_impl = dynamic_cast<SegmentGrowingImpl*>(segment.get());
ASSERT_NE(segment_impl, nullptr);
// Insert data first
const int64_t N = 100;
auto dataset = DataGen(schema, N);
segment->PreInsert(N);
segment->Insert(0,
N,
dataset.row_ids_.data(),
dataset.timestamps_.data(),
dataset.raw_);
auto resource_before_delete = segment_impl->EstimateSegmentResourceUsage();
EXPECT_GT(resource_before_delete.memory_bytes, 0);
// Delete some rows
auto pks = dataset.get_col<int64_t>(pk_fid);
auto del_pks = GenPKs(pks.begin(), pks.begin() + 5);
auto del_tss = GenTss(5, N);
auto status = segment->Delete(5, del_pks.get(), del_tss.data());
EXPECT_TRUE(status.ok());
// Resource estimation should still work after delete
auto resource_after_delete = segment_impl->EstimateSegmentResourceUsage();
EXPECT_GT(resource_after_delete.memory_bytes, 0);
}
TEST(Growing, ConcurrentInsertResourceTracking) {
auto schema = std::make_shared<Schema>();
auto dim = 32;
auto metric_type = knowhere::metric::L2;
auto vec_fid =
schema->AddDebugField("vec", DataType::VECTOR_FLOAT, dim, metric_type);
auto pk_fid = schema->AddDebugField("pk", DataType::INT64);
schema->set_primary_field_id(pk_fid);
auto segment = CreateGrowingSegment(schema, empty_index_meta);
auto* segment_impl = dynamic_cast<SegmentGrowingImpl*>(segment.get());
ASSERT_NE(segment_impl, nullptr);
const int num_threads = 4;
const int64_t rows_per_thread = 100;
std::vector<std::thread> threads;
// Reserve space for all rows upfront
int64_t total_rows = num_threads * rows_per_thread;
segment->PreInsert(total_rows);
// Concurrent inserts from multiple threads
for (int t = 0; t < num_threads; ++t) {
threads.emplace_back([&, t]() {
auto dataset =
DataGen(schema, rows_per_thread, 42 + t, t * rows_per_thread);
segment->Insert(t * rows_per_thread,
rows_per_thread,
dataset.row_ids_.data(),
dataset.timestamps_.data(),
dataset.raw_);
});
}
for (auto& thread : threads) {
thread.join();
}
// Verify total row count
EXPECT_EQ(segment->get_row_count(), total_rows);
// Verify resource estimation is consistent and positive
auto resource = segment_impl->EstimateSegmentResourceUsage();
EXPECT_GT(resource.memory_bytes, 0);
}
TEST(Growing, MultipleFieldsResourceEstimation) {
// Create schema with multiple fields
auto schema = std::make_shared<Schema>();
auto dim = 64;
auto metric_type = knowhere::metric::L2;
auto vec_fid =
schema->AddDebugField("vec", DataType::VECTOR_FLOAT, dim, metric_type);
auto pk_fid = schema->AddDebugField("pk", DataType::INT64);
auto float_fid = schema->AddDebugField("age", DataType::FLOAT);
auto double_fid = schema->AddDebugField("score", DataType::DOUBLE);
schema->set_primary_field_id(pk_fid);
auto segment = CreateGrowingSegment(schema, empty_index_meta);
auto* segment_impl = dynamic_cast<SegmentGrowingImpl*>(segment.get());
ASSERT_NE(segment_impl, nullptr);
// Insert data
const int64_t N = 500;
auto dataset = DataGen(schema, N);
segment->PreInsert(N);
segment->Insert(0,
N,
dataset.row_ids_.data(),
dataset.timestamps_.data(),
dataset.raw_);
auto resource = segment_impl->EstimateSegmentResourceUsage();
// Memory should include all fields:
// - Vector: N * dim * sizeof(float) = 500 * 64 * 4 = 128000 bytes
// - pk (int64): N * 8 = 4000 bytes
// - age (float): N * 4 = 2000 bytes
// - score (double): N * 8 = 4000 bytes
// - Timestamps: N * 8 = 4000 bytes
// Plus safety margin
int64_t min_expected = N * dim * sizeof(float) + N * sizeof(int64_t) +
N * sizeof(float) + N * sizeof(double) +
N * sizeof(Timestamp);
EXPECT_GE(resource.memory_bytes, min_expected);
}

View File

@ -46,7 +46,7 @@ class TimestampIndex {
std::pair<int64_t, int64_t> active_range);
size_t
size() const {
memory_size() const {
return sizeof(*this) + lengths_.size() * sizeof(int64_t) +
start_locs_.size() * sizeof(int64_t) +
timestamp_barriers_.size() * sizeof(Timestamp);

View File

@ -201,6 +201,20 @@ ReleaseLoadingResource(CResourceUsage size) {
size.disk_bytes));
}
void
ChargeLoadedResource(CResourceUsage size) {
milvus::cachinglayer::Manager::GetInstance().ChargeLoadedResource(
milvus::cachinglayer::ResourceUsage(size.memory_bytes,
size.disk_bytes));
}
void
RefundLoadedResource(CResourceUsage size) {
milvus::cachinglayer::Manager::GetInstance().RefundLoadedResource(
milvus::cachinglayer::ResourceUsage(size.memory_bytes,
size.disk_bytes));
}
CStatus
AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
SCOPE_CGO_CALL_METRIC();

View File

@ -48,6 +48,12 @@ TryReserveLoadingResourceWithTimeout(CResourceUsage size,
void
ReleaseLoadingResource(CResourceUsage size);
void
ChargeLoadedResource(CResourceUsage size);
void
RefundLoadedResource(CResourceUsage size);
CStatus
AppendIndexInfo(CLoadIndexInfo c_load_index_info,
int64_t index_id,

View File

@ -99,6 +99,13 @@ SegcoreSetDenseVectorInterminIndexRefineWithQuantFlag(const bool value) {
config.set_refine_with_quant_flag(value);
}
extern "C" void
SegcoreSetInterimIndexMemExpansionRate(const float value) {
milvus::segcore::SegcoreConfig& config =
milvus::segcore::SegcoreConfig::default_config();
config.set_interim_index_mem_expansion_rate(value);
}
extern "C" void
SegcoreSetSubDim(const int64_t value) {
milvus::segcore::SegcoreConfig& config =

View File

@ -60,6 +60,9 @@ SegcoreSetDenseVectorInterminIndexRefineQuantType(const char*);
void
SegcoreSetDenseVectorInterminIndexRefineWithQuantFlag(const bool);
void
SegcoreSetInterimIndexMemExpansionRate(const float);
// return value must be freed by the caller
char*
SegcoreSetSimdType(const char*);

View File

@ -37,7 +37,7 @@ detect_aarch64_target_arch()
set(JEMALLOC_PREFIX "${CMAKE_INSTALL_PREFIX}")
set(JEMALLOC_LIB_DIR "${JEMALLOC_PREFIX}/lib")
set(JEMALLOC_STATIC_LIB "${JEMALLOC_LIB_DIR}/libjemalloc_pic${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(JEMALLOC_CONFIGURE_COMMAND ./configure "AR=${CMAKE_AR}" "CC=${CMAKE_C_COMPILER}")
set(JEMALLOC_CONFIGURE_COMMAND ./configure "AR=${CMAKE_AR}" "CC=${CMAKE_C_COMPILER}" "CFLAGS=-fno-omit-frame-pointer" "CXXFLAGS=-fno-omit-frame-pointer")
message("CMAKE_OSX_SYSROOT: ${CMAKE_OSX_SYSROOT}")
if (CMAKE_OSX_SYSROOT)

View File

@ -1146,6 +1146,9 @@ func (sd *shardDelegator) Close() {
sd.tsCond.Broadcast()
sd.lifetime.Wait()
// Remove all candidates and refund bloom filter resources
sd.pkOracle.RemoveAndRefundAll()
// clean idf oracle
if sd.idfOracle != nil {
sd.idfOracle.Close()

View File

@ -863,17 +863,20 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
sd.AddExcludedSegments(droppedInfos)
if len(sealed) > 0 {
sd.pkOracle.Remove(
removed := sd.pkOracle.Remove(
pkoracle.WithSegmentIDs(lo.Map(sealed, func(entry SegmentEntry, _ int) int64 { return entry.SegmentID })...),
pkoracle.WithSegmentType(commonpb.SegmentState_Sealed),
pkoracle.WithWorkerID(targetNodeID),
)
// Refund resources for removed sealed segment candidates
sd.pkOracle.RefundRemoved(removed)
}
if len(growing) > 0 {
sd.pkOracle.Remove(
pkoracle.WithSegmentIDs(lo.Map(growing, func(entry SegmentEntry, _ int) int64 { return entry.SegmentID })...),
pkoracle.WithSegmentType(commonpb.SegmentState_Growing),
)
// leave the bloom filter sets of growing segments to be closed by Release()
}
var releaseErr error

View File

@ -519,6 +519,8 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
s.True(s.delegator.distribution.Serviceable())
s.delegator.Close()
// After Close(), pkOracle is cleared, so ProcessDelete becomes a no-op.
// This is expected behavior - the delegator is being decommissioned.
s.delegator.ProcessDelete([]*DeleteData{
{
PartitionID: 500,
@ -528,7 +530,6 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
},
}, 10)
s.Require().NoError(err)
s.False(s.delegator.distribution.Serviceable())
}
func (s *DelegatorDataSuite) TestLoadGrowingWithBM25() {

View File

@ -16,6 +16,13 @@
package pkoracle
/*
#cgo pkg-config: milvus_core
#include "segcore/load_index_c.h"
*/
import "C"
import (
"sync"
@ -40,6 +47,10 @@ type BloomFilterSet struct {
segType commonpb.SegmentState
currentStat *storage.PkStatistics
historyStats []*storage.PkStatistics
// Resource tracking
trackedSize int64 // memory size that was charged
resourceCharged bool // tracks whether memory resources were charged for this bloom filter set
}
// MayPkExist returns whether any bloom filters returns positive.
@ -140,6 +151,103 @@ func (s *BloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) {
s.historyStats = append(s.historyStats, stats)
}
// MemSize returns the total memory size of all bloom filters in bytes.
// This includes both currentStat and all historyStats.
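// For example (hypothetical numbers): a filter whose Cap() reports 8388608 bits
// accounts for 8388608 / 8 = 1048576 bytes (1 MiB).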
func (s *BloomFilterSet) MemSize() int64 {
s.statsMutex.RLock()
defer s.statsMutex.RUnlock()
var size int64
if s.currentStat != nil && s.currentStat.PkFilter != nil {
size += int64(s.currentStat.PkFilter.Cap() / 8) // Cap returns bits, convert to bytes
}
for _, stats := range s.historyStats {
if stats != nil && stats.PkFilter != nil {
size += int64(stats.PkFilter.Cap() / 8)
}
}
return size
}
// Charge charges memory resource for this bloom filter set via caching layer.
// Safe to call multiple times - only charges once.
func (s *BloomFilterSet) Charge() {
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
if s.resourceCharged {
return // Already charged
}
size := s.memSizeLocked()
if size > 0 {
C.ChargeLoadedResource(C.CResourceUsage{
memory_bytes: C.int64_t(size),
disk_bytes: 0,
})
s.trackedSize = size
s.resourceCharged = true
log.Debug("charged bloom filter resource",
zap.Int64("segmentID", s.segmentID),
zap.Int64("size", size))
}
}
// Refund refunds any charged resources. Safe to call multiple times.
func (s *BloomFilterSet) Refund() {
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
if !s.resourceCharged || s.trackedSize <= 0 {
return
}
C.RefundLoadedResource(C.CResourceUsage{
memory_bytes: C.int64_t(s.trackedSize),
disk_bytes: 0,
})
log.Debug("refunded bloom filter resource",
zap.Int64("segmentID", s.segmentID),
zap.Int64("size", s.trackedSize))
s.trackedSize = 0
s.resourceCharged = false
}
// IsResourceCharged returns whether memory resources have been charged for this bloom filter set.
func (s *BloomFilterSet) IsResourceCharged() bool {
s.statsMutex.RLock()
defer s.statsMutex.RUnlock()
return s.resourceCharged
}
// SetResourceCharged sets the resourceCharged flag and trackedSize for testing purposes.
// This allows tests to simulate charged state without calling the actual C code.
func (s *BloomFilterSet) SetResourceCharged(charged bool) {
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
s.resourceCharged = charged
if charged {
s.trackedSize = s.memSizeLocked()
} else {
s.trackedSize = 0
}
}
// memSizeLocked returns the total memory size without acquiring the lock.
// Caller must hold statsMutex.
func (s *BloomFilterSet) memSizeLocked() int64 {
var size int64
if s.currentStat != nil && s.currentStat.PkFilter != nil {
size += int64(s.currentStat.PkFilter.Cap() / 8)
}
for _, stats := range s.historyStats {
if stats != nil && stats.PkFilter != nil {
size += int64(stats.PkFilter.Cap() / 8)
}
}
return size
}
// NewBloomFilterSet returns a new BloomFilterSet.
func NewBloomFilterSet(segmentID int64, paritionID int64, segType commonpb.SegmentState) *BloomFilterSet {
bfs := &BloomFilterSet{

View File

@ -100,3 +100,91 @@ func TestHistoricalStat(t *testing.T) {
assert.True(t, ret[i])
}
}
func TestMemSize(t *testing.T) {
paramtable.Init()
t.Run("empty bloom filter set", func(t *testing.T) {
bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
size := bfs.MemSize()
assert.Equal(t, int64(0), size)
})
t.Run("with current stat only", func(t *testing.T) {
batchSize := 100
pks := make([]storage.PrimaryKey, 0)
for i := 0; i < batchSize; i++ {
pk := storage.NewInt64PrimaryKey(int64(i))
pks = append(pks, pk)
}
bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
bfs.UpdateBloomFilter(pks)
size := bfs.MemSize()
assert.Greater(t, size, int64(0))
// Bloom filter size should be related to the configured BloomFilterSize
// Cap() returns bits, MemSize() converts to bytes
expectedSize := int64(bfs.currentStat.PkFilter.Cap() / 8)
assert.Equal(t, expectedSize, size)
})
t.Run("with historical stats", func(t *testing.T) {
batchSize := 100
pks := make([]storage.PrimaryKey, 0)
for i := 0; i < batchSize; i++ {
pk := storage.NewInt64PrimaryKey(int64(i))
pks = append(pks, pk)
}
bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
bfs.UpdateBloomFilter(pks)
currentSize := bfs.MemSize()
// Add historical stats (simulate compaction history)
bfs.AddHistoricalStats(bfs.currentStat)
bfs.AddHistoricalStats(bfs.currentStat)
totalSize := bfs.MemSize()
// Should be exactly 3x the single bloom filter size
// (1 current + 2 historical, all pointing to the same underlying filter in this test)
assert.Equal(t, currentSize*3, totalSize)
})
t.Run("with nil current stat and historical stats", func(t *testing.T) {
batchSize := 100
pks := make([]storage.PrimaryKey, 0)
for i := 0; i < batchSize; i++ {
pk := storage.NewInt64PrimaryKey(int64(i))
pks = append(pks, pk)
}
bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
bfs.UpdateBloomFilter(pks)
singleSize := bfs.MemSize()
// Add historical stats and clear current
bfs.AddHistoricalStats(bfs.currentStat)
bfs.currentStat = nil
historicalSize := bfs.MemSize()
assert.Equal(t, singleSize, historicalSize)
})
t.Run("varchar primary key", func(t *testing.T) {
batchSize := 100
pks := make([]storage.PrimaryKey, 0)
for i := 0; i < batchSize; i++ {
pk := storage.NewVarCharPrimaryKey(strconv.FormatInt(int64(i), 10))
pks = append(pks, pk)
}
bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
bfs.UpdateBloomFilter(pks)
size := bfs.MemSize()
assert.Greater(t, size, int64(0))
})
}

View File

@ -31,10 +31,17 @@ type PkOracle interface {
BatchGet(pks []storage.PrimaryKey, filters ...CandidateFilter) map[int64][]bool
// RegisterCandidate adds candidate into pkOracle.
Register(candidate Candidate, workerID int64) error
// RemoveCandidate removes candidate
Remove(filters ...CandidateFilter) error
// Remove removes matching candidates and returns the removed candidates.
Remove(filters ...CandidateFilter) []Candidate
// CheckCandidate checks whether candidate with provided key exists.
Exists(candidate Candidate, workerID int64) bool
// Range iterates over all candidates without removing them.
Range(fn func(candidate Candidate) bool)
// RefundRemoved refunds resources for BloomFilterSet candidates.
RefundRemoved(candidates []Candidate)
// RemoveAndRefundAll removes all candidates and refunds resources for BloomFilterSet candidates.
// Used during shutdown to clean up and refund resources.
RemoveAndRefundAll()
}
var _ PkOracle = (*pkOracle)(nil)
@ -97,19 +104,22 @@ func (pko *pkOracle) Register(candidate Candidate, workerID int64) error {
return nil
}
// Remove removes candidate from pko.
func (pko *pkOracle) Remove(filters ...CandidateFilter) error {
// Remove removes candidate from pko and returns the removed candidates.
func (pko *pkOracle) Remove(filters ...CandidateFilter) []Candidate {
var removed []Candidate
pko.candidates.Range(func(key string, candidate candidateWithWorker) bool {
for _, filter := range filters {
if !filter(candidate) {
return true
}
}
pko.candidates.GetAndRemove(pko.candidateKey(candidate, candidate.workerID))
if _, ok := pko.candidates.GetAndRemove(pko.candidateKey(candidate, candidate.workerID)); ok {
removed = append(removed, candidate.Candidate)
}
return true
})
return nil
return removed
}
func (pko *pkOracle) Exists(candidate Candidate, workerID int64) bool {
@ -117,6 +127,34 @@ func (pko *pkOracle) Exists(candidate Candidate, workerID int64) bool {
return ok
}
// Range iterates over all candidates without removing them.
func (pko *pkOracle) Range(fn func(candidate Candidate) bool) {
pko.candidates.Range(func(key string, candidate candidateWithWorker) bool {
return fn(candidate.Candidate)
})
}
// RefundRemoved refunds resources for BloomFilterSet candidates.
// For candidates that are not BloomFilterSet (e.g., LocalSegment), this is a no-op.
func (pko *pkOracle) RefundRemoved(candidates []Candidate) {
for _, candidate := range candidates {
if bfs, ok := candidate.(*BloomFilterSet); ok {
bfs.Refund()
}
}
}
// RemoveAndRefundAll removes all candidates and refunds resources for BloomFilterSet candidates.
// Used during shutdown to clean up and refund resources.
func (pko *pkOracle) RemoveAndRefundAll() {
removed := pko.Remove()
for _, candidate := range removed {
if bfs, ok := candidate.(*BloomFilterSet); ok {
bfs.Refund()
}
}
}
// NewPkOracle returns pkOracle as PkOracle interface.
func NewPkOracle() PkOracle {
return &pkOracle{

View File

@ -63,3 +63,223 @@ func TestGet(t *testing.T) {
assert.NotContains(t, segmentIDs, int64(1))
}
}
func TestRemoveReturnsRemovedCandidates(t *testing.T) {
paramtable.Init()
t.Run("remove single candidate", func(t *testing.T) {
pko := NewPkOracle()
batchSize := 10
pks := make([]storage.PrimaryKey, 0)
for i := 0; i < batchSize; i++ {
pk := storage.NewInt64PrimaryKey(int64(i))
pks = append(pks, pk)
}
bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
bfs.UpdateBloomFilter(pks)
pko.Register(bfs, 1)
removed := pko.Remove(WithSegmentIDs(1))
assert.Len(t, removed, 1)
assert.Equal(t, int64(1), removed[0].ID())
})
t.Run("remove multiple candidates", func(t *testing.T) {
pko := NewPkOracle()
batchSize := 10
pks := make([]storage.PrimaryKey, 0)
for i := 0; i < batchSize; i++ {
pk := storage.NewInt64PrimaryKey(int64(i))
pks = append(pks, pk)
}
// Register multiple candidates
bfs1 := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
bfs1.UpdateBloomFilter(pks)
pko.Register(bfs1, 1)
bfs2 := NewBloomFilterSet(2, 1, commonpb.SegmentState_Sealed)
bfs2.UpdateBloomFilter(pks)
pko.Register(bfs2, 1)
bfs3 := NewBloomFilterSet(3, 1, commonpb.SegmentState_Sealed)
bfs3.UpdateBloomFilter(pks)
pko.Register(bfs3, 1)
// Remove multiple segments
removed := pko.Remove(WithSegmentIDs(1, 2))
assert.Len(t, removed, 2)
removedIDs := make([]int64, len(removed))
for i, c := range removed {
removedIDs[i] = c.ID()
}
assert.Contains(t, removedIDs, int64(1))
assert.Contains(t, removedIDs, int64(2))
// Verify bfs3 still exists
assert.True(t, pko.Exists(bfs3, 1))
})
t.Run("remove non-existent candidate", func(t *testing.T) {
pko := NewPkOracle()
removed := pko.Remove(WithSegmentIDs(999))
assert.Len(t, removed, 0)
})
t.Run("remove with segment type filter", func(t *testing.T) {
pko := NewPkOracle()
batchSize := 10
pks := make([]storage.PrimaryKey, 0)
for i := 0; i < batchSize; i++ {
pk := storage.NewInt64PrimaryKey(int64(i))
pks = append(pks, pk)
}
// Register sealed segment
bfsSealed := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
bfsSealed.UpdateBloomFilter(pks)
pko.Register(bfsSealed, 1)
// Register growing segment
bfsGrowing := NewBloomFilterSet(2, 1, commonpb.SegmentState_Growing)
bfsGrowing.UpdateBloomFilter(pks)
pko.Register(bfsGrowing, 1)
// Remove only sealed segments
removed := pko.Remove(WithSegmentType(commonpb.SegmentState_Sealed))
assert.Len(t, removed, 1)
assert.Equal(t, int64(1), removed[0].ID())
// Growing segment should still exist
assert.True(t, pko.Exists(bfsGrowing, 1))
})
t.Run("remove with worker ID filter", func(t *testing.T) {
pko := NewPkOracle()
batchSize := 10
pks := make([]storage.PrimaryKey, 0)
for i := 0; i < batchSize; i++ {
pk := storage.NewInt64PrimaryKey(int64(i))
pks = append(pks, pk)
}
// Register on different workers
bfs1 := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
bfs1.UpdateBloomFilter(pks)
pko.Register(bfs1, 1)
bfs2 := NewBloomFilterSet(2, 1, commonpb.SegmentState_Sealed)
bfs2.UpdateBloomFilter(pks)
pko.Register(bfs2, 2)
// Remove only from worker 1
removed := pko.Remove(WithWorkerID(1))
assert.Len(t, removed, 1)
assert.Equal(t, int64(1), removed[0].ID())
// bfs2 on worker 2 should still exist
assert.True(t, pko.Exists(bfs2, 2))
})
t.Run("remove all candidates", func(t *testing.T) {
pko := NewPkOracle()
batchSize := 10
pks := make([]storage.PrimaryKey, 0)
for i := 0; i < batchSize; i++ {
pk := storage.NewInt64PrimaryKey(int64(i))
pks = append(pks, pk)
}
// Register multiple candidates
for i := 0; i < 5; i++ {
bfs := NewBloomFilterSet(int64(i), 1, commonpb.SegmentState_Sealed)
bfs.UpdateBloomFilter(pks)
pko.Register(bfs, 1)
}
// Remove all (no filter)
removed := pko.Remove()
assert.Len(t, removed, 5)
})
}
func TestRefundRemoved(t *testing.T) {
paramtable.Init()
t.Run("refund empty candidates", func(t *testing.T) {
pko := NewPkOracle()
// Should not panic with empty slice
pko.RefundRemoved(nil)
pko.RefundRemoved([]Candidate{})
})
t.Run("refund bloom filter candidates", func(t *testing.T) {
// Test that RefundRemoved correctly identifies BloomFilterSet and calls Refund
// Note: We don't set resourceCharged=true to avoid requiring C library initialization
pko := NewPkOracle()
batchSize := 10
pks := make([]storage.PrimaryKey, 0)
for i := 0; i < batchSize; i++ {
pk := storage.NewInt64PrimaryKey(int64(i))
pks = append(pks, pk)
}
// Create bloom filter sets (not charged, so Refund() is a no-op)
bfs1 := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
bfs1.UpdateBloomFilter(pks)
bfs2 := NewBloomFilterSet(2, 1, commonpb.SegmentState_Sealed)
bfs2.UpdateBloomFilter(pks)
candidates := []Candidate{bfs1, bfs2}
// Should not panic - Refund() returns early when not charged
pko.RefundRemoved(candidates)
})
}
func TestRemoveAndRefundAll(t *testing.T) {
paramtable.Init()
t.Run("remove and refund all with empty oracle", func(t *testing.T) {
pko := NewPkOracle()
// Should not panic
pko.RemoveAndRefundAll()
})
t.Run("remove and refund all bloom filter candidates", func(t *testing.T) {
// Test that RemoveAndRefundAll removes and refunds all BloomFilterSet candidates
// Note: We don't set resourceCharged=true to avoid requiring C library initialization
pko := NewPkOracle()
batchSize := 10
pks := make([]storage.PrimaryKey, 0)
for i := 0; i < batchSize; i++ {
pk := storage.NewInt64PrimaryKey(int64(i))
pks = append(pks, pk)
}
// Create and register bloom filter sets (not charged)
bfs1 := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
bfs1.UpdateBloomFilter(pks)
pko.Register(bfs1, 1)
bfs2 := NewBloomFilterSet(2, 1, commonpb.SegmentState_Sealed)
bfs2.UpdateBloomFilter(pks)
pko.Register(bfs2, 1)
// Should not panic - Refund() returns early when not charged
pko.RemoveAndRefundAll()
// Candidates should be removed from oracle
assert.False(t, pko.Exists(bfs1, 1))
assert.False(t, pko.Exists(bfs2, 1))
})
}

View File

@ -499,6 +499,20 @@ func (s *LocalSegment) LastDeltaTimestamp() uint64 {
return s.lastDeltaTimestamp.Load()
}
// UpdateBloomFilter updates bloom filter with provided pks and charges resource if BF is newly created.
// This overrides baseSegment.UpdateBloomFilter to handle resource charging for growing segments.
func (s *LocalSegment) UpdateBloomFilter(pks []storage.PrimaryKey) {
if s.skipGrowingBF {
return
}
// Update bloom filter (may create new BF if not exist)
s.bloomFilterSet.UpdateBloomFilter(pks)
// Charge bloom filter resource (safe to call multiple times - only charges once)
s.bloomFilterSet.Charge()
}
func (s *LocalSegment) GetIndexByID(indexID int64) *IndexedFieldInfo {
info, _ := s.fieldIndexes.Get(indexID)
return info
@ -1451,6 +1465,9 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
// usage := s.ResourceUsageEstimate()
// s.manager.SubLogicalResource(usage)
// Refund bloom filter resource
s.bloomFilterSet.Refund()
binlogSize := s.binlogSize.Load()
if binlogSize > 0 {
// no concurrent change to s.binlogSize, so the subtraction is safe

View File

@ -380,12 +380,14 @@ func (loader *segmentLoader) Load(ctx context.Context,
}
if !segment.BloomFilterExist() {
log.Debug("BloomFilterExist", zap.Int64("segid", segment.ID()))
log.Debug("loading bloom filter for segment", zap.Int64("segmentID", segment.ID()))
bfs, err := loader.loadSingleBloomFilterSet(ctx, loadInfo.GetCollectionID(), loadInfo, segment.Type())
if err != nil {
return errors.Wrap(err, "At LoadBloomFilter")
}
segment.SetBloomFilter(bfs)
// Charge bloom filter resource
bfs.Charge()
}
if segment.Level() != datapb.SegmentLevel_L0 {
@ -708,11 +710,44 @@ func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionI
return nil, err
}
pkField := GetPkField(collection.Schema())
pkFieldID := pkField.GetFieldID()
// Calculate total memory size needed for bloom filters (PK stats)
var totalMemorySize int64
for _, info := range infos {
for _, fieldBinlog := range info.Statslogs {
if fieldBinlog.FieldID == pkFieldID {
totalMemorySize += getBinlogDataMemorySize(fieldBinlog)
}
}
}
// Reserve memory resource if tiered eviction is enabled
if paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() && totalMemorySize > 0 {
if ok := C.TryReserveLoadingResourceWithTimeout(C.CResourceUsage{
// double loading memory size for bloom filters to avoid OOM during loading
memory_bytes: C.int64_t(totalMemorySize * 2),
disk_bytes: C.int64_t(0),
}, 1000); !ok {
return nil, fmt.Errorf("failed to reserve loading resource for bloom filters, totalMemorySize = %v MB",
logutil.ToMB(float64(totalMemorySize)))
}
log.Info("reserved loading resource for bloom filters", zap.Float64("totalMemorySizeMB", logutil.ToMB(float64(totalMemorySize))))
}
defer func() {
if paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() && totalMemorySize > 0 {
C.ReleaseLoadingResource(C.CResourceUsage{
memory_bytes: C.int64_t(totalMemorySize * 2),
disk_bytes: C.int64_t(0),
})
log.Info("released loading resource for bloom filters", zap.Float64("totalMemorySizeMB", logutil.ToMB(float64(totalMemorySize))))
}
}()
log.Info("start loading remote...", zap.Int("segmentNum", segmentNum))
loadedBfs := typeutil.NewConcurrentSet[*pkoracle.BloomFilterSet]()
// TODO check memory for bf size
loadRemoteFunc := func(idx int) error {
loadInfo := infos[idx]
partitionID := loadInfo.PartitionID
@ -742,7 +777,14 @@ func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionI
return nil, err
}
return loadedBfs.Collect(), nil
result := loadedBfs.Collect()
// Charge loaded resource for bloom filters
for _, bfs := range result {
bfs.Charge()
}
return result, nil
}
func separateIndexAndBinlog(loadInfo *querypb.SegmentLoadInfo) (map[int64]*IndexedFieldInfo, []*datapb.FieldBinlog) {

View File

@ -576,6 +576,9 @@ func InitInterminIndexConfig(params *paramtable.ComponentParam) error {
enableInterminIndex := C.bool(params.QueryNodeCfg.EnableInterminSegmentIndex.GetAsBool())
C.SegcoreSetEnableInterminSegmentIndex(enableInterminIndex)
memExpansionRate := C.float(params.QueryNodeCfg.InterimIndexMemExpandRate.GetAsFloat())
C.SegcoreSetInterimIndexMemExpansionRate(memExpansionRate)
nlist := C.int64_t(params.QueryNodeCfg.InterimIndexNlist.GetAsInt64())
C.SegcoreSetNlist(nlist)