diff --git a/internal/core/src/segcore/DeletedRecord.h b/internal/core/src/segcore/DeletedRecord.h index 9c0e6e864a..9d145fa815 100644 --- a/internal/core/src/segcore/DeletedRecord.h +++ b/internal/core/src/segcore/DeletedRecord.h @@ -144,7 +144,7 @@ class DeletedRecord { deleted_mask_.set(row_id); } else { // need to add mask size firstly for growing segment - deleted_mask_.resize(insert_record_->size()); + deleted_mask_.resize(insert_record_->row_count()); deleted_mask_.set(row_id); } removed_num++; @@ -170,6 +170,13 @@ class DeletedRecord { } estimated_memory_size_ = new_estimated_size; } + } else { + // The resource usage of DeletedRecord for a Growing Segment is already counted in + // SegmentGrowingImpl::EstimateSegmentResourceUsage(), so there is no need to count it here. + // The reason we don't count it here is that we treat the Growing Segment as a single unit, + // we do not track memory separately for each field. + // If you intend to add this tracking here, first consider how to count the memory usage separately + // within the growing segment. } return max_timestamp; @@ -245,7 +252,7 @@ class DeletedRecord { if constexpr (is_sealed) { bitsize = sealed_row_count_; } else { - bitsize = insert_record_->size(); + bitsize = insert_record_->row_count(); } BitsetType bitmap(bitsize, false); diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index b02a977ecd..3fda724a5a 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -75,7 +75,7 @@ class OffsetMap { clear() = 0; virtual size_t - size() const = 0; + memory_size() const = 0; }; template @@ -203,7 +203,7 @@ class OffsetOrderedMap : public OffsetMap { } size_t - size() const override { + memory_size() const override { std::shared_lock lck(mtx_); return map_.get_allocator().total_allocated(); } @@ -383,7 +383,7 @@ class OffsetOrderedArray : public OffsetMap { } size_t - size() const override { + memory_size() const override { return sizeof(std::pair) * array_.capacity(); } @@ -592,8 +592,8 @@ class InsertRecordSealed { pk2offset_->seal(); // update estimated memory size to caching layer cachinglayer::Manager::GetInstance().ChargeLoadedResource( - {static_cast(pk2offset_->size()), 0}); - estimated_memory_size_ += pk2offset_->size(); + {static_cast(pk2offset_->memory_size()), 0}); + estimated_memory_size_ += pk2offset_->memory_size(); } void @@ -604,11 +604,11 @@ class InsertRecordSealed { timestamp_index_ = std::move(timestamp_index); AssertInfo(timestamps_.num_chunk() == 1, "num chunk not equal to 1 for sealed segment"); - size_t size = - timestamps.size() * sizeof(Timestamp) + timestamp_index_.size(); + size_t memory_size = timestamps.size() * sizeof(Timestamp) + + timestamp_index_.memory_size(); cachinglayer::Manager::GetInstance().ChargeLoadedResource( - {static_cast(size), 0}); - estimated_memory_size_ += size; + {static_cast(memory_size), 0}); + estimated_memory_size_ += memory_size; } const ConcurrentVector& @@ -1076,7 +1076,7 @@ class InsertRecordGrowing { } int64_t - size() const { + row_count() const { return ack_responder_.GetAck(); } diff --git a/internal/core/src/segcore/Record.h b/internal/core/src/segcore/Record.h index ae659152b1..971ea7b673 100644 --- a/internal/core/src/segcore/Record.h +++ b/internal/core/src/segcore/Record.h @@ -20,7 +20,7 @@ inline int64_t get_barrier(const RecordType& record, Timestamp timestamp) { auto& vec = record.timestamps(); int64_t beg = 0; - int64_t end = record.size(); + int64_t end = 
record.row_count(); while (beg < end) { auto mid = (beg + end) / 2; if (vec[mid] <= timestamp) { diff --git a/internal/core/src/segcore/SegcoreConfig.h b/internal/core/src/segcore/SegcoreConfig.h index c31913590f..1eca70035f 100644 --- a/internal/core/src/segcore/SegcoreConfig.h +++ b/internal/core/src/segcore/SegcoreConfig.h @@ -157,6 +157,16 @@ class SegcoreConfig { return enable_geometry_cache_; } + void + set_interim_index_mem_expansion_rate(float rate) { + interim_index_mem_expansion_rate_ = rate; + } + + float + get_interim_index_mem_expansion_rate() const { + return interim_index_mem_expansion_rate_; + } + private: inline static const std::unordered_set valid_dense_vector_index_type = { @@ -176,6 +186,7 @@ class SegcoreConfig { knowhere::RefineType::DATA_VIEW; inline static bool refine_with_quant_flag_ = false; inline static bool enable_geometry_cache_ = false; + inline static float interim_index_mem_expansion_rate_ = 1.15f; }; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 8532f291e2..980ef1b67c 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -47,6 +47,7 @@ #include "storage/ThreadPools.h" #include "storage/KeyRetriever.h" #include "common/TypeTraits.h" +#include "knowhere/comp/index_param.h" #include "milvus-storage/format/parquet/file_reader.h" #include "milvus-storage/filesystem/fs.h" @@ -247,6 +248,212 @@ SegmentGrowingImpl::try_remove_chunks(FieldId fieldId) { } } +ResourceUsage +SegmentGrowingImpl::EstimateSegmentResourceUsage() const { + int64_t num_rows = get_row_count(); + if (num_rows == 0) { + return ResourceUsage{0, 0}; + } + + bool growing_mmap_enabled = storage::MmapManager::GetInstance() + .GetMmapConfig() + .GetEnableGrowingMmap(); + + int64_t memory_bytes = 0; + int64_t disk_bytes = 0; + + // 1. Timestamps: always in memory for now + memory_bytes += num_rows * sizeof(Timestamp); + + // 2. pk2offset_ map: always in memory + // Use actual allocated memory from the tracking allocator + memory_bytes += insert_record_.pk2offset_->memory_size(); + + // 3. 
Field data and interim index + // For vector fields with interim index: + // - IVF_FLAT_CC: index stores raw data, so count index_size = raw_size * memExpansionRate (memory) + // - SCANN_DVR: index doesn't store raw data, so count raw_size (memory or mmap) + // For other fields: count raw_size based on mmap setting + bool interim_index_enabled = + segcore_config_.get_enable_interim_segment_index(); + bool is_ivf_flat_cc = + segcore_config_.get_dense_vector_intermin_index_type() == + knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC; + + for (const auto& [field_id, field_meta] : schema_->get_fields()) { + if (field_id.get() < START_USER_FIELDID) { + continue; + } + + int64_t field_bytes = 0; + auto data_type = field_meta.get_data_type(); + + if (field_meta.is_vector()) { + // Calculate raw vector size + // Note: get_dim() cannot be called on sparse vectors, so handle that case separately + if (data_type == DataType::VECTOR_SPARSE_U32_F32) { + field_bytes = + num_rows * + SegmentInternalInterface::get_field_avg_size(field_id); + } else { + int64_t dim = field_meta.get_dim(); + switch (data_type) { + case DataType::VECTOR_FLOAT: + field_bytes = num_rows * dim * sizeof(float); + break; + case DataType::VECTOR_FLOAT16: + field_bytes = num_rows * dim * sizeof(float16); + break; + case DataType::VECTOR_BFLOAT16: + field_bytes = num_rows * dim * sizeof(bfloat16); + break; + case DataType::VECTOR_BINARY: + field_bytes = num_rows * (dim / 8); + break; + case DataType::VECTOR_INT8: + field_bytes = num_rows * dim * sizeof(int8_t); + break; + default: + break; + } + } + + // Check if this field has interim index + bool has_interim_index = + interim_index_enabled && indexing_record_.is_in(field_id); + + if (has_interim_index) { + if (data_type == DataType::VECTOR_SPARSE_U32_F32) { + // sparse vector interim index does not support file mmap + // index memory + raw data memory ~ 2x raw data memory + memory_bytes += field_bytes * 2; + } else { + // Dense vector interim index estimation based on index type + if (is_ivf_flat_cc) { + // IVF_FLAT_CC: index stores raw data + // Index memory = raw_size * memExpansionRate + memory_bytes += static_cast( + field_bytes * + segcore_config_ + .get_interim_index_mem_expansion_rate()); + } else { + // SCANN_DVR or no interim index + if (growing_mmap_enabled) { + disk_bytes += field_bytes; + } else { + memory_bytes += field_bytes; + } + } + } + } else { + if (growing_mmap_enabled) { + disk_bytes += field_bytes; + } else { + memory_bytes += field_bytes; + } + } + } else { + // Scalar fields + switch (data_type) { + case DataType::BOOL: + field_bytes = num_rows * sizeof(bool); + break; + case DataType::INT8: + field_bytes = num_rows * sizeof(int8_t); + break; + case DataType::INT16: + field_bytes = num_rows * sizeof(int16_t); + break; + case DataType::INT32: + field_bytes = num_rows * sizeof(int32_t); + break; + case DataType::INT64: + case DataType::TIMESTAMPTZ: + field_bytes = num_rows * sizeof(int64_t); + break; + case DataType::FLOAT: + field_bytes = num_rows * sizeof(float); + break; + case DataType::DOUBLE: + field_bytes = num_rows * sizeof(double); + break; + case DataType::VARCHAR: + case DataType::TEXT: + case DataType::GEOMETRY: { + auto avg_size = + SegmentInternalInterface::get_field_avg_size(field_id); + field_bytes = num_rows * avg_size; + break; + } + case DataType::JSON: { + auto avg_size = + SegmentInternalInterface::get_field_avg_size(field_id); + field_bytes = num_rows * avg_size; + break; + } + case DataType::ARRAY: { + auto avg_size = + 
SegmentInternalInterface::get_field_avg_size(field_id); + field_bytes = num_rows * avg_size; + break; + } + default: + break; + } + + // Scalar fields: memory or disk based on mmap setting + if (growing_mmap_enabled) { + disk_bytes += field_bytes; + } else { + memory_bytes += field_bytes; + } + } + } + + // 4. Text index (Tantivy) + { + std::shared_lock lock(mutex_); + for (const auto& [field_id, index_variant] : text_indexes_) { + if (auto* ptr = std::get_if>( + &index_variant)) { + memory_bytes += (*ptr)->ByteSize(); + } + } + } + + // 5. Deleted records overhead + memory_bytes += deleted_record_.mem_size(); + + // Apply safety margin + constexpr double kResourceSafetyMargin = 1.2; + memory_bytes = static_cast(memory_bytes * kResourceSafetyMargin); + disk_bytes = static_cast(disk_bytes * kResourceSafetyMargin); + + return ResourceUsage{memory_bytes, disk_bytes}; +} + +void +SegmentGrowingImpl::UpdateResourceTracking() { + auto new_resource = EstimateSegmentResourceUsage(); + + // Lock to ensure refund-then-charge is atomic + std::lock_guard lock(resource_tracking_mutex_); + + auto old_resource = tracked_resource_; + + if (old_resource.AnyGTZero()) { + Manager::GetInstance().RefundLoadedResource( + old_resource, fmt::format("growing_segment_{}_refund", id_)); + } + + if (new_resource.AnyGTZero()) { + Manager::GetInstance().ChargeLoadedResource( + new_resource, fmt::format("growing_segment_{}_charge", id_)); + } + + tracked_resource_ = new_resource; +} + void SegmentGrowingImpl::Insert(int64_t reserved_offset, int64_t num_rows, @@ -263,13 +470,12 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, // step 1: check insert data if valid std::unordered_map field_id_to_offset; int64_t field_offset = 0; - int64_t exist_rows = stats_.mem_size / (sizeof(Timestamp) + sizeof(idx_t)); for (const auto& field : insert_record_proto->fields_data()) { auto field_id = FieldId(field.field_id()); AssertInfo(!field_id_to_offset.count(field_id), "duplicate field data"); field_id_to_offset.emplace(field_id, field_offset++); - AssertInfo(exist_rows == 0 || insert_record_.is_data_exist(field_id), + AssertInfo(insert_record_.is_data_exist(field_id), "unexpected new field in growing segment {}, field id {}", id_, field.field_id()); @@ -299,12 +505,11 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, // step 2: sort timestamp // query node already guarantees that the timestamp is ordered, avoid field data copy in c++ - // step 3: fill into Segment.ConcurrentVector + // step 3: fill into Segment.ConcurrentVector, no mmap_descriptor is used for timestamps insert_record_.timestamps_.set_data_raw( reserved_offset, timestamps_raw, num_rows); + stats_.mem_size += num_rows * sizeof(Timestamp); - // update the mem size of timestamps and row IDs - stats_.mem_size += num_rows * (sizeof(Timestamp) + sizeof(idx_t)); for (auto& [field_id, field_meta] : schema_->get_fields()) { if (field_id.get() < START_USER_FIELDID) { continue; @@ -325,6 +530,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, &insert_record_proto->fields_data(data_offset), field_meta); } + //insert vector data into index if (segcore_config_.get_enable_interim_segment_index()) { indexing_record_.AppendingIndex( @@ -413,7 +619,10 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, insert_record_.insert_pk(pks[i], reserved_offset + i); } - // step 5: update small indexes + // step 5: update the resource usage + UpdateResourceTracking(); + + // step 6: update small indexes insert_record_.ack_responder_.AddSegment(reserved_offset, reserved_offset + 
num_rows); } @@ -755,7 +964,6 @@ SegmentGrowingImpl::Delete(int64_t size, } // step 1: sort timestamp - std::sort(ordering.begin(), ordering.end()); std::vector sort_pks(size); std::vector sort_timestamps(size); @@ -768,6 +976,10 @@ SegmentGrowingImpl::Delete(int64_t size, // step 2: fill delete record deleted_record_.StreamPush(sort_pks, sort_timestamps.data()); + + // step 3: update resource tracking + UpdateResourceTracking(); + return SegcoreError::success(); } @@ -1678,6 +1890,8 @@ SegmentGrowingImpl::Reopen(SchemaPtr sch) { schema_ = sch; } + + UpdateResourceTracking(); } void @@ -1754,6 +1968,9 @@ SegmentGrowingImpl::FinishLoad() { fill_empty_field(field_meta); } } + + // Update resource tracking + UpdateResourceTracking(); } void @@ -1921,7 +2138,7 @@ SegmentGrowingImpl::fill_empty_field(const FieldMeta& field_meta) { field_id, field_meta, size_per_chunk(), mmap_descriptor_); } - auto total_row_num = insert_record_.size(); + auto total_row_num = insert_record_.row_count(); auto data = bulk_subscript_not_exist_field(field_meta, total_row_num); insert_record_.get_valid_data(field_id)->set_data_raw( diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 179dff7d7d..2f57f54e7f 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -22,6 +23,7 @@ #include #include "cachinglayer/CacheSlot.h" +#include "cachinglayer/Manager.h" #include "AckResponder.h" #include "ConcurrentVector.h" #include "DeletedRecord.h" @@ -354,6 +356,7 @@ class SegmentGrowingImpl : public SegmentGrowing { segment_id) { this->CreateTextIndexes(); this->InitializeArrayOffsets(); + this->UpdateResourceTracking(); } ~SegmentGrowingImpl() { @@ -373,6 +376,14 @@ class SegmentGrowingImpl : public SegmentGrowing { storage::MmapManager::GetInstance().GetMmapChunkManager(); mcm->UnRegister(mmap_descriptor_); } + + // Refund any tracked resources before destruction + // No lock needed - destructor implies exclusive access + if (tracked_resource_.AnyGTZero()) { + Manager::GetInstance().RefundLoadedResource( + tracked_resource_, + fmt::format("growing_segment_{}_destructor", id_)); + } } void @@ -515,6 +526,22 @@ class SegmentGrowingImpl : public SegmentGrowing { const int64_t* seg_offsets, int64_t count) const; + /** + * @brief Estimate the current total resource usage of the growing segment + * + * This includes memory/disk usage for: + * - Field data (raw vectors and scalars) + * - Timestamps + * - PK-to-offset index + * - Interim vector indexes (if enabled) + * - Text match indexes (if enabled) + * - Deleted records + * + * @return ResourceUsage containing memory_bytes and file_bytes estimates + */ + ResourceUsage + EstimateSegmentResourceUsage() const; + protected: int64_t num_chunk(FieldId field_id) const override; @@ -572,6 +599,20 @@ class SegmentGrowingImpl : public SegmentGrowing { void fill_empty_field(const FieldMeta& field_meta); + /** + * @brief Update resource tracking by refunding old estimate and charging new + * + * This method: + * 1. Estimates current total resource usage of the growing segment + * 2. Refunds the previously tracked resource from the cache manager + * 3. Charges the new resource usage to the cache manager + * 4. Updates the tracked resource checkpoint + * + * Should be called after data modifications (Insert, Delete, etc.) 
+ */ + void + UpdateResourceTracking(); + private: void AddTexts(FieldId field_id, @@ -646,6 +687,12 @@ class SegmentGrowingImpl : public SegmentGrowing { // Representative field_id for each struct (used to extract array lengths during Insert) // One field_id per struct, since all fields in the same struct have identical array lengths std::unordered_set struct_representative_fields_; + + // Tracked resource usage for refund-then-charge pattern + // This stores the last estimated resource usage that was charged to the cache manager + ResourceUsage tracked_resource_{}; + // Mutex to protect tracked_resource_ updates (refund-then-charge must be atomic) + mutable std::mutex resource_tracking_mutex_; }; inline SegmentGrowingPtr diff --git a/internal/core/src/segcore/SegmentGrowingTest.cpp b/internal/core/src/segcore/SegmentGrowingTest.cpp index 95ea12c71c..e1d31fa426 100644 --- a/internal/core/src/segcore/SegmentGrowingTest.cpp +++ b/internal/core/src/segcore/SegmentGrowingTest.cpp @@ -12,6 +12,9 @@ #include #include +#include +#include + #include "common/Types.h" #include "common/IndexMeta.h" #include "knowhere/comp/index_param.h" @@ -1222,3 +1225,224 @@ TEST(Growing, TestMaskWithNullableTTLField) { EXPECT_EQ(expired_count, test_data_count / 4); } + +// Resource tracking tests for growing segments +TEST(Growing, EmptySegmentResourceEstimation) { + auto schema = std::make_shared(); + auto dim = 128; + auto metric_type = knowhere::metric::L2; + auto vec_fid = + schema->AddDebugField("vec", DataType::VECTOR_FLOAT, dim, metric_type); + auto pk_fid = schema->AddDebugField("pk", DataType::INT64); + schema->set_primary_field_id(pk_fid); + + auto segment = CreateGrowingSegment(schema, empty_index_meta); + auto* segment_impl = dynamic_cast(segment.get()); + ASSERT_NE(segment_impl, nullptr); + + // Empty segment should have zero resource usage + auto resource = segment_impl->EstimateSegmentResourceUsage(); + EXPECT_EQ(resource.memory_bytes, 0); + EXPECT_EQ(resource.file_bytes, 0); +} + +TEST(Growing, ResourceEstimationAfterInsert) { + auto schema = std::make_shared(); + auto dim = 128; + auto metric_type = knowhere::metric::L2; + auto vec_fid = + schema->AddDebugField("vec", DataType::VECTOR_FLOAT, dim, metric_type); + auto pk_fid = schema->AddDebugField("pk", DataType::INT64); + schema->set_primary_field_id(pk_fid); + + auto segment = CreateGrowingSegment(schema, empty_index_meta); + auto* segment_impl = dynamic_cast(segment.get()); + ASSERT_NE(segment_impl, nullptr); + + // Insert some data + const int64_t N = 1000; + auto dataset = DataGen(schema, N); + segment->PreInsert(N); + segment->Insert(0, + N, + dataset.row_ids_.data(), + dataset.timestamps_.data(), + dataset.raw_); + + // After insert, resource usage should be positive + auto resource = segment_impl->EstimateSegmentResourceUsage(); + EXPECT_GT(resource.memory_bytes, 0); + + // Memory should include at least: + // - Vector data: N * dim * sizeof(float) = 1000 * 128 * 4 = 512000 bytes + // - Timestamps: N * sizeof(Timestamp) = 1000 * 8 = 8000 bytes + // - PK field: N * sizeof(int64_t) = 1000 * 8 = 8000 bytes + // Plus safety margin of 1.2x + int64_t expected_min_size = + N * dim * sizeof(float) + N * sizeof(Timestamp) + N * sizeof(int64_t); + EXPECT_GE(resource.memory_bytes, expected_min_size); +} + +TEST(Growing, ResourceIncrementsWithMoreInserts) { + auto schema = std::make_shared(); + auto dim = 128; + auto metric_type = knowhere::metric::L2; + auto vec_fid = + schema->AddDebugField("vec", DataType::VECTOR_FLOAT, dim, metric_type); + auto 
pk_fid = schema->AddDebugField("pk", DataType::INT64); + schema->set_primary_field_id(pk_fid); + + auto segment = CreateGrowingSegment(schema, empty_index_meta); + auto* segment_impl = dynamic_cast(segment.get()); + ASSERT_NE(segment_impl, nullptr); + + // First insert + const int64_t N1 = 500; + auto dataset1 = DataGen(schema, N1, 42, 0); + segment->PreInsert(N1); + segment->Insert(0, + N1, + dataset1.row_ids_.data(), + dataset1.timestamps_.data(), + dataset1.raw_); + auto resource1 = segment_impl->EstimateSegmentResourceUsage(); + + // Second insert + const int64_t N2 = 500; + auto dataset2 = DataGen(schema, N2, 43, N1); + segment->PreInsert(N2); + segment->Insert(N1, + N2, + dataset2.row_ids_.data(), + dataset2.timestamps_.data(), + dataset2.raw_); + auto resource2 = segment_impl->EstimateSegmentResourceUsage(); + + // Resource should increase after second insert + EXPECT_GT(resource2.memory_bytes, resource1.memory_bytes); +} + +TEST(Growing, ResourceTrackingAfterDelete) { + auto schema = std::make_shared(); + auto dim = 64; + auto metric_type = knowhere::metric::L2; + auto vec_fid = + schema->AddDebugField("vec", DataType::VECTOR_FLOAT, dim, metric_type); + auto pk_fid = schema->AddDebugField("pk", DataType::INT64); + schema->set_primary_field_id(pk_fid); + + auto segment = CreateGrowingSegment(schema, empty_index_meta); + auto* segment_impl = dynamic_cast(segment.get()); + ASSERT_NE(segment_impl, nullptr); + + // Insert data first + const int64_t N = 100; + auto dataset = DataGen(schema, N); + segment->PreInsert(N); + segment->Insert(0, + N, + dataset.row_ids_.data(), + dataset.timestamps_.data(), + dataset.raw_); + + auto resource_before_delete = segment_impl->EstimateSegmentResourceUsage(); + EXPECT_GT(resource_before_delete.memory_bytes, 0); + + // Delete some rows + auto pks = dataset.get_col(pk_fid); + auto del_pks = GenPKs(pks.begin(), pks.begin() + 5); + auto del_tss = GenTss(5, N); + auto status = segment->Delete(5, del_pks.get(), del_tss.data()); + EXPECT_TRUE(status.ok()); + + // Resource estimation should still work after delete + auto resource_after_delete = segment_impl->EstimateSegmentResourceUsage(); + EXPECT_GT(resource_after_delete.memory_bytes, 0); +} + +TEST(Growing, ConcurrentInsertResourceTracking) { + auto schema = std::make_shared(); + auto dim = 32; + auto metric_type = knowhere::metric::L2; + auto vec_fid = + schema->AddDebugField("vec", DataType::VECTOR_FLOAT, dim, metric_type); + auto pk_fid = schema->AddDebugField("pk", DataType::INT64); + schema->set_primary_field_id(pk_fid); + + auto segment = CreateGrowingSegment(schema, empty_index_meta); + auto* segment_impl = dynamic_cast(segment.get()); + ASSERT_NE(segment_impl, nullptr); + + const int num_threads = 4; + const int64_t rows_per_thread = 100; + std::vector threads; + + // Reserve space for all rows upfront + int64_t total_rows = num_threads * rows_per_thread; + segment->PreInsert(total_rows); + + // Concurrent inserts from multiple threads + for (int t = 0; t < num_threads; ++t) { + threads.emplace_back([&, t]() { + auto dataset = + DataGen(schema, rows_per_thread, 42 + t, t * rows_per_thread); + segment->Insert(t * rows_per_thread, + rows_per_thread, + dataset.row_ids_.data(), + dataset.timestamps_.data(), + dataset.raw_); + }); + } + + for (auto& thread : threads) { + thread.join(); + } + + // Verify total row count + EXPECT_EQ(segment->get_row_count(), total_rows); + + // Verify resource estimation is consistent and positive + auto resource = segment_impl->EstimateSegmentResourceUsage(); + 
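        // EstimateSegmentResourceUsage() derives its numbers from get_row_count() and
        // per-field sizes rather than from per-insert bookkeeping, so once every thread
        // has joined it should already account for all rows inserted concurrently above.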
EXPECT_GT(resource.memory_bytes, 0); +} + +TEST(Growing, MultipleFieldsResourceEstimation) { + // Create schema with multiple fields + auto schema = std::make_shared(); + auto dim = 64; + auto metric_type = knowhere::metric::L2; + auto vec_fid = + schema->AddDebugField("vec", DataType::VECTOR_FLOAT, dim, metric_type); + auto pk_fid = schema->AddDebugField("pk", DataType::INT64); + auto float_fid = schema->AddDebugField("age", DataType::FLOAT); + auto double_fid = schema->AddDebugField("score", DataType::DOUBLE); + schema->set_primary_field_id(pk_fid); + + auto segment = CreateGrowingSegment(schema, empty_index_meta); + auto* segment_impl = dynamic_cast(segment.get()); + ASSERT_NE(segment_impl, nullptr); + + // Insert data + const int64_t N = 500; + auto dataset = DataGen(schema, N); + segment->PreInsert(N); + segment->Insert(0, + N, + dataset.row_ids_.data(), + dataset.timestamps_.data(), + dataset.raw_); + + auto resource = segment_impl->EstimateSegmentResourceUsage(); + + // Memory should include all fields: + // - Vector: N * dim * sizeof(float) = 500 * 64 * 4 = 128000 bytes + // - pk (int64): N * 8 = 4000 bytes + // - age (float): N * 4 = 2000 bytes + // - score (double): N * 8 = 4000 bytes + // - Timestamps: N * 8 = 4000 bytes + // Plus safety margin + int64_t min_expected = N * dim * sizeof(float) + N * sizeof(int64_t) + + N * sizeof(float) + N * sizeof(double) + + N * sizeof(Timestamp); + EXPECT_GE(resource.memory_bytes, min_expected); +} diff --git a/internal/core/src/segcore/TimestampIndex.h b/internal/core/src/segcore/TimestampIndex.h index 114cc1fb7f..9673bf61fb 100644 --- a/internal/core/src/segcore/TimestampIndex.h +++ b/internal/core/src/segcore/TimestampIndex.h @@ -46,7 +46,7 @@ class TimestampIndex { std::pair active_range); size_t - size() const { + memory_size() const { return sizeof(*this) + lengths_.size() * sizeof(int64_t) + start_locs_.size() * sizeof(int64_t) + timestamp_barriers_.size() * sizeof(Timestamp); diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp index 116087711c..3772cec9f5 100644 --- a/internal/core/src/segcore/load_index_c.cpp +++ b/internal/core/src/segcore/load_index_c.cpp @@ -201,6 +201,20 @@ ReleaseLoadingResource(CResourceUsage size) { size.disk_bytes)); } +void +ChargeLoadedResource(CResourceUsage size) { + milvus::cachinglayer::Manager::GetInstance().ChargeLoadedResource( + milvus::cachinglayer::ResourceUsage(size.memory_bytes, + size.disk_bytes)); +} + +void +RefundLoadedResource(CResourceUsage size) { + milvus::cachinglayer::Manager::GetInstance().RefundLoadedResource( + milvus::cachinglayer::ResourceUsage(size.memory_bytes, + size.disk_bytes)); +} + CStatus AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) { SCOPE_CGO_CALL_METRIC(); diff --git a/internal/core/src/segcore/load_index_c.h b/internal/core/src/segcore/load_index_c.h index 708e992b6e..9df51fe27a 100644 --- a/internal/core/src/segcore/load_index_c.h +++ b/internal/core/src/segcore/load_index_c.h @@ -48,6 +48,12 @@ TryReserveLoadingResourceWithTimeout(CResourceUsage size, void ReleaseLoadingResource(CResourceUsage size); +void +ChargeLoadedResource(CResourceUsage size); + +void +RefundLoadedResource(CResourceUsage size); + CStatus AppendIndexInfo(CLoadIndexInfo c_load_index_info, int64_t index_id, diff --git a/internal/core/src/segcore/segcore_init_c.cpp b/internal/core/src/segcore/segcore_init_c.cpp index 17e22b515d..5b0860ff11 100644 --- a/internal/core/src/segcore/segcore_init_c.cpp +++ 
b/internal/core/src/segcore/segcore_init_c.cpp @@ -99,6 +99,13 @@ SegcoreSetDenseVectorInterminIndexRefineWithQuantFlag(const bool value) { config.set_refine_with_quant_flag(value); } +extern "C" void +SegcoreSetInterimIndexMemExpansionRate(const float value) { + milvus::segcore::SegcoreConfig& config = + milvus::segcore::SegcoreConfig::default_config(); + config.set_interim_index_mem_expansion_rate(value); +} + extern "C" void SegcoreSetSubDim(const int64_t value) { milvus::segcore::SegcoreConfig& config = diff --git a/internal/core/src/segcore/segcore_init_c.h b/internal/core/src/segcore/segcore_init_c.h index 5eef9b3769..2233ac46ee 100644 --- a/internal/core/src/segcore/segcore_init_c.h +++ b/internal/core/src/segcore/segcore_init_c.h @@ -60,6 +60,9 @@ SegcoreSetDenseVectorInterminIndexRefineQuantType(const char*); void SegcoreSetDenseVectorInterminIndexRefineWithQuantFlag(const bool); +void +SegcoreSetInterimIndexMemExpansionRate(const float); + // return value must be freed by the caller char* SegcoreSetSimdType(const char*); diff --git a/internal/core/thirdparty/jemalloc/CMakeLists.txt b/internal/core/thirdparty/jemalloc/CMakeLists.txt index f558075a1e..9dc2ebed9e 100644 --- a/internal/core/thirdparty/jemalloc/CMakeLists.txt +++ b/internal/core/thirdparty/jemalloc/CMakeLists.txt @@ -37,7 +37,7 @@ detect_aarch64_target_arch() set(JEMALLOC_PREFIX "${CMAKE_INSTALL_PREFIX}") set(JEMALLOC_LIB_DIR "${JEMALLOC_PREFIX}/lib") set(JEMALLOC_STATIC_LIB "${JEMALLOC_LIB_DIR}/libjemalloc_pic${CMAKE_STATIC_LIBRARY_SUFFIX}") -set(JEMALLOC_CONFIGURE_COMMAND ./configure "AR=${CMAKE_AR}" "CC=${CMAKE_C_COMPILER}") +set(JEMALLOC_CONFIGURE_COMMAND ./configure "AR=${CMAKE_AR}" "CC=${CMAKE_C_COMPILER}" "CFLAGS=-fno-omit-frame-pointer" "CXXFLAGS=-fno-omit-frame-pointer") message("CMAKE_OSX_SYSROOT: ${CMAKE_OSX_SYSROOT}") if (CMAKE_OSX_SYSROOT) diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 74bf2fea53..1848a079f0 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -1146,6 +1146,9 @@ func (sd *shardDelegator) Close() { sd.tsCond.Broadcast() sd.lifetime.Wait() + // Remove all candidates and refund bloom filter resources + sd.pkOracle.RemoveAndRefundAll() + // clean idf oracle if sd.idfOracle != nil { sd.idfOracle.Close() diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 8269e96e38..4ebb6e98f4 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -863,17 +863,20 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele sd.AddExcludedSegments(droppedInfos) if len(sealed) > 0 { - sd.pkOracle.Remove( + removed := sd.pkOracle.Remove( pkoracle.WithSegmentIDs(lo.Map(sealed, func(entry SegmentEntry, _ int) int64 { return entry.SegmentID })...), pkoracle.WithSegmentType(commonpb.SegmentState_Sealed), pkoracle.WithWorkerID(targetNodeID), ) + // Refund resources for removed sealed segment candidates + sd.pkOracle.RefundRemoved(removed) } if len(growing) > 0 { sd.pkOracle.Remove( pkoracle.WithSegmentIDs(lo.Map(growing, func(entry SegmentEntry, _ int) int64 { return entry.SegmentID })...), pkoracle.WithSegmentType(commonpb.SegmentState_Growing), ) + // leave the bloom filter sets in growing segment be closed by Release() } var releaseErr error diff --git a/internal/querynodev2/delegator/delegator_data_test.go 
b/internal/querynodev2/delegator/delegator_data_test.go index 5d9068af4a..21bb133282 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -519,6 +519,8 @@ func (s *DelegatorDataSuite) TestProcessDelete() { s.True(s.delegator.distribution.Serviceable()) s.delegator.Close() + // After Close(), pkOracle is cleared, so ProcessDelete becomes a no-op. + // This is expected behavior - the delegator is being decommissioned. s.delegator.ProcessDelete([]*DeleteData{ { PartitionID: 500, @@ -528,7 +530,6 @@ func (s *DelegatorDataSuite) TestProcessDelete() { }, }, 10) s.Require().NoError(err) - s.False(s.delegator.distribution.Serviceable()) } func (s *DelegatorDataSuite) TestLoadGrowingWithBM25() { diff --git a/internal/querynodev2/pkoracle/bloom_filter_set.go b/internal/querynodev2/pkoracle/bloom_filter_set.go index 0801e1549c..56b968a96b 100644 --- a/internal/querynodev2/pkoracle/bloom_filter_set.go +++ b/internal/querynodev2/pkoracle/bloom_filter_set.go @@ -16,6 +16,13 @@ package pkoracle +/* +#cgo pkg-config: milvus_core + +#include "segcore/load_index_c.h" +*/ +import "C" + import ( "sync" @@ -40,6 +47,10 @@ type BloomFilterSet struct { segType commonpb.SegmentState currentStat *storage.PkStatistics historyStats []*storage.PkStatistics + + // Resource tracking + trackedSize int64 // memory size that was charged + resourceCharged bool // tracks whether memory resources were charged for this bloom filter set } // MayPkExist returns whether any bloom filters returns positive. @@ -140,6 +151,103 @@ func (s *BloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) { s.historyStats = append(s.historyStats, stats) } +// MemSize returns the total memory size of all bloom filters in bytes. +// This includes both currentStat and all historyStats. +func (s *BloomFilterSet) MemSize() int64 { + s.statsMutex.RLock() + defer s.statsMutex.RUnlock() + + var size int64 + if s.currentStat != nil && s.currentStat.PkFilter != nil { + size += int64(s.currentStat.PkFilter.Cap() / 8) // Cap returns bits, convert to bytes + } + for _, stats := range s.historyStats { + if stats != nil && stats.PkFilter != nil { + size += int64(stats.PkFilter.Cap() / 8) + } + } + return size +} + +// Charge charges memory resource for this bloom filter set via caching layer. +// Safe to call multiple times - only charges once. +func (s *BloomFilterSet) Charge() { + s.statsMutex.Lock() + defer s.statsMutex.Unlock() + + if s.resourceCharged { + return // Already charged + } + + size := s.memSizeLocked() + if size > 0 { + C.ChargeLoadedResource(C.CResourceUsage{ + memory_bytes: C.int64_t(size), + disk_bytes: 0, + }) + s.trackedSize = size + s.resourceCharged = true + log.Debug("charged bloom filter resource", + zap.Int64("segmentID", s.segmentID), + zap.Int64("size", size)) + } +} + +// Refund refunds any charged resources. Safe to call multiple times. +func (s *BloomFilterSet) Refund() { + s.statsMutex.Lock() + defer s.statsMutex.Unlock() + + if !s.resourceCharged || s.trackedSize <= 0 { + return + } + + C.RefundLoadedResource(C.CResourceUsage{ + memory_bytes: C.int64_t(s.trackedSize), + disk_bytes: 0, + }) + log.Debug("refunded bloom filter resource", + zap.Int64("segmentID", s.segmentID), + zap.Int64("size", s.trackedSize)) + s.trackedSize = 0 + s.resourceCharged = false +} + +// IsResourceCharged returns whether memory resources have been charged for this bloom filter set. 
+func (s *BloomFilterSet) IsResourceCharged() bool { + s.statsMutex.RLock() + defer s.statsMutex.RUnlock() + return s.resourceCharged +} + +// SetResourceCharged sets the resourceCharged flag and trackedSize for testing purposes. +// This allows tests to simulate charged state without calling the actual C code. +func (s *BloomFilterSet) SetResourceCharged(charged bool) { + s.statsMutex.Lock() + defer s.statsMutex.Unlock() + s.resourceCharged = charged + if charged { + s.trackedSize = s.memSizeLocked() + } else { + s.trackedSize = 0 + } +} + +// memSizeLocked returns the total memory size without acquiring the lock. +// Caller must hold statsMutex. +func (s *BloomFilterSet) memSizeLocked() int64 { + var size int64 + if s.currentStat != nil && s.currentStat.PkFilter != nil { + size += int64(s.currentStat.PkFilter.Cap() / 8) + } + for _, stats := range s.historyStats { + if stats != nil && stats.PkFilter != nil { + size += int64(stats.PkFilter.Cap() / 8) + } + } + return size +} + // NewBloomFilterSet returns a new BloomFilterSet. func NewBloomFilterSet(segmentID int64, paritionID int64, segType commonpb.SegmentState) *BloomFilterSet { bfs := &BloomFilterSet{ diff --git a/internal/querynodev2/pkoracle/bloom_filter_set_test.go b/internal/querynodev2/pkoracle/bloom_filter_set_test.go index b5b37d9e24..ad2ccc8472 100644 --- a/internal/querynodev2/pkoracle/bloom_filter_set_test.go +++ b/internal/querynodev2/pkoracle/bloom_filter_set_test.go @@ -100,3 +100,91 @@ func TestHistoricalStat(t *testing.T) { assert.True(t, ret[i]) } } + +func TestMemSize(t *testing.T) { + paramtable.Init() + + t.Run("empty bloom filter set", func(t *testing.T) { + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + size := bfs.MemSize() + assert.Equal(t, int64(0), size) + }) + + t.Run("with current stat only", func(t *testing.T) { + batchSize := 100 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + pks = append(pks, pk) + } + + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + + size := bfs.MemSize() + assert.Greater(t, size, int64(0)) + // Bloom filter size should be related to the configured BloomFilterSize + // Cap() returns bits, MemSize() converts to bytes + expectedSize := int64(bfs.currentStat.PkFilter.Cap() / 8) + assert.Equal(t, expectedSize, size) + }) + + t.Run("with historical stats", func(t *testing.T) { + batchSize := 100 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + pks = append(pks, pk) + } + + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + + currentSize := bfs.MemSize() + + // Add historical stats (simulate compaction history) + bfs.AddHistoricalStats(bfs.currentStat) + bfs.AddHistoricalStats(bfs.currentStat) + + totalSize := bfs.MemSize() + // Should be approximately 3x the single bloom filter size + // (1 current + 2 historical, all pointing to same underlying filter for this test) + assert.Equal(t, currentSize*3, totalSize) + }) + + t.Run("with nil current stat and historical stats", func(t *testing.T) { + batchSize := 100 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + pks = append(pks, pk) + } + + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + + singleSize := bfs.MemSize() + + // Add historical stats and clear current + 
bfs.AddHistoricalStats(bfs.currentStat) + bfs.currentStat = nil + + historicalSize := bfs.MemSize() + assert.Equal(t, singleSize, historicalSize) + }) + + t.Run("varchar primary key", func(t *testing.T) { + batchSize := 100 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewVarCharPrimaryKey(strconv.FormatInt(int64(i), 10)) + pks = append(pks, pk) + } + + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + + size := bfs.MemSize() + assert.Greater(t, size, int64(0)) + }) +} diff --git a/internal/querynodev2/pkoracle/pk_oracle.go b/internal/querynodev2/pkoracle/pk_oracle.go index 69e5040c5b..ec6b1d6e3d 100644 --- a/internal/querynodev2/pkoracle/pk_oracle.go +++ b/internal/querynodev2/pkoracle/pk_oracle.go @@ -31,10 +31,17 @@ type PkOracle interface { BatchGet(pks []storage.PrimaryKey, filters ...CandidateFilter) map[int64][]bool // RegisterCandidate adds candidate into pkOracle. Register(candidate Candidate, workerID int64) error - // RemoveCandidate removes candidate - Remove(filters ...CandidateFilter) error + // RemoveCandidate removes candidate and returns the removed candidates. + Remove(filters ...CandidateFilter) []Candidate // CheckCandidate checks whether candidate with provided key exists. Exists(candidate Candidate, workerID int64) bool + // Range iterates over all candidates without removing them. + Range(fn func(candidate Candidate) bool) + // RefundRemoved refunds resources for BloomFilterSet candidates. + RefundRemoved(candidates []Candidate) + // RemoveAndRefundAll removes all candidates and refunds resources for BloomFilterSet candidates. + // Used during shutdown to clean up and refund resources. + RemoveAndRefundAll() } var _ PkOracle = (*pkOracle)(nil) @@ -97,19 +104,22 @@ func (pko *pkOracle) Register(candidate Candidate, workerID int64) error { return nil } -// Remove removes candidate from pko. -func (pko *pkOracle) Remove(filters ...CandidateFilter) error { +// Remove removes candidate from pko and returns the removed candidates. +func (pko *pkOracle) Remove(filters ...CandidateFilter) []Candidate { + var removed []Candidate pko.candidates.Range(func(key string, candidate candidateWithWorker) bool { for _, filter := range filters { if !filter(candidate) { return true } } - pko.candidates.GetAndRemove(pko.candidateKey(candidate, candidate.workerID)) + if _, ok := pko.candidates.GetAndRemove(pko.candidateKey(candidate, candidate.workerID)); ok { + removed = append(removed, candidate.Candidate) + } return true }) - return nil + return removed } func (pko *pkOracle) Exists(candidate Candidate, workerID int64) bool { @@ -117,6 +127,34 @@ func (pko *pkOracle) Exists(candidate Candidate, workerID int64) bool { return ok } +// Range iterates over all candidates without removing them. +func (pko *pkOracle) Range(fn func(candidate Candidate) bool) { + pko.candidates.Range(func(key string, candidate candidateWithWorker) bool { + return fn(candidate.Candidate) + }) +} + +// RefundRemoved refunds resources for BloomFilterSet candidates. +// For candidates that are not BloomFilterSet (e.g., LocalSegment), this is a no-op. +func (pko *pkOracle) RefundRemoved(candidates []Candidate) { + for _, candidate := range candidates { + if bfs, ok := candidate.(*BloomFilterSet); ok { + bfs.Refund() + } + } +} + +// RemoveAndRefundAll removes all candidates and refunds resources for BloomFilterSet candidates. +// Used during shutdown to clean up and refund resources. 
+func (pko *pkOracle) RemoveAndRefundAll() { + removed := pko.Remove() + for _, candidate := range removed { + if bfs, ok := candidate.(*BloomFilterSet); ok { + bfs.Refund() + } + } +} + // NewPkOracle returns pkOracle as PkOracle interface. func NewPkOracle() PkOracle { return &pkOracle{ diff --git a/internal/querynodev2/pkoracle/pk_oracle_test.go b/internal/querynodev2/pkoracle/pk_oracle_test.go index 429cd7d6c8..0b60bcdb70 100644 --- a/internal/querynodev2/pkoracle/pk_oracle_test.go +++ b/internal/querynodev2/pkoracle/pk_oracle_test.go @@ -63,3 +63,223 @@ func TestGet(t *testing.T) { assert.NotContains(t, segmentIDs, int64(1)) } } + +func TestRemoveReturnsRemovedCandidates(t *testing.T) { + paramtable.Init() + + t.Run("remove single candidate", func(t *testing.T) { + pko := NewPkOracle() + + batchSize := 10 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + pks = append(pks, pk) + } + + bfs := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + pko.Register(bfs, 1) + + removed := pko.Remove(WithSegmentIDs(1)) + assert.Len(t, removed, 1) + assert.Equal(t, int64(1), removed[0].ID()) + }) + + t.Run("remove multiple candidates", func(t *testing.T) { + pko := NewPkOracle() + + batchSize := 10 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + pks = append(pks, pk) + } + + // Register multiple candidates + bfs1 := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs1.UpdateBloomFilter(pks) + pko.Register(bfs1, 1) + + bfs2 := NewBloomFilterSet(2, 1, commonpb.SegmentState_Sealed) + bfs2.UpdateBloomFilter(pks) + pko.Register(bfs2, 1) + + bfs3 := NewBloomFilterSet(3, 1, commonpb.SegmentState_Sealed) + bfs3.UpdateBloomFilter(pks) + pko.Register(bfs3, 1) + + // Remove multiple segments + removed := pko.Remove(WithSegmentIDs(1, 2)) + assert.Len(t, removed, 2) + + removedIDs := make([]int64, len(removed)) + for i, c := range removed { + removedIDs[i] = c.ID() + } + assert.Contains(t, removedIDs, int64(1)) + assert.Contains(t, removedIDs, int64(2)) + + // Verify bfs3 still exists + assert.True(t, pko.Exists(bfs3, 1)) + }) + + t.Run("remove non-existent candidate", func(t *testing.T) { + pko := NewPkOracle() + + removed := pko.Remove(WithSegmentIDs(999)) + assert.Len(t, removed, 0) + }) + + t.Run("remove with segment type filter", func(t *testing.T) { + pko := NewPkOracle() + + batchSize := 10 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + pks = append(pks, pk) + } + + // Register sealed segment + bfsSealed := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfsSealed.UpdateBloomFilter(pks) + pko.Register(bfsSealed, 1) + + // Register growing segment + bfsGrowing := NewBloomFilterSet(2, 1, commonpb.SegmentState_Growing) + bfsGrowing.UpdateBloomFilter(pks) + pko.Register(bfsGrowing, 1) + + // Remove only sealed segments + removed := pko.Remove(WithSegmentType(commonpb.SegmentState_Sealed)) + assert.Len(t, removed, 1) + assert.Equal(t, int64(1), removed[0].ID()) + + // Growing segment should still exist + assert.True(t, pko.Exists(bfsGrowing, 1)) + }) + + t.Run("remove with worker ID filter", func(t *testing.T) { + pko := NewPkOracle() + + batchSize := 10 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + pks = append(pks, pk) + } + + // Register on different workers + bfs1 := 
NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs1.UpdateBloomFilter(pks) + pko.Register(bfs1, 1) + + bfs2 := NewBloomFilterSet(2, 1, commonpb.SegmentState_Sealed) + bfs2.UpdateBloomFilter(pks) + pko.Register(bfs2, 2) + + // Remove only from worker 1 + removed := pko.Remove(WithWorkerID(1)) + assert.Len(t, removed, 1) + assert.Equal(t, int64(1), removed[0].ID()) + + // bfs2 on worker 2 should still exist + assert.True(t, pko.Exists(bfs2, 2)) + }) + + t.Run("remove all candidates", func(t *testing.T) { + pko := NewPkOracle() + + batchSize := 10 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + pks = append(pks, pk) + } + + // Register multiple candidates + for i := 0; i < 5; i++ { + bfs := NewBloomFilterSet(int64(i), 1, commonpb.SegmentState_Sealed) + bfs.UpdateBloomFilter(pks) + pko.Register(bfs, 1) + } + + // Remove all (no filter) + removed := pko.Remove() + assert.Len(t, removed, 5) + }) +} + +func TestRefundRemoved(t *testing.T) { + paramtable.Init() + + t.Run("refund empty candidates", func(t *testing.T) { + pko := NewPkOracle() + // Should not panic with empty slice + pko.RefundRemoved(nil) + pko.RefundRemoved([]Candidate{}) + }) + + t.Run("refund bloom filter candidates", func(t *testing.T) { + // Test that RefundRemoved correctly identifies BloomFilterSet and calls Refund + // Note: We don't set resourceCharged=true to avoid requiring C library initialization + pko := NewPkOracle() + batchSize := 10 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + pks = append(pks, pk) + } + + // Create bloom filter sets (not charged, so Refund() is a no-op) + bfs1 := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs1.UpdateBloomFilter(pks) + + bfs2 := NewBloomFilterSet(2, 1, commonpb.SegmentState_Sealed) + bfs2.UpdateBloomFilter(pks) + + candidates := []Candidate{bfs1, bfs2} + + // Should not panic - Refund() returns early when not charged + pko.RefundRemoved(candidates) + }) +} + +func TestRemoveAndRefundAll(t *testing.T) { + paramtable.Init() + + t.Run("remove and refund all with empty oracle", func(t *testing.T) { + pko := NewPkOracle() + // Should not panic + pko.RemoveAndRefundAll() + }) + + t.Run("remove and refund all bloom filter candidates", func(t *testing.T) { + // Test that RemoveAndRefundAll removes and refunds all BloomFilterSet candidates + // Note: We don't set resourceCharged=true to avoid requiring C library initialization + pko := NewPkOracle() + batchSize := 10 + pks := make([]storage.PrimaryKey, 0) + for i := 0; i < batchSize; i++ { + pk := storage.NewInt64PrimaryKey(int64(i)) + pks = append(pks, pk) + } + + // Create and register bloom filter sets (not charged) + bfs1 := NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed) + bfs1.UpdateBloomFilter(pks) + pko.Register(bfs1, 1) + + bfs2 := NewBloomFilterSet(2, 1, commonpb.SegmentState_Sealed) + bfs2.UpdateBloomFilter(pks) + pko.Register(bfs2, 1) + + // Should not panic - Refund() returns early when not charged + pko.RemoveAndRefundAll() + + // Candidates should be removed from oracle + assert.False(t, pko.Exists(bfs1, 1)) + assert.False(t, pko.Exists(bfs2, 1)) + }) +} diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index f843a52df5..1c19021e37 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -499,6 +499,20 @@ func (s *LocalSegment) LastDeltaTimestamp() uint64 { 
return s.lastDeltaTimestamp.Load() } +// UpdateBloomFilter updates bloom filter with provided pks and charges resource if BF is newly created. +// This overrides baseSegment.UpdateBloomFilter to handle resource charging for growing segments. +func (s *LocalSegment) UpdateBloomFilter(pks []storage.PrimaryKey) { + if s.skipGrowingBF { + return + } + + // Update bloom filter (may create new BF if not exist) + s.bloomFilterSet.UpdateBloomFilter(pks) + + // Charge bloom filter resource (safe to call multiple times - only charges once) + s.bloomFilterSet.Charge() +} + func (s *LocalSegment) GetIndexByID(indexID int64) *IndexedFieldInfo { info, _ := s.fieldIndexes.Get(indexID) return info @@ -1451,6 +1465,9 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) { // usage := s.ResourceUsageEstimate() // s.manager.SubLogicalResource(usage) + // Refund bloom filter resource + s.bloomFilterSet.Refund() + binlogSize := s.binlogSize.Load() if binlogSize > 0 { // no concurrent change to s.binlogSize, so the subtraction is safe diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 2e4933e42c..100641d8ed 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -380,12 +380,14 @@ func (loader *segmentLoader) Load(ctx context.Context, } if !segment.BloomFilterExist() { - log.Debug("BloomFilterExist", zap.Int64("segid", segment.ID())) + log.Debug("loading bloom filter for segment", zap.Int64("segmentID", segment.ID())) bfs, err := loader.loadSingleBloomFilterSet(ctx, loadInfo.GetCollectionID(), loadInfo, segment.Type()) if err != nil { return errors.Wrap(err, "At LoadBloomFilter") } segment.SetBloomFilter(bfs) + // Charge bloom filter resource + bfs.Charge() } if segment.Level() != datapb.SegmentLevel_L0 { @@ -708,11 +710,44 @@ func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionI return nil, err } pkField := GetPkField(collection.Schema()) + pkFieldID := pkField.GetFieldID() + + // Calculate total memory size needed for bloom filters (PK stats) + var totalMemorySize int64 + for _, info := range infos { + for _, fieldBinlog := range info.Statslogs { + if fieldBinlog.FieldID == pkFieldID { + totalMemorySize += getBinlogDataMemorySize(fieldBinlog) + } + } + } + + // Reserve memory resource if tiered eviction is enabled + if paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() && totalMemorySize > 0 { + if ok := C.TryReserveLoadingResourceWithTimeout(C.CResourceUsage{ + // double loading memory size for bloom filters to avoid OOM during loading + memory_bytes: C.int64_t(totalMemorySize * 2), + disk_bytes: C.int64_t(0), + }, 1000); !ok { + return nil, fmt.Errorf("failed to reserve loading resource for bloom filters, totalMemorySize = %v MB", + logutil.ToMB(float64(totalMemorySize))) + } + log.Info("reserved loading resource for bloom filters", zap.Float64("totalMemorySizeMB", logutil.ToMB(float64(totalMemorySize)))) + } + + defer func() { + if paramtable.Get().QueryNodeCfg.TieredEvictionEnabled.GetAsBool() && totalMemorySize > 0 { + C.ReleaseLoadingResource(C.CResourceUsage{ + memory_bytes: C.int64_t(totalMemorySize * 2), + disk_bytes: C.int64_t(0), + }) + log.Info("released loading resource for bloom filters", zap.Float64("totalMemorySizeMB", logutil.ToMB(float64(totalMemorySize)))) + } + }() log.Info("start loading remote...", zap.Int("segmentNum", segmentNum)) loadedBfs := typeutil.NewConcurrentSet[*pkoracle.BloomFilterSet]() 
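	// Resource flow from here on (as added in this patch): the temporary reservation made
	// above (2x the PK statslog size) covers the bloom filter build phase, the deferred
	// ReleaseLoadingResource drops that reservation when this function returns, and each
	// successfully loaded BloomFilterSet is then charged individually via bfs.Charge() below.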
- // TODO check memory for bf size loadRemoteFunc := func(idx int) error { loadInfo := infos[idx] partitionID := loadInfo.PartitionID @@ -742,7 +777,14 @@ func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionI return nil, err } - return loadedBfs.Collect(), nil + result := loadedBfs.Collect() + + // Charge loaded resource for bloom filters + for _, bfs := range result { + bfs.Charge() + } + + return result, nil } func separateIndexAndBinlog(loadInfo *querypb.SegmentLoadInfo) (map[int64]*IndexedFieldInfo, []*datapb.FieldBinlog) { diff --git a/internal/util/initcore/init_core.go b/internal/util/initcore/init_core.go index e6dca8e427..b8e0c9defb 100644 --- a/internal/util/initcore/init_core.go +++ b/internal/util/initcore/init_core.go @@ -576,6 +576,9 @@ func InitInterminIndexConfig(params *paramtable.ComponentParam) error { enableInterminIndex := C.bool(params.QueryNodeCfg.EnableInterminSegmentIndex.GetAsBool()) C.SegcoreSetEnableInterminSegmentIndex(enableInterminIndex) + memExpansionRate := C.float(params.QueryNodeCfg.InterimIndexMemExpandRate.GetAsFloat()) + C.SegcoreSetInterimIndexMemExpansionRate(memExpansionRate) + nlist := C.int64_t(params.QueryNodeCfg.InterimIndexNlist.GetAsInt64()) C.SegcoreSetNlist(nlist)
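	// Note: the expansion rate pushed down via SegcoreSetInterimIndexMemExpansionRate above is
	// what SegmentGrowingImpl::EstimateSegmentResourceUsage() multiplies against the raw vector
	// size for IVF_FLAT_CC interim indexes; the C++ side defaults to 1.15 if never configured.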