From f7b262a7021bf3b6ed278ac22e7872024a672c6c Mon Sep 17 00:00:00 2001 From: Buqian Zheng Date: Sat, 19 Jul 2025 02:02:52 +0800 Subject: [PATCH] feat: make storagev1 to support eviction (#43219) issue: https://github.com/milvus-io/milvus/issues/41435 It turns out the per-file binlog size is already available in the Go code; by passing it into segcore, we can support eviction in storage v1. Signed-off-by: Buqian Zheng --- internal/core/src/common/LoadInfo.h | 2 + .../src/segcore/ChunkedSegmentSealedImpl.cpp | 17 ++- .../core/src/segcore/load_field_data_c.cpp | 3 + internal/core/src/segcore/load_field_data_c.h | 1 + .../storagev1translator/ChunkTranslator.cpp | 27 +++-- .../storagev1translator/ChunkTranslator.h | 21 ++-- internal/core/src/storage/Util.h | 10 ++ internal/core/unittest/test_c_api.cpp | 4 + .../test_chunked_segment_storage_v2.cpp | 36 +++--- .../core/unittest/test_growing_storage_v2.cpp | 4 + .../unittest/test_utils/storage_test_utils.h | 6 + .../segments/load_field_data_info.go | 111 ------------------ internal/util/segcore/requests.go | 3 +- 13 files changed, 95 insertions(+), 150 deletions(-) delete mode 100644 internal/querynodev2/segments/load_field_data_info.go diff --git a/internal/core/src/common/LoadInfo.h b/internal/core/src/common/LoadInfo.h index ab92ae183f..6e4dc1f76a 100644 --- a/internal/core/src/common/LoadInfo.h +++ b/internal/core/src/common/LoadInfo.h @@ -28,6 +28,8 @@ struct FieldBinlogInfo { int64_t field_id; int64_t row_count = -1; std::vector entries_nums; + // estimated memory size for each binlog file, in bytes, used by caching layer + std::vector memory_sizes; bool enable_mmap{false}; std::vector insert_files; }; diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 535acaca18..b1cd71ee8b 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -388,13 +388,18 @@ 
ChunkedSegmentSealedImpl::load_field_data_internal( this->get_segment_id(), field_id.get()); } else { - std::vector> - insert_files_with_entries_nums; + std::vector + file_infos; + file_infos.reserve(info.insert_files.size()); for (int i = 0; i < info.insert_files.size(); i++) { - insert_files_with_entries_nums.emplace_back( - info.insert_files[i], info.entries_nums[i]); + file_infos.emplace_back( + storagev1translator::ChunkTranslator::FileInfo{ + info.insert_files[i], + info.entries_nums[i], + info.memory_sizes[i]}); } - storage::SortByPath(insert_files_with_entries_nums); + + storage::SortByPath(file_infos); auto field_meta = schema_->operator[](field_id); std::unique_ptr> translator = @@ -402,7 +407,7 @@ ChunkedSegmentSealedImpl::load_field_data_internal( this->get_segment_id(), field_meta, field_data_info, - std::move(insert_files_with_entries_nums), + std::move(file_infos), info.enable_mmap, load_info.load_priority); diff --git a/internal/core/src/segcore/load_field_data_c.cpp b/internal/core/src/segcore/load_field_data_c.cpp index e0090a9b13..ee3f8f3b4c 100644 --- a/internal/core/src/segcore/load_field_data_c.cpp +++ b/internal/core/src/segcore/load_field_data_c.cpp @@ -70,6 +70,7 @@ CStatus AppendLoadFieldDataPath(CLoadFieldDataInfo c_load_field_data_info, int64_t field_id, int64_t entries_num, + int64_t memory_size, const char* c_file_path) { SCOPE_CGO_CALL_METRIC(); @@ -86,6 +87,8 @@ AppendLoadFieldDataPath(CLoadFieldDataInfo c_load_field_data_info, file_path); load_field_data_info->field_infos[field_id].entries_nums.emplace_back( entries_num); + load_field_data_info->field_infos[field_id].memory_sizes.emplace_back( + memory_size); return milvus::SuccessCStatus(); } catch (std::exception& e) { return milvus::FailureCStatus(&e); diff --git a/internal/core/src/segcore/load_field_data_c.h b/internal/core/src/segcore/load_field_data_c.h index 497cbe39f3..c4a5916dc0 100644 --- a/internal/core/src/segcore/load_field_data_c.h +++ 
b/internal/core/src/segcore/load_field_data_c.h @@ -41,6 +41,7 @@ CStatus AppendLoadFieldDataPath(CLoadFieldDataInfo c_load_field_data_info, int64_t field_id, int64_t entries_num, + int64_t memory_size, const char* file_path); void diff --git a/internal/core/src/segcore/storagev1translator/ChunkTranslator.cpp b/internal/core/src/segcore/storagev1translator/ChunkTranslator.cpp index 8c01d4efb5..73e86eac23 100644 --- a/internal/core/src/segcore/storagev1translator/ChunkTranslator.cpp +++ b/internal/core/src/segcore/storagev1translator/ChunkTranslator.cpp @@ -71,7 +71,7 @@ ChunkTranslator::ChunkTranslator( int64_t segment_id, FieldMeta field_meta, FieldDataInfo field_data_info, - std::vector>&& files_and_rows, + std::vector&& file_infos, bool use_mmap, milvus::proto::common::LoadPriority load_priority) : segment_id_(segment_id), @@ -79,7 +79,7 @@ ChunkTranslator::ChunkTranslator( field_meta_(field_meta), key_(fmt::format("seg_{}_f_{}", segment_id, field_meta.get_id().get())), use_mmap_(use_mmap), - files_and_rows_(std::move(files_and_rows)), + file_infos_(std::move(file_infos)), mmap_dir_path_(field_data_info.mmap_dir_path), meta_(use_mmap ? 
milvus::cachinglayer::StorageType::DISK : milvus::cachinglayer::StorageType::MEMORY, @@ -88,14 +88,14 @@ ChunkTranslator::ChunkTranslator( IsVectorDataType(field_meta.get_data_type()), /* is_index */ false, /* in_load_list*/ field_data_info.in_load_list), - /* support_eviction */ false), + /* support_eviction */ true), load_priority_(load_priority) { AssertInfo(!SystemProperty::Instance().IsSystem(FieldId(field_id_)), "ChunkTranslator not supported for system field"); meta_.num_rows_until_chunk_.push_back(0); - for (auto& [file, rows] : files_and_rows_) { + for (const auto& info : file_infos_) { meta_.num_rows_until_chunk_.push_back( - meta_.num_rows_until_chunk_.back() + rows); + meta_.num_rows_until_chunk_.back() + info.row_count); } AssertInfo(meta_.num_rows_until_chunk_.back() == field_data_info.row_count, fmt::format("data lost while loading column {}: found " @@ -104,7 +104,7 @@ ChunkTranslator::ChunkTranslator( meta_.num_rows_until_chunk_.back(), field_data_info.row_count)); virtual_chunk_config(field_data_info.row_count, - files_and_rows_.size(), + file_infos_.size(), meta_.num_rows_until_chunk_, meta_.virt_chunk_order_, meta_.vcid_to_cid_arr_); @@ -112,7 +112,7 @@ ChunkTranslator::ChunkTranslator( size_t ChunkTranslator::num_cells() const { - return files_and_rows_.size(); + return file_infos_.size(); } milvus::cachinglayer::cid_t @@ -125,7 +125,16 @@ ChunkTranslator::cell_id_of(milvus::cachinglayer::uid_t uid) const { milvus::cachinglayer::ResourceUsage ChunkTranslator::estimated_byte_size_of_cell( milvus::cachinglayer::cid_t cid) const { - return {0, 0}; + AssertInfo(cid < file_infos_.size(), "cid out of range"); + + int64_t memory_size = file_infos_[cid].memory_size; + if (use_mmap_) { + // For mmap, the memory is counted as disk usage + return {0, memory_size}; + } else { + // For non-mmap, the memory is counted as memory usage + return {memory_size, 0}; + } } const std::string& @@ -145,7 +154,7 @@ ChunkTranslator::get_cells( std::vector remote_files; 
remote_files.reserve(cids.size()); for (auto cid : cids) { - remote_files.push_back(files_and_rows_[cid].first); + remote_files.push_back(file_infos_[cid].file_path); } auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE); diff --git a/internal/core/src/segcore/storagev1translator/ChunkTranslator.h b/internal/core/src/segcore/storagev1translator/ChunkTranslator.h index a924cf180d..f0ecc0ed14 100644 --- a/internal/core/src/segcore/storagev1translator/ChunkTranslator.h +++ b/internal/core/src/segcore/storagev1translator/ChunkTranslator.h @@ -51,13 +51,18 @@ virtual_chunk_config(int64_t total_row_count, // For this translator each Chunk is a CacheCell, cid_t == uid_t. class ChunkTranslator : public milvus::cachinglayer::Translator { public: - ChunkTranslator( - int64_t segment_id, - FieldMeta field_meta, - FieldDataInfo field_data_info, - std::vector>&& files_and_rows, - bool use_mmap, - milvus::proto::common::LoadPriority load_priority); + struct FileInfo { + std::string file_path; + int64_t row_count; + int64_t memory_size; + }; + + ChunkTranslator(int64_t segment_id, + FieldMeta field_meta, + FieldDataInfo field_data_info, + std::vector&& file_infos, + bool use_mmap, + milvus::proto::common::LoadPriority load_priority); size_t num_cells() const override; @@ -77,7 +82,7 @@ class ChunkTranslator : public milvus::cachinglayer::Translator { } private: - std::vector> files_and_rows_; + std::vector file_infos_; int64_t segment_id_; int64_t field_id_; std::string key_; diff --git a/internal/core/src/storage/Util.h b/internal/core/src/storage/Util.h index a2fb017ae3..f70b966deb 100644 --- a/internal/core/src/storage/Util.h +++ b/internal/core/src/storage/Util.h @@ -251,6 +251,16 @@ SortByPath(std::vector>& paths) { }); } +template +inline void +SortByPath(std::vector& paths) { + std::sort(paths.begin(), paths.end(), [](const T& a, const T& b) { + return std::stol( + a.file_path.substr(a.file_path.find_last_of("/") + 1)) < + 
std::stol(b.file_path.substr(b.file_path.find_last_of("/") + 1)); + }); +} + std::vector CacheRawDataAndFillMissing(const MemFileManagerImplPtr& file_manager, const Config& config); diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 58c91d7a8f..2d1810eab1 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -4179,6 +4179,7 @@ TEST(CApiTest, GrowingSegment_Load_Field_Data_Lack_Binlog_Rows) { FieldBinlogInfo{lack_null_binlog_id.get(), static_cast(ROW_COUNT), std::vector{int64_t(0)}, + std::vector{0}, false, std::vector{}}); @@ -4187,6 +4188,7 @@ TEST(CApiTest, GrowingSegment_Load_Field_Data_Lack_Binlog_Rows) { FieldBinlogInfo{lack_default_value_binlog_id.get(), static_cast(ROW_COUNT), std::vector{int64_t(0)}, + std::vector{0}, false, std::vector{}}); @@ -4249,6 +4251,7 @@ TEST(CApiTest, DISABLED_SealedSegment_Load_Field_Data_Lack_Binlog_Rows) { FieldBinlogInfo{lack_null_binlog_id.get(), static_cast(ROW_COUNT), std::vector{int64_t(0)}, + std::vector{0}, false, std::vector{}}); @@ -4257,6 +4260,7 @@ TEST(CApiTest, DISABLED_SealedSegment_Load_Field_Data_Lack_Binlog_Rows) { FieldBinlogInfo{lack_default_value_binlog_id.get(), static_cast(ROW_COUNT), std::vector{int64_t(0)}, + std::vector{0}, false, std::vector{}}); diff --git a/internal/core/unittest/test_chunked_segment_storage_v2.cpp b/internal/core/unittest/test_chunked_segment_storage_v2.cpp index bb026badc9..26f8da99ab 100644 --- a/internal/core/unittest/test_chunked_segment_storage_v2.cpp +++ b/internal/core/unittest/test_chunked_segment_storage_v2.cpp @@ -166,25 +166,31 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam { LoadFieldDataInfo load_info; load_info.field_infos.emplace( int64_t(0), - FieldBinlogInfo{int64_t(0), - static_cast(row_count), - std::vector(chunk_num * test_data_count), - false, - std::vector({paths[0]})}); + FieldBinlogInfo{ + int64_t(0), + static_cast(row_count), + std::vector(chunk_num 
* test_data_count), + std::vector(chunk_num * test_data_count * 4), + false, + std::vector({paths[0]})}); load_info.field_infos.emplace( int64_t(102), - FieldBinlogInfo{int64_t(102), - static_cast(row_count), - std::vector(chunk_num * test_data_count), - false, - std::vector({paths[1]})}); + FieldBinlogInfo{ + int64_t(102), + static_cast(row_count), + std::vector(chunk_num * test_data_count), + std::vector(chunk_num * test_data_count * 4), + false, + std::vector({paths[1]})}); load_info.field_infos.emplace( int64_t(103), - FieldBinlogInfo{int64_t(103), - static_cast(row_count), - std::vector(chunk_num * test_data_count), - false, - std::vector({paths[2]})}); + FieldBinlogInfo{ + int64_t(103), + static_cast(row_count), + std::vector(chunk_num * test_data_count), + std::vector(chunk_num * test_data_count * 4), + false, + std::vector({paths[2]})}); load_info.mmap_dir_path = ""; load_info.storage_version = 2; segment->AddFieldDataInfoForSealed(load_info); diff --git a/internal/core/unittest/test_growing_storage_v2.cpp b/internal/core/unittest/test_growing_storage_v2.cpp index 790e83479e..06a84facd1 100644 --- a/internal/core/unittest/test_growing_storage_v2.cpp +++ b/internal/core/unittest/test_growing_storage_v2.cpp @@ -164,12 +164,14 @@ TEST_F(TestGrowingStorageV2, LoadFieldData) { FieldBinlogInfo{0, 3000, std::vector{3000}, + std::vector{3000}, false, std::vector{paths[0]}}}, {1, FieldBinlogInfo{1, 3000, std::vector{3000}, + std::vector{3000}, false, std::vector{paths[1]}}}, }; @@ -360,12 +362,14 @@ TEST_F(TestGrowingStorageV2, TestAllDataTypes) { FieldBinlogInfo{0, total_rows, std::vector{total_rows}, + std::vector{total_rows * 4}, false, std::vector{paths[0]}}}, {1, FieldBinlogInfo{1, total_rows, std::vector{total_rows}, + std::vector{total_rows * 4}, false, std::vector{paths[1]}}}, }; diff --git a/internal/core/unittest/test_utils/storage_test_utils.h b/internal/core/unittest/test_utils/storage_test_utils.h index f9f3dbc20d..ba505953d7 100644 --- 
a/internal/core/unittest/test_utils/storage_test_utils.h +++ b/internal/core/unittest/test_utils/storage_test_utils.h @@ -98,6 +98,7 @@ PrepareInsertBinlog(int64_t collection_id, FieldBinlogInfo{field_id, static_cast(row_count), std::vector{int64_t(row_count)}, + std::vector{serialized_insert_size}, enable_mmap, std::vector{file}}); }; @@ -152,6 +153,8 @@ PrepareSingleFieldInsertBinlog(int64_t collection_id, files.reserve(field_datas.size()); std::vector row_counts; row_counts.reserve(field_datas.size()); + std::vector serialized_insert_sizes; + serialized_insert_sizes.reserve(field_datas.size()); int64_t row_count = 0; for (auto i = 0; i < field_datas.size(); ++i) { auto& field_data = field_datas[i]; @@ -168,6 +171,8 @@ PrepareSingleFieldInsertBinlog(int64_t collection_id, insert_data->SetFieldDataMeta(field_data_meta); auto serialized_insert_data = insert_data->serialize_to_remote_file(); auto serialized_insert_size = serialized_insert_data.size(); + serialized_insert_sizes.push_back( + static_cast(serialized_insert_size)); cm->Write(file, serialized_insert_data.data(), serialized_insert_size); } @@ -176,6 +181,7 @@ PrepareSingleFieldInsertBinlog(int64_t collection_id, FieldBinlogInfo{field_id, static_cast(row_count), row_counts, + serialized_insert_sizes, enable_mmap, files}); diff --git a/internal/querynodev2/segments/load_field_data_info.go b/internal/querynodev2/segments/load_field_data_info.go deleted file mode 100644 index 4665408bab..0000000000 --- a/internal/querynodev2/segments/load_field_data_info.go +++ /dev/null @@ -1,111 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package segments - -/* -#cgo pkg-config: milvus_core -#include "segcore/load_field_data_c.h" -*/ -import "C" - -import ( - "context" - "unsafe" - - "github.com/milvus-io/milvus/pkg/v2/proto/datapb" -) - -type LoadFieldDataInfo struct { - cLoadFieldDataInfo C.CLoadFieldDataInfo -} - -func newLoadFieldDataInfo(ctx context.Context) (*LoadFieldDataInfo, error) { - var status C.CStatus - var cLoadFieldDataInfo C.CLoadFieldDataInfo - GetDynamicPool().Submit(func() (any, error) { - status = C.NewLoadFieldDataInfo(&cLoadFieldDataInfo, C.int64_t(0)) - return nil, nil - }).Await() - if err := HandleCStatus(ctx, &status, "newLoadFieldDataInfo failed"); err != nil { - return nil, err - } - return &LoadFieldDataInfo{cLoadFieldDataInfo: cLoadFieldDataInfo}, nil -} - -func deleteFieldDataInfo(info *LoadFieldDataInfo) { - GetDynamicPool().Submit(func() (any, error) { - C.DeleteLoadFieldDataInfo(info.cLoadFieldDataInfo) - return nil, nil - }).Await() -} - -func (ld *LoadFieldDataInfo) appendLoadFieldInfo(ctx context.Context, fieldID int64, rowCount int64) error { - var status C.CStatus - GetDynamicPool().Submit(func() (any, error) { - cFieldID := C.int64_t(fieldID) - cRowCount := C.int64_t(rowCount) - - status = C.AppendLoadFieldInfo(ld.cLoadFieldDataInfo, cFieldID, cRowCount) - return nil, nil - }).Await() - - return HandleCStatus(ctx, &status, "appendLoadFieldInfo failed") -} - -func (ld *LoadFieldDataInfo) appendLoadFieldDataPath(ctx context.Context, fieldID int64, binlog *datapb.Binlog) error { - var status C.CStatus - GetDynamicPool().Submit(func() (any, error) 
{ - cFieldID := C.int64_t(fieldID) - cEntriesNum := C.int64_t(binlog.GetEntriesNum()) - cFile := C.CString(binlog.GetLogPath()) - defer C.free(unsafe.Pointer(cFile)) - - status = C.AppendLoadFieldDataPath(ld.cLoadFieldDataInfo, cFieldID, cEntriesNum, cFile) - return nil, nil - }).Await() - - return HandleCStatus(ctx, &status, "appendLoadFieldDataPath failed") -} - -func (ld *LoadFieldDataInfo) enableMmap(fieldID int64, enabled bool) { - GetDynamicPool().Submit(func() (any, error) { - cFieldID := C.int64_t(fieldID) - cEnabled := C.bool(enabled) - - C.EnableMmap(ld.cLoadFieldDataInfo, cFieldID, cEnabled) - return nil, nil - }).Await() -} - -func (ld *LoadFieldDataInfo) appendMMapDirPath(dir string) { - GetDynamicPool().Submit(func() (any, error) { - cDir := C.CString(dir) - defer C.free(unsafe.Pointer(cDir)) - - C.AppendMMapDirPath(ld.cLoadFieldDataInfo, cDir) - return nil, nil - }).Await() -} - -func (ld *LoadFieldDataInfo) appendStorageVersion(version int64) { - GetDynamicPool().Submit(func() (any, error) { - cVersion := C.int64_t(version) - C.SetStorageVersion(ld.cLoadFieldDataInfo, cVersion) - - return nil, nil - }).Await() -} diff --git a/internal/util/segcore/requests.go b/internal/util/segcore/requests.go index a5ae8ef69d..bf0d8f157f 100644 --- a/internal/util/segcore/requests.go +++ b/internal/util/segcore/requests.go @@ -68,10 +68,11 @@ func (req *LoadFieldDataRequest) getCLoadFieldDataRequest() (result *cLoadFieldD } for _, binlog := range field.Field.Binlogs { cEntriesNum := C.int64_t(binlog.GetEntriesNum()) + cMemorySize := C.int64_t(binlog.GetMemorySize()) cFile := C.CString(binlog.GetLogPath()) defer C.free(unsafe.Pointer(cFile)) - status = C.AppendLoadFieldDataPath(cLoadFieldDataInfo, cFieldID, cEntriesNum, cFile) + status = C.AppendLoadFieldDataPath(cLoadFieldDataInfo, cFieldID, cEntriesNum, cMemorySize, cFile) if err := ConsumeCStatusIntoError(&status); err != nil { return nil, errors.Wrapf(err, "AppendLoadFieldDataPath failed at binlog, %d, %s", 
field.Field.GetFieldID(), binlog.GetLogPath()) }