mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: add vector reserve to improve memory allocation in segcore (#45757)
This commit optimizes std::vector usage across segcore by adding reserve() calls where the size is known in advance, reducing memory reallocations during push_back operations. Changes: - TimestampIndex.cpp: Reserve space for prefix_sums and timestamp_barriers - SegmentGrowingImpl.cpp: Reserve space for binlog info vectors - ChunkedSegmentSealedImpl.cpp: Reserve space for futures and field data vectors - storagev2translator/GroupChunkTranslator.cpp: Reserve space for metadata vectors This improves performance by avoiding multiple memory reallocations when the vector size is predictable. issue: https://github.com/milvus-io/milvus/issues/45679 --------- Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>
This commit is contained in:
parent
346449d87f
commit
7078f403f1
@ -391,6 +391,7 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal(
|
|||||||
// warmup will be disabled only when all columns are not in load list
|
// warmup will be disabled only when all columns are not in load list
|
||||||
bool merged_in_load_list = false;
|
bool merged_in_load_list = false;
|
||||||
std::vector<FieldId> milvus_field_ids;
|
std::vector<FieldId> milvus_field_ids;
|
||||||
|
milvus_field_ids.reserve(field_id_list.size());
|
||||||
for (int i = 0; i < field_id_list.size(); ++i) {
|
for (int i = 0; i < field_id_list.size(); ++i) {
|
||||||
milvus_field_ids.push_back(FieldId(field_id_list.Get(i)));
|
milvus_field_ids.push_back(FieldId(field_id_list.Get(i)));
|
||||||
merged_in_load_list = merged_in_load_list ||
|
merged_in_load_list = merged_in_load_list ||
|
||||||
@ -2672,6 +2673,7 @@ ChunkedSegmentSealedImpl::Load(milvus::tracer::TraceContext& trace_ctx) {
|
|||||||
// Step 2: Load indexes in parallel using thread pool
|
// Step 2: Load indexes in parallel using thread pool
|
||||||
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::LOW);
|
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::LOW);
|
||||||
std::vector<std::future<void>> load_index_futures;
|
std::vector<std::future<void>> load_index_futures;
|
||||||
|
load_index_futures.reserve(field_id_to_index_info.size());
|
||||||
|
|
||||||
for (const auto& pair : field_id_to_index_info) {
|
for (const auto& pair : field_id_to_index_info) {
|
||||||
auto field_id = pair.first;
|
auto field_id = pair.first;
|
||||||
@ -2704,6 +2706,7 @@ ChunkedSegmentSealedImpl::Load(milvus::tracer::TraceContext& trace_ctx) {
|
|||||||
|
|
||||||
// Wait for all index loading to complete and collect exceptions
|
// Wait for all index loading to complete and collect exceptions
|
||||||
std::vector<std::exception_ptr> index_exceptions;
|
std::vector<std::exception_ptr> index_exceptions;
|
||||||
|
index_exceptions.reserve(load_index_futures.size());
|
||||||
for (auto& future : load_index_futures) {
|
for (auto& future : load_index_futures) {
|
||||||
try {
|
try {
|
||||||
future.get();
|
future.get();
|
||||||
@ -2754,6 +2757,10 @@ ChunkedSegmentSealedImpl::Load(milvus::tracer::TraceContext& trace_ctx) {
|
|||||||
|
|
||||||
// Calculate total row count and collect binlog paths
|
// Calculate total row count and collect binlog paths
|
||||||
int64_t total_entries = 0;
|
int64_t total_entries = 0;
|
||||||
|
auto binlog_count = field_binlog.binlogs().size();
|
||||||
|
field_binlog_info.insert_files.reserve(binlog_count);
|
||||||
|
field_binlog_info.entries_nums.reserve(binlog_count);
|
||||||
|
field_binlog_info.memory_sizes.reserve(binlog_count);
|
||||||
for (const auto& binlog : field_binlog.binlogs()) {
|
for (const auto& binlog : field_binlog.binlogs()) {
|
||||||
field_binlog_info.insert_files.push_back(binlog.log_path());
|
field_binlog_info.insert_files.push_back(binlog.log_path());
|
||||||
field_binlog_info.entries_nums.push_back(binlog.entries_num());
|
field_binlog_info.entries_nums.push_back(binlog.entries_num());
|
||||||
@ -2774,6 +2781,7 @@ ChunkedSegmentSealedImpl::Load(milvus::tracer::TraceContext& trace_ctx) {
|
|||||||
field_data_to_load.size(),
|
field_data_to_load.size(),
|
||||||
id_);
|
id_);
|
||||||
std::vector<std::future<void>> load_field_futures;
|
std::vector<std::future<void>> load_field_futures;
|
||||||
|
load_field_futures.reserve(field_data_to_load.size());
|
||||||
|
|
||||||
for (const auto& [field_id, load_field_data_info] :
|
for (const auto& [field_id, load_field_data_info] :
|
||||||
field_data_to_load) {
|
field_data_to_load) {
|
||||||
@ -2787,6 +2795,7 @@ ChunkedSegmentSealedImpl::Load(milvus::tracer::TraceContext& trace_ctx) {
|
|||||||
|
|
||||||
// Wait for all field data loading to complete and collect exceptions
|
// Wait for all field data loading to complete and collect exceptions
|
||||||
std::vector<std::exception_ptr> field_exceptions;
|
std::vector<std::exception_ptr> field_exceptions;
|
||||||
|
field_exceptions.reserve(load_field_futures.size());
|
||||||
for (auto& future : load_field_futures) {
|
for (auto& future : load_field_futures) {
|
||||||
try {
|
try {
|
||||||
future.get();
|
future.get();
|
||||||
|
|||||||
@ -1344,6 +1344,10 @@ SegmentGrowingImpl::Load(milvus::tracer::TraceContext& trace_ctx) {
|
|||||||
|
|
||||||
// Process each binlog
|
// Process each binlog
|
||||||
int64_t total_row_count = 0;
|
int64_t total_row_count = 0;
|
||||||
|
auto binlog_count = field_binlog.binlogs().size();
|
||||||
|
binlog_info.entries_nums.reserve(binlog_count);
|
||||||
|
binlog_info.insert_files.reserve(binlog_count);
|
||||||
|
binlog_info.memory_sizes.reserve(binlog_count);
|
||||||
for (const auto& binlog : field_binlog.binlogs()) {
|
for (const auto& binlog : field_binlog.binlogs()) {
|
||||||
binlog_info.entries_nums.push_back(binlog.entries_num());
|
binlog_info.entries_nums.push_back(binlog.entries_num());
|
||||||
binlog_info.insert_files.push_back(binlog.log_path());
|
binlog_info.insert_files.push_back(binlog.log_path());
|
||||||
@ -1353,6 +1357,7 @@ SegmentGrowingImpl::Load(milvus::tracer::TraceContext& trace_ctx) {
|
|||||||
binlog_info.row_count = total_row_count;
|
binlog_info.row_count = total_row_count;
|
||||||
|
|
||||||
// Set child field ids
|
// Set child field ids
|
||||||
|
binlog_info.child_field_ids.reserve(field_binlog.child_fields().size());
|
||||||
for (const auto& child_field : field_binlog.child_fields()) {
|
for (const auto& child_field : field_binlog.child_fields()) {
|
||||||
binlog_info.child_field_ids.push_back(child_field);
|
binlog_info.child_field_ids.push_back(child_field);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -23,9 +23,11 @@ TimestampIndex::build_with(const Timestamp* timestamps, int64_t size) {
|
|||||||
auto num_slice = lengths_.size();
|
auto num_slice = lengths_.size();
|
||||||
Assert(num_slice > 0);
|
Assert(num_slice > 0);
|
||||||
std::vector<int64_t> prefix_sums;
|
std::vector<int64_t> prefix_sums;
|
||||||
|
prefix_sums.reserve(num_slice + 1);
|
||||||
int offset = 0;
|
int offset = 0;
|
||||||
prefix_sums.push_back(offset);
|
prefix_sums.push_back(offset);
|
||||||
std::vector<Timestamp> timestamp_barriers;
|
std::vector<Timestamp> timestamp_barriers;
|
||||||
|
timestamp_barriers.reserve(num_slice + 1);
|
||||||
Timestamp last_max_v = 0;
|
Timestamp last_max_v = 0;
|
||||||
for (int slice_id = 0; slice_id < num_slice; ++slice_id) {
|
for (int slice_id = 0; slice_id < num_slice; ++slice_id) {
|
||||||
auto length = lengths_[slice_id];
|
auto length = lengths_[slice_id];
|
||||||
|
|||||||
@ -89,6 +89,8 @@ GroupChunkTranslator::GroupChunkTranslator(
|
|||||||
.GetArrowFileSystem();
|
.GetArrowFileSystem();
|
||||||
|
|
||||||
// Get row group metadata from files
|
// Get row group metadata from files
|
||||||
|
parquet_file_metadata_.reserve(insert_files_.size());
|
||||||
|
row_group_meta_list_.reserve(insert_files_.size());
|
||||||
for (const auto& file : insert_files_) {
|
for (const auto& file : insert_files_) {
|
||||||
auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
|
auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
|
||||||
fs,
|
fs,
|
||||||
@ -114,11 +116,16 @@ GroupChunkTranslator::GroupChunkTranslator(
|
|||||||
file_row_group_prefix_sum_.reserve(row_group_meta_list_.size() + 1);
|
file_row_group_prefix_sum_.reserve(row_group_meta_list_.size() + 1);
|
||||||
file_row_group_prefix_sum_.push_back(
|
file_row_group_prefix_sum_.push_back(
|
||||||
0); // Base case: 0 row groups before first file
|
0); // Base case: 0 row groups before first file
|
||||||
|
size_t total_row_groups = 0;
|
||||||
for (const auto& file_metas : row_group_meta_list_) {
|
for (const auto& file_metas : row_group_meta_list_) {
|
||||||
|
total_row_groups += file_metas.size();
|
||||||
file_row_group_prefix_sum_.push_back(file_row_group_prefix_sum_.back() +
|
file_row_group_prefix_sum_.push_back(file_row_group_prefix_sum_.back() +
|
||||||
file_metas.size());
|
file_metas.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
meta_.num_rows_until_chunk_.reserve(total_row_groups + 1);
|
||||||
|
meta_.chunk_memory_size_.reserve(total_row_groups);
|
||||||
|
|
||||||
meta_.num_rows_until_chunk_.push_back(0);
|
meta_.num_rows_until_chunk_.push_back(0);
|
||||||
for (const auto& row_group_meta : row_group_meta_list_) {
|
for (const auto& row_group_meta : row_group_meta_list_) {
|
||||||
for (int i = 0; i < row_group_meta.size(); ++i) {
|
for (int i = 0; i < row_group_meta.size(); ++i) {
|
||||||
|
|||||||
@ -355,6 +355,7 @@ struct TantivyIndexWrapper {
|
|||||||
int64_t offset_begin) {
|
int64_t offset_begin) {
|
||||||
assert(!finished_);
|
assert(!finished_);
|
||||||
std::vector<const char*> views;
|
std::vector<const char*> views;
|
||||||
|
views.reserve(len);
|
||||||
for (uintptr_t i = 0; i < len; i++) {
|
for (uintptr_t i = 0; i < len; i++) {
|
||||||
views.push_back(array[i].c_str());
|
views.push_back(array[i].c_str());
|
||||||
}
|
}
|
||||||
@ -435,6 +436,7 @@ struct TantivyIndexWrapper {
|
|||||||
|
|
||||||
if constexpr (std::is_same_v<T, std::string>) {
|
if constexpr (std::is_same_v<T, std::string>) {
|
||||||
std::vector<const char*> views;
|
std::vector<const char*> views;
|
||||||
|
views.reserve(len);
|
||||||
for (uintptr_t i = 0; i < len; i++) {
|
for (uintptr_t i = 0; i < len; i++) {
|
||||||
views.push_back(array[i].c_str());
|
views.push_back(array[i].c_str());
|
||||||
}
|
}
|
||||||
@ -621,6 +623,7 @@ struct TantivyIndexWrapper {
|
|||||||
|
|
||||||
if constexpr (std::is_same_v<T, std::string>) {
|
if constexpr (std::is_same_v<T, std::string>) {
|
||||||
std::vector<const char*> views;
|
std::vector<const char*> views;
|
||||||
|
views.reserve(len);
|
||||||
for (uintptr_t i = 0; i < len; i++) {
|
for (uintptr_t i = 0; i < len; i++) {
|
||||||
views.push_back(array[i].c_str());
|
views.push_back(array[i].c_str());
|
||||||
}
|
}
|
||||||
@ -709,6 +712,7 @@ struct TantivyIndexWrapper {
|
|||||||
} else {
|
} else {
|
||||||
// smaller integer should be converted first
|
// smaller integer should be converted first
|
||||||
std::vector<int64_t> buf(len);
|
std::vector<int64_t> buf(len);
|
||||||
|
buf.reserve(len);
|
||||||
for (uintptr_t i = 0; i < len; ++i) {
|
for (uintptr_t i = 0; i < len; ++i) {
|
||||||
buf[i] = static_cast<int64_t>(terms[i]);
|
buf[i] = static_cast<int64_t>(terms[i]);
|
||||||
}
|
}
|
||||||
@ -726,6 +730,7 @@ struct TantivyIndexWrapper {
|
|||||||
bitset);
|
bitset);
|
||||||
} else {
|
} else {
|
||||||
std::vector<double> buf(len);
|
std::vector<double> buf(len);
|
||||||
|
buf.reserve(len);
|
||||||
for (uintptr_t i = 0; i < len; ++i) {
|
for (uintptr_t i = 0; i < len; ++i) {
|
||||||
buf[i] = static_cast<double>(terms[i]);
|
buf[i] = static_cast<double>(terms[i]);
|
||||||
}
|
}
|
||||||
@ -736,6 +741,7 @@ struct TantivyIndexWrapper {
|
|||||||
|
|
||||||
if constexpr (std::is_same_v<T, std::string>) {
|
if constexpr (std::is_same_v<T, std::string>) {
|
||||||
std::vector<const char*> views;
|
std::vector<const char*> views;
|
||||||
|
views.reserve(len);
|
||||||
for (uintptr_t i = 0; i < len; i++) {
|
for (uintptr_t i = 0; i < len; i++) {
|
||||||
views.push_back(terms[i].c_str());
|
views.push_back(terms[i].c_str());
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user