feat: make storagev1 to support eviction (#43219)

issue: https://github.com/milvus-io/milvus/issues/41435

turns out we have per file binlog size in golang code, by passing it
into segcore we can support eviction in storage v1

Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>
This commit is contained in:
Buqian Zheng 2025-07-19 02:02:52 +08:00 committed by GitHub
parent 1f86286ff0
commit f7b262a702
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 95 additions and 150 deletions

View File

@ -28,6 +28,8 @@ struct FieldBinlogInfo {
int64_t field_id;
int64_t row_count = -1;
std::vector<int64_t> entries_nums;
// estimated memory size for each binlog file, in bytes, used by caching layer
std::vector<int64_t> memory_sizes;
bool enable_mmap{false};
std::vector<std::string> insert_files;
};

View File

@ -388,13 +388,18 @@ ChunkedSegmentSealedImpl::load_field_data_internal(
this->get_segment_id(),
field_id.get());
} else {
std::vector<std::pair<std::string, int64_t>>
insert_files_with_entries_nums;
std::vector<storagev1translator::ChunkTranslator::FileInfo>
file_infos;
file_infos.reserve(info.insert_files.size());
for (int i = 0; i < info.insert_files.size(); i++) {
insert_files_with_entries_nums.emplace_back(
info.insert_files[i], info.entries_nums[i]);
file_infos.emplace_back(
storagev1translator::ChunkTranslator::FileInfo{
info.insert_files[i],
info.entries_nums[i],
info.memory_sizes[i]});
}
storage::SortByPath(insert_files_with_entries_nums);
storage::SortByPath(file_infos);
auto field_meta = schema_->operator[](field_id);
std::unique_ptr<Translator<milvus::Chunk>> translator =
@ -402,7 +407,7 @@ ChunkedSegmentSealedImpl::load_field_data_internal(
this->get_segment_id(),
field_meta,
field_data_info,
std::move(insert_files_with_entries_nums),
std::move(file_infos),
info.enable_mmap,
load_info.load_priority);

View File

@ -70,6 +70,7 @@ CStatus
AppendLoadFieldDataPath(CLoadFieldDataInfo c_load_field_data_info,
int64_t field_id,
int64_t entries_num,
int64_t memory_size,
const char* c_file_path) {
SCOPE_CGO_CALL_METRIC();
@ -86,6 +87,8 @@ AppendLoadFieldDataPath(CLoadFieldDataInfo c_load_field_data_info,
file_path);
load_field_data_info->field_infos[field_id].entries_nums.emplace_back(
entries_num);
load_field_data_info->field_infos[field_id].memory_sizes.emplace_back(
memory_size);
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);

View File

@ -41,6 +41,7 @@ CStatus
AppendLoadFieldDataPath(CLoadFieldDataInfo c_load_field_data_info,
int64_t field_id,
int64_t entries_num,
int64_t memory_size,
const char* file_path);
void

View File

@ -71,7 +71,7 @@ ChunkTranslator::ChunkTranslator(
int64_t segment_id,
FieldMeta field_meta,
FieldDataInfo field_data_info,
std::vector<std::pair<std::string, int64_t>>&& files_and_rows,
std::vector<FileInfo>&& file_infos,
bool use_mmap,
milvus::proto::common::LoadPriority load_priority)
: segment_id_(segment_id),
@ -79,7 +79,7 @@ ChunkTranslator::ChunkTranslator(
field_meta_(field_meta),
key_(fmt::format("seg_{}_f_{}", segment_id, field_meta.get_id().get())),
use_mmap_(use_mmap),
files_and_rows_(std::move(files_and_rows)),
file_infos_(std::move(file_infos)),
mmap_dir_path_(field_data_info.mmap_dir_path),
meta_(use_mmap ? milvus::cachinglayer::StorageType::DISK
: milvus::cachinglayer::StorageType::MEMORY,
@ -88,14 +88,14 @@ ChunkTranslator::ChunkTranslator(
IsVectorDataType(field_meta.get_data_type()),
/* is_index */ false,
/* in_load_list*/ field_data_info.in_load_list),
/* support_eviction */ false),
/* support_eviction */ true),
load_priority_(load_priority) {
AssertInfo(!SystemProperty::Instance().IsSystem(FieldId(field_id_)),
"ChunkTranslator not supported for system field");
meta_.num_rows_until_chunk_.push_back(0);
for (auto& [file, rows] : files_and_rows_) {
for (const auto& info : file_infos_) {
meta_.num_rows_until_chunk_.push_back(
meta_.num_rows_until_chunk_.back() + rows);
meta_.num_rows_until_chunk_.back() + info.row_count);
}
AssertInfo(meta_.num_rows_until_chunk_.back() == field_data_info.row_count,
fmt::format("data lost while loading column {}: found "
@ -104,7 +104,7 @@ ChunkTranslator::ChunkTranslator(
meta_.num_rows_until_chunk_.back(),
field_data_info.row_count));
virtual_chunk_config(field_data_info.row_count,
files_and_rows_.size(),
file_infos_.size(),
meta_.num_rows_until_chunk_,
meta_.virt_chunk_order_,
meta_.vcid_to_cid_arr_);
@ -112,7 +112,7 @@ ChunkTranslator::ChunkTranslator(
size_t
ChunkTranslator::num_cells() const {
return files_and_rows_.size();
return file_infos_.size();
}
milvus::cachinglayer::cid_t
@ -125,7 +125,16 @@ ChunkTranslator::cell_id_of(milvus::cachinglayer::uid_t uid) const {
milvus::cachinglayer::ResourceUsage
ChunkTranslator::estimated_byte_size_of_cell(
milvus::cachinglayer::cid_t cid) const {
return {0, 0};
AssertInfo(cid < file_infos_.size(), "cid out of range");
int64_t memory_size = file_infos_[cid].memory_size;
if (use_mmap_) {
// For mmap, the memory is counted as disk usage
return {0, memory_size};
} else {
// For non-mmap, the memory is counted as memory usage
return {memory_size, 0};
}
}
const std::string&
@ -145,7 +154,7 @@ ChunkTranslator::get_cells(
std::vector<std::string> remote_files;
remote_files.reserve(cids.size());
for (auto cid : cids) {
remote_files.push_back(files_and_rows_[cid].first);
remote_files.push_back(file_infos_[cid].file_path);
}
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);

View File

@ -51,13 +51,18 @@ virtual_chunk_config(int64_t total_row_count,
// For this translator each Chunk is a CacheCell, cid_t == uid_t.
class ChunkTranslator : public milvus::cachinglayer::Translator<milvus::Chunk> {
public:
ChunkTranslator(
int64_t segment_id,
FieldMeta field_meta,
FieldDataInfo field_data_info,
std::vector<std::pair<std::string, int64_t>>&& files_and_rows,
bool use_mmap,
milvus::proto::common::LoadPriority load_priority);
struct FileInfo {
std::string file_path;
int64_t row_count;
int64_t memory_size;
};
ChunkTranslator(int64_t segment_id,
FieldMeta field_meta,
FieldDataInfo field_data_info,
std::vector<FileInfo>&& file_infos,
bool use_mmap,
milvus::proto::common::LoadPriority load_priority);
size_t
num_cells() const override;
@ -77,7 +82,7 @@ class ChunkTranslator : public milvus::cachinglayer::Translator<milvus::Chunk> {
}
private:
std::vector<std::pair<std::string, int64_t>> files_and_rows_;
std::vector<FileInfo> file_infos_;
int64_t segment_id_;
int64_t field_id_;
std::string key_;

View File

@ -251,6 +251,16 @@ SortByPath(std::vector<std::pair<std::string, int64_t>>& paths) {
});
}
template <typename T>
inline void
SortByPath(std::vector<T>& paths) {
std::sort(paths.begin(), paths.end(), [](const T& a, const T& b) {
return std::stol(
a.file_path.substr(a.file_path.find_last_of("/") + 1)) <
std::stol(b.file_path.substr(b.file_path.find_last_of("/") + 1));
});
}
std::vector<FieldDataPtr>
CacheRawDataAndFillMissing(const MemFileManagerImplPtr& file_manager,
const Config& config);

View File

@ -4179,6 +4179,7 @@ TEST(CApiTest, GrowingSegment_Load_Field_Data_Lack_Binlog_Rows) {
FieldBinlogInfo{lack_null_binlog_id.get(),
static_cast<int64_t>(ROW_COUNT),
std::vector<int64_t>{int64_t(0)},
std::vector<int64_t>{0},
false,
std::vector<std::string>{}});
@ -4187,6 +4188,7 @@ TEST(CApiTest, GrowingSegment_Load_Field_Data_Lack_Binlog_Rows) {
FieldBinlogInfo{lack_default_value_binlog_id.get(),
static_cast<int64_t>(ROW_COUNT),
std::vector<int64_t>{int64_t(0)},
std::vector<int64_t>{0},
false,
std::vector<std::string>{}});
@ -4249,6 +4251,7 @@ TEST(CApiTest, DISABLED_SealedSegment_Load_Field_Data_Lack_Binlog_Rows) {
FieldBinlogInfo{lack_null_binlog_id.get(),
static_cast<int64_t>(ROW_COUNT),
std::vector<int64_t>{int64_t(0)},
std::vector<int64_t>{0},
false,
std::vector<std::string>{}});
@ -4257,6 +4260,7 @@ TEST(CApiTest, DISABLED_SealedSegment_Load_Field_Data_Lack_Binlog_Rows) {
FieldBinlogInfo{lack_default_value_binlog_id.get(),
static_cast<int64_t>(ROW_COUNT),
std::vector<int64_t>{int64_t(0)},
std::vector<int64_t>{0},
false,
std::vector<std::string>{}});

View File

@ -166,25 +166,31 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam<bool> {
LoadFieldDataInfo load_info;
load_info.field_infos.emplace(
int64_t(0),
FieldBinlogInfo{int64_t(0),
static_cast<int64_t>(row_count),
std::vector<int64_t>(chunk_num * test_data_count),
false,
std::vector<std::string>({paths[0]})});
FieldBinlogInfo{
int64_t(0),
static_cast<int64_t>(row_count),
std::vector<int64_t>(chunk_num * test_data_count),
std::vector<int64_t>(chunk_num * test_data_count * 4),
false,
std::vector<std::string>({paths[0]})});
load_info.field_infos.emplace(
int64_t(102),
FieldBinlogInfo{int64_t(102),
static_cast<int64_t>(row_count),
std::vector<int64_t>(chunk_num * test_data_count),
false,
std::vector<std::string>({paths[1]})});
FieldBinlogInfo{
int64_t(102),
static_cast<int64_t>(row_count),
std::vector<int64_t>(chunk_num * test_data_count),
std::vector<int64_t>(chunk_num * test_data_count * 4),
false,
std::vector<std::string>({paths[1]})});
load_info.field_infos.emplace(
int64_t(103),
FieldBinlogInfo{int64_t(103),
static_cast<int64_t>(row_count),
std::vector<int64_t>(chunk_num * test_data_count),
false,
std::vector<std::string>({paths[2]})});
FieldBinlogInfo{
int64_t(103),
static_cast<int64_t>(row_count),
std::vector<int64_t>(chunk_num * test_data_count),
std::vector<int64_t>(chunk_num * test_data_count * 4),
false,
std::vector<std::string>({paths[2]})});
load_info.mmap_dir_path = "";
load_info.storage_version = 2;
segment->AddFieldDataInfoForSealed(load_info);

View File

@ -164,12 +164,14 @@ TEST_F(TestGrowingStorageV2, LoadFieldData) {
FieldBinlogInfo{0,
3000,
std::vector<int64_t>{3000},
std::vector<int64_t>{3000},
false,
std::vector<std::string>{paths[0]}}},
{1,
FieldBinlogInfo{1,
3000,
std::vector<int64_t>{3000},
std::vector<int64_t>{3000},
false,
std::vector<std::string>{paths[1]}}},
};
@ -360,12 +362,14 @@ TEST_F(TestGrowingStorageV2, TestAllDataTypes) {
FieldBinlogInfo{0,
total_rows,
std::vector<int64_t>{total_rows},
std::vector<int64_t>{total_rows * 4},
false,
std::vector<std::string>{paths[0]}}},
{1,
FieldBinlogInfo{1,
total_rows,
std::vector<int64_t>{total_rows},
std::vector<int64_t>{total_rows * 4},
false,
std::vector<std::string>{paths[1]}}},
};

View File

@ -98,6 +98,7 @@ PrepareInsertBinlog(int64_t collection_id,
FieldBinlogInfo{field_id,
static_cast<int64_t>(row_count),
std::vector<int64_t>{int64_t(row_count)},
std::vector<int64_t>{serialized_insert_size},
enable_mmap,
std::vector<std::string>{file}});
};
@ -152,6 +153,8 @@ PrepareSingleFieldInsertBinlog(int64_t collection_id,
files.reserve(field_datas.size());
std::vector<int64_t> row_counts;
row_counts.reserve(field_datas.size());
std::vector<int64_t> serialized_insert_sizes;
serialized_insert_sizes.reserve(field_datas.size());
int64_t row_count = 0;
for (auto i = 0; i < field_datas.size(); ++i) {
auto& field_data = field_datas[i];
@ -168,6 +171,8 @@ PrepareSingleFieldInsertBinlog(int64_t collection_id,
insert_data->SetFieldDataMeta(field_data_meta);
auto serialized_insert_data = insert_data->serialize_to_remote_file();
auto serialized_insert_size = serialized_insert_data.size();
serialized_insert_sizes.push_back(
static_cast<int64_t>(serialized_insert_size));
cm->Write(file, serialized_insert_data.data(), serialized_insert_size);
}
@ -176,6 +181,7 @@ PrepareSingleFieldInsertBinlog(int64_t collection_id,
FieldBinlogInfo{field_id,
static_cast<int64_t>(row_count),
row_counts,
serialized_insert_sizes,
enable_mmap,
files});

View File

@ -1,111 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package segments
/*
#cgo pkg-config: milvus_core
#include "segcore/load_field_data_c.h"
*/
import "C"
import (
"context"
"unsafe"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
)
type LoadFieldDataInfo struct {
cLoadFieldDataInfo C.CLoadFieldDataInfo
}
func newLoadFieldDataInfo(ctx context.Context) (*LoadFieldDataInfo, error) {
var status C.CStatus
var cLoadFieldDataInfo C.CLoadFieldDataInfo
GetDynamicPool().Submit(func() (any, error) {
status = C.NewLoadFieldDataInfo(&cLoadFieldDataInfo, C.int64_t(0))
return nil, nil
}).Await()
if err := HandleCStatus(ctx, &status, "newLoadFieldDataInfo failed"); err != nil {
return nil, err
}
return &LoadFieldDataInfo{cLoadFieldDataInfo: cLoadFieldDataInfo}, nil
}
func deleteFieldDataInfo(info *LoadFieldDataInfo) {
GetDynamicPool().Submit(func() (any, error) {
C.DeleteLoadFieldDataInfo(info.cLoadFieldDataInfo)
return nil, nil
}).Await()
}
func (ld *LoadFieldDataInfo) appendLoadFieldInfo(ctx context.Context, fieldID int64, rowCount int64) error {
var status C.CStatus
GetDynamicPool().Submit(func() (any, error) {
cFieldID := C.int64_t(fieldID)
cRowCount := C.int64_t(rowCount)
status = C.AppendLoadFieldInfo(ld.cLoadFieldDataInfo, cFieldID, cRowCount)
return nil, nil
}).Await()
return HandleCStatus(ctx, &status, "appendLoadFieldInfo failed")
}
func (ld *LoadFieldDataInfo) appendLoadFieldDataPath(ctx context.Context, fieldID int64, binlog *datapb.Binlog) error {
var status C.CStatus
GetDynamicPool().Submit(func() (any, error) {
cFieldID := C.int64_t(fieldID)
cEntriesNum := C.int64_t(binlog.GetEntriesNum())
cFile := C.CString(binlog.GetLogPath())
defer C.free(unsafe.Pointer(cFile))
status = C.AppendLoadFieldDataPath(ld.cLoadFieldDataInfo, cFieldID, cEntriesNum, cFile)
return nil, nil
}).Await()
return HandleCStatus(ctx, &status, "appendLoadFieldDataPath failed")
}
func (ld *LoadFieldDataInfo) enableMmap(fieldID int64, enabled bool) {
GetDynamicPool().Submit(func() (any, error) {
cFieldID := C.int64_t(fieldID)
cEnabled := C.bool(enabled)
C.EnableMmap(ld.cLoadFieldDataInfo, cFieldID, cEnabled)
return nil, nil
}).Await()
}
func (ld *LoadFieldDataInfo) appendMMapDirPath(dir string) {
GetDynamicPool().Submit(func() (any, error) {
cDir := C.CString(dir)
defer C.free(unsafe.Pointer(cDir))
C.AppendMMapDirPath(ld.cLoadFieldDataInfo, cDir)
return nil, nil
}).Await()
}
func (ld *LoadFieldDataInfo) appendStorageVersion(version int64) {
GetDynamicPool().Submit(func() (any, error) {
cVersion := C.int64_t(version)
C.SetStorageVersion(ld.cLoadFieldDataInfo, cVersion)
return nil, nil
}).Await()
}

View File

@ -68,10 +68,11 @@ func (req *LoadFieldDataRequest) getCLoadFieldDataRequest() (result *cLoadFieldD
}
for _, binlog := range field.Field.Binlogs {
cEntriesNum := C.int64_t(binlog.GetEntriesNum())
cMemorySize := C.int64_t(binlog.GetMemorySize())
cFile := C.CString(binlog.GetLogPath())
defer C.free(unsafe.Pointer(cFile))
status = C.AppendLoadFieldDataPath(cLoadFieldDataInfo, cFieldID, cEntriesNum, cFile)
status = C.AppendLoadFieldDataPath(cLoadFieldDataInfo, cFieldID, cEntriesNum, cMemorySize, cFile)
if err := ConsumeCStatusIntoError(&status); err != nil {
return nil, errors.Wrapf(err, "AppendLoadFieldDataPath failed at binlog, %d, %s", field.Field.GetFieldID(), binlog.GetLogPath())
}