enhance: Unify segment Load and Reopen through diff-based loading (#46536)

Related to #46358

Refactor segment loading to use a unified diff-based approach for both
initial Load and Reopen operations:

- Extract ApplyLoadDiff from Reopen to share loading logic
- Add GetLoadDiff to compute diff from empty state for initial load
- Change column_groups_to_load from map to vector<pair> to preserve
order
- Add validation for empty index file paths in diff computation
- Add comprehensive unit tests for GetLoadDiff

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **Performance**
* Improved segment loading efficiency through incremental updates,
reducing memory overhead and enhancing performance during data updates.

* **Tests**
  * Expanded test coverage for load operation scenarios.

<sub>✏️ Tip: You can customize this high-level summary in your review
settings.</sub>

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-12-24 10:19:22 +08:00 committed by GitHub
parent 3b599441fd
commit 48f8b3b585
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 302 additions and 72 deletions

View File

@ -2629,7 +2629,14 @@ ChunkedSegmentSealedImpl::Reopen(
// compute load diff
auto diff = current.ComputeDiff(new_seg_load_info);
ApplyLoadDiff(new_seg_load_info, diff);
LOG_INFO("Reopen segment {} done", id_);
}
void
ChunkedSegmentSealedImpl::ApplyLoadDiff(SegmentLoadInfo& segment_load_info,
LoadDiff& diff) {
milvus::tracer::TraceContext trace_ctx;
if (!diff.indexes_to_load.empty()) {
LoadBatchIndexes(trace_ctx, diff.indexes_to_load);
@ -2647,9 +2654,11 @@ ChunkedSegmentSealedImpl::Reopen(
auto properties =
milvus::storage::LoonFFIPropertiesSingleton::GetInstance()
.GetProperties();
LoadColumnGroups(new_seg_load_info.GetColumnGroups(),
properties,
diff.column_groups_to_load);
auto column_groups = segment_load_info.GetColumnGroups();
auto arrow_schema = schema_->ConvertToArrowSchema();
reader_ = milvus_storage::api::Reader::create(
column_groups, arrow_schema, nullptr, *properties);
LoadColumnGroups(column_groups, properties, diff.column_groups_to_load);
}
// load field binlog
@ -2663,8 +2672,6 @@ ChunkedSegmentSealedImpl::Reopen(
DropFieldData(field_id);
}
}
LOG_INFO("Reopen segment {} done", id_);
}
void
@ -2794,7 +2801,7 @@ ChunkedSegmentSealedImpl::LoadManifest(const std::string& manifest_path) {
reader_ = milvus_storage::api::Reader::create(
column_groups, arrow_schema, nullptr, *properties);
std::map<int, std::vector<FieldId>> cg_field_ids_map;
std::vector<std::pair<int, std::vector<FieldId>>> cg_field_ids;
for (int i = 0; i < column_groups->size(); ++i) {
auto column_group = column_groups->get_column_group(i);
std::vector<FieldId> milvus_field_ids;
@ -2802,22 +2809,22 @@ ChunkedSegmentSealedImpl::LoadManifest(const std::string& manifest_path) {
auto field_id = std::stoll(column);
milvus_field_ids.emplace_back(field_id);
}
cg_field_ids_map[i] = milvus_field_ids;
cg_field_ids.emplace_back(i, std::move(milvus_field_ids));
}
LoadColumnGroups(column_groups, properties, cg_field_ids_map);
LoadColumnGroups(column_groups, properties, cg_field_ids);
}
void
ChunkedSegmentSealedImpl::LoadColumnGroups(
const std::shared_ptr<milvus_storage::api::ColumnGroups>& column_groups,
const std::shared_ptr<milvus_storage::api::Properties>& properties,
std::map<int, std::vector<FieldId>>& cg_field_ids_map) {
std::vector<std::pair<int, std::vector<FieldId>>>& cg_field_ids) {
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::LOW);
std::vector<std::future<void>> load_group_futures;
for (const auto& kv : cg_field_ids_map) {
auto cg_index = kv.first;
const auto& field_ids = kv.second;
for (const auto& pair : cg_field_ids) {
auto cg_index = pair.first;
const auto& field_ids = pair.second;
auto future =
pool.Submit([this, column_groups, properties, cg_index, field_ids] {
LoadColumnGroup(column_groups, properties, cg_index, field_ids);
@ -3172,62 +3179,8 @@ ChunkedSegmentSealedImpl::Load(milvus::tracer::TraceContext& trace_ctx) {
auto num_rows = segment_load_info_.GetNumOfRows();
LOG_INFO("Loading segment {} with {} rows", id_, num_rows);
// Step 1: Separate indexed and non-indexed fields
std::map<FieldId, std::vector<const proto::segcore::FieldIndexInfo*>>
field_id_to_index_info;
std::set<FieldId> indexed_fields;
for (int i = 0; i < segment_load_info_.GetIndexInfoCount(); i++) {
const auto& index_info = segment_load_info_.GetIndexInfo(i);
if (index_info.index_file_paths_size() == 0) {
continue;
}
auto field_id = FieldId(index_info.fieldid());
field_id_to_index_info[field_id].push_back(&index_info);
indexed_fields.insert(field_id);
}
// Step 2: Load indexes in parallel using thread pool
LoadBatchIndexes(trace_ctx, field_id_to_index_info);
LOG_INFO("Finished loading {} indexes for segment {}",
field_id_to_index_info.size(),
id_);
// Step 3.a: Load with manifest
auto manifest_path = segment_load_info_.GetManifestPath();
if (manifest_path != "") {
LoadManifest(manifest_path);
return;
}
// Step 3.b: Load with field binlog
std::vector<std::pair<std::vector<FieldId>, proto::segcore::FieldBinlog>>
field_binlog_to_load;
for (int i = 0; i < segment_load_info_.GetBinlogPathCount(); i++) {
LoadFieldDataInfo load_field_data_info;
load_field_data_info.storage_version =
segment_load_info_.GetStorageVersion();
const auto& field_binlog = segment_load_info_.GetBinlogPath(i);
std::vector<FieldId> field_ids;
// when child fields specified, field id is group id, child field ids are actual id values here
if (field_binlog.child_fields_size() > 0) {
field_ids.reserve(field_binlog.child_fields_size());
for (auto field_id : field_binlog.child_fields()) {
field_ids.emplace_back(field_id);
}
} else {
field_ids.emplace_back(field_binlog.fieldid());
}
field_binlog_to_load.emplace_back(field_ids, field_binlog);
}
if (!field_binlog_to_load.empty()) {
LoadBatchFieldData(trace_ctx, field_binlog_to_load);
}
auto diff = segment_load_info_.GetLoadDiff();
ApplyLoadDiff(segment_load_info_, diff);
LOG_INFO("Successfully loaded segment {} with {} rows", id_, num_rows);
}

View File

@ -987,7 +987,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
LoadColumnGroups(
const std::shared_ptr<milvus_storage::api::ColumnGroups>& column_groups,
const std::shared_ptr<milvus_storage::api::Properties>& properties,
std::map<int, std::vector<FieldId>>& cg_field_ids_map);
std::vector<std::pair<int, std::vector<FieldId>>>& cg_field_ids);
/**
* @brief Load a single column group at the specified index
@ -1006,6 +1006,19 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
int64_t index,
const std::vector<FieldId>& milvus_field_ids);
/**
* @brief Apply load differences to update segment load information
*
* This method processes the differences between current and new load states,
* updating the segment's loaded fields and indexes accordingly. It handles
* incremental updates during segment reopen operations.
*
* @param segment_load_info The segment load information to be updated
* @param load_diff The differences to apply, containing fields and indexes to add/remove
*/
void
ApplyLoadDiff(SegmentLoadInfo& segment_load_info, LoadDiff& load_diff);
void
load_field_data_common(
FieldId field_id,

View File

@ -13,8 +13,10 @@
#include <algorithm>
#include <cctype>
#include <memory>
#include "common/FieldMeta.h"
#include "milvus-storage/column_groups.h"
#include "storage/LocalChunkManagerSingleton.h"
#include "storage/loon_ffi/property_singleton.h"
#include "storage/MmapManager.h"
@ -152,12 +154,18 @@ SegmentLoadInfo::ComputeDiffIndexes(LoadDiff& diff, SegmentLoadInfo& new_info) {
// Get current indexed field IDs
std::set<int64_t> current_index_ids;
for (auto const& index_info : GetIndexInfos()) {
if (index_info.index_file_paths_size() == 0) {
continue;
}
current_index_ids.insert(index_info.indexid());
}
std::set<int64_t> new_index_ids;
// Find indexes to load: indexes in new_info but not in current
for (const auto& new_index_info : new_info.GetIndexInfos()) {
if (new_index_info.index_file_paths_size() == 0) {
continue;
}
new_index_ids.insert(new_index_info.indexid());
if (current_index_ids.find(new_index_info.indexid()) ==
current_index_ids.end()) {
@ -168,6 +176,9 @@ SegmentLoadInfo::ComputeDiffIndexes(LoadDiff& diff, SegmentLoadInfo& new_info) {
// Find indexes to drop: fields that have indexes in current but not in new_info
for (const auto& index_info : GetIndexInfos()) {
if (index_info.index_file_paths_size() == 0) {
continue;
}
if (new_index_ids.find(index_info.indexid()) == new_index_ids.end()) {
diff.indexes_to_drop.insert(FieldId(index_info.fieldid()));
}
@ -234,6 +245,7 @@ SegmentLoadInfo::ComputeDiffColumnGroups(LoadDiff& diff,
std::map<int64_t, int> new_field_ids;
for (int i = 0; i < new_column_group->size(); i++) {
auto cg = new_column_group->get_column_group(i);
std::vector<FieldId> fields;
for (const auto& column : cg->columns) {
auto field_id = std::stoll(column);
new_field_ids.emplace(field_id, i);
@ -241,9 +253,12 @@ SegmentLoadInfo::ComputeDiffColumnGroups(LoadDiff& diff,
auto iter = cur_field_ids.find(field_id);
// If this field doesn't exist in current, mark the column group for loading
if (iter == cur_field_ids.end() || iter->second != i) {
diff.column_groups_to_load[i].emplace_back(field_id);
fields.emplace_back(field_id);
}
}
if (!fields.empty()) {
diff.column_groups_to_load.emplace_back(i, fields);
}
}
// Find field data to drop: fields in current but not in new
@ -280,4 +295,31 @@ SegmentLoadInfo::ComputeDiff(SegmentLoadInfo& new_info) {
return diff;
}
LoadDiff
SegmentLoadInfo::GetLoadDiff() {
LoadDiff diff;
SegmentLoadInfo empty_info;
// Handle index changes
empty_info.ComputeDiffIndexes(diff, *this);
// Handle field data changes
// Note: Updates can only happen within the same category:
// - binlog -> binlog
// - manifest -> manifest
// Cross-category changes are not supported.
if (HasManifestPath()) {
// set mock path for null check
empty_info.info_.set_manifest_path("mocked manifest path");
empty_info.column_groups_ =
std::make_shared<milvus_storage::api::ColumnGroups>();
empty_info.ComputeDiffColumnGroups(diff, *this);
} else {
empty_info.ComputeDiffBinlogs(diff, *this);
}
return diff;
}
} // namespace milvus::segcore

View File

@ -44,8 +44,9 @@ struct LoadDiff {
std::vector<std::pair<std::vector<FieldId>, proto::segcore::FieldBinlog>>
binlogs_to_load;
// Field id to column group index to be loaded
std::map<int, std::vector<FieldId>> column_groups_to_load;
// list of column group indices and related field ids to load
// same index could appear multiple times if same group using different setups
std::vector<std::pair<int, std::vector<FieldId>>> column_groups_to_load;
// Indexes that need to be dropped (field_id set)
std::set<FieldId> indexes_to_drop;
@ -577,6 +578,17 @@ class SegmentLoadInfo {
[[nodiscard]] LoadDiff
ComputeDiff(SegmentLoadInfo& new_info);
/**
* @brief Get the LoadDiff from the current SegmentLoadInfo
*
* This method produces a LoadDiff that describes what needs to be loaded when
* load with current load info.
*
* @return LoadDiff containing the differences
*/
[[nodiscard]] LoadDiff
GetLoadDiff();
// ==================== Underlying Proto Access ====================
/**

View File

@ -326,3 +326,213 @@ TEST_F(SegmentLoadInfoTest, IndexWithoutFiles) {
EXPECT_FALSE(info.HasIndexInfo(FieldId(101)));
EXPECT_EQ(info.GetIndexInfoCount(), 1); // Proto still has it
}
// ==================== GetLoadDiff Tests ====================
TEST_F(SegmentLoadInfoTest, GetLoadDiffWithEmptyInfo) {
// Empty SegmentLoadInfo should return empty diff
SegmentLoadInfo empty_info;
auto diff = empty_info.GetLoadDiff();
EXPECT_FALSE(diff.HasChanges());
EXPECT_TRUE(diff.indexes_to_load.empty());
EXPECT_TRUE(diff.binlogs_to_load.empty());
EXPECT_TRUE(diff.column_groups_to_load.empty());
EXPECT_TRUE(diff.indexes_to_drop.empty());
EXPECT_TRUE(diff.field_data_to_drop.empty());
EXPECT_FALSE(diff.manifest_updated);
}
TEST_F(SegmentLoadInfoTest, GetLoadDiffWithIndexesOnly) {
// Create info with only indexes (no binlogs, no manifest)
proto::segcore::SegmentLoadInfo test_proto;
test_proto.set_segmentid(100);
test_proto.set_num_of_rows(1000);
// Add two indexes
auto* index1 = test_proto.add_index_infos();
index1->set_fieldid(101);
index1->set_indexid(1001);
index1->add_index_file_paths("/path/to/index1");
auto* index2 = test_proto.add_index_infos();
index2->set_fieldid(102);
index2->set_indexid(1002);
index2->add_index_file_paths("/path/to/index2");
SegmentLoadInfo info(test_proto);
auto diff = info.GetLoadDiff();
EXPECT_TRUE(diff.HasChanges());
// Both indexes should be in indexes_to_load
EXPECT_EQ(diff.indexes_to_load.size(), 2);
EXPECT_TRUE(diff.indexes_to_load.count(FieldId(101)) > 0);
EXPECT_TRUE(diff.indexes_to_load.count(FieldId(102)) > 0);
EXPECT_EQ(diff.indexes_to_load[FieldId(101)].size(), 1);
EXPECT_EQ(diff.indexes_to_load[FieldId(102)].size(), 1);
// No drops or other changes
EXPECT_TRUE(diff.indexes_to_drop.empty());
EXPECT_TRUE(diff.binlogs_to_load.empty());
EXPECT_TRUE(diff.field_data_to_drop.empty());
}
TEST_F(SegmentLoadInfoTest, GetLoadDiffWithBinlogsOnly) {
// Create info with only binlogs (no indexes, no manifest)
proto::segcore::SegmentLoadInfo test_proto;
test_proto.set_segmentid(100);
test_proto.set_num_of_rows(1000);
// Add binlog with child fields
auto* binlog = test_proto.add_binlog_paths();
binlog->set_fieldid(200);
binlog->add_child_fields(201);
binlog->add_child_fields(202);
auto* log = binlog->add_binlogs();
log->set_log_path("/path/to/binlog");
log->set_entries_num(500);
SegmentLoadInfo info(test_proto);
auto diff = info.GetLoadDiff();
EXPECT_TRUE(diff.HasChanges());
// Binlogs should be in binlogs_to_load
EXPECT_EQ(diff.binlogs_to_load.size(), 1);
EXPECT_EQ(diff.binlogs_to_load[0].first.size(), 2); // 2 child fields
EXPECT_EQ(diff.binlogs_to_load[0].first[0].get(), 201);
EXPECT_EQ(diff.binlogs_to_load[0].first[1].get(), 202);
// No index changes
EXPECT_TRUE(diff.indexes_to_load.empty());
EXPECT_TRUE(diff.indexes_to_drop.empty());
EXPECT_TRUE(diff.field_data_to_drop.empty());
}
TEST_F(SegmentLoadInfoTest, GetLoadDiffWithIndexesAndBinlogs) {
// Create info with both indexes and binlogs
proto::segcore::SegmentLoadInfo test_proto;
test_proto.set_segmentid(100);
test_proto.set_num_of_rows(1000);
// Add index
auto* index = test_proto.add_index_infos();
index->set_fieldid(101);
index->set_indexid(1001);
index->add_index_file_paths("/path/to/index");
// Add binlog with child fields
auto* binlog = test_proto.add_binlog_paths();
binlog->set_fieldid(200);
binlog->add_child_fields(201);
auto* log = binlog->add_binlogs();
log->set_log_path("/path/to/binlog");
log->set_entries_num(500);
SegmentLoadInfo info(test_proto);
auto diff = info.GetLoadDiff();
EXPECT_TRUE(diff.HasChanges());
// Index should be in indexes_to_load
EXPECT_EQ(diff.indexes_to_load.size(), 1);
EXPECT_TRUE(diff.indexes_to_load.count(FieldId(101)) > 0);
// Binlog should be in binlogs_to_load
EXPECT_EQ(diff.binlogs_to_load.size(), 1);
EXPECT_EQ(diff.binlogs_to_load[0].first.size(), 1);
EXPECT_EQ(diff.binlogs_to_load[0].first[0].get(), 201);
// No drops
EXPECT_TRUE(diff.indexes_to_drop.empty());
EXPECT_TRUE(diff.field_data_to_drop.empty());
}
TEST_F(SegmentLoadInfoTest, GetLoadDiffIgnoresIndexesWithoutFiles) {
// Indexes without files should be ignored in GetLoadDiff
proto::segcore::SegmentLoadInfo test_proto;
test_proto.set_segmentid(100);
test_proto.set_num_of_rows(1000);
// Add index with files
auto* index1 = test_proto.add_index_infos();
index1->set_fieldid(101);
index1->set_indexid(1001);
index1->add_index_file_paths("/path/to/index");
// Add index without files - should be ignored
auto* index2 = test_proto.add_index_infos();
index2->set_fieldid(102);
index2->set_indexid(1002);
// No index_file_paths added
SegmentLoadInfo info(test_proto);
auto diff = info.GetLoadDiff();
EXPECT_TRUE(diff.HasChanges());
// Only index with files should be in indexes_to_load
EXPECT_EQ(diff.indexes_to_load.size(), 1);
EXPECT_TRUE(diff.indexes_to_load.count(FieldId(101)) > 0);
EXPECT_FALSE(diff.indexes_to_load.count(FieldId(102)) > 0);
}
TEST_F(SegmentLoadInfoTest, GetLoadDiffWithMultipleIndexesPerField) {
// A field can have multiple indexes (e.g., JSON field with multiple paths)
proto::segcore::SegmentLoadInfo test_proto;
test_proto.set_segmentid(100);
test_proto.set_num_of_rows(1000);
// Add two indexes for the same field
auto* index1 = test_proto.add_index_infos();
index1->set_fieldid(101);
index1->set_indexid(1001);
index1->add_index_file_paths("/path/to/index1");
auto* index2 = test_proto.add_index_infos();
index2->set_fieldid(101);
index2->set_indexid(1002);
index2->add_index_file_paths("/path/to/index2");
SegmentLoadInfo info(test_proto);
auto diff = info.GetLoadDiff();
EXPECT_TRUE(diff.HasChanges());
// Both indexes should be in indexes_to_load for the same field
EXPECT_EQ(diff.indexes_to_load.size(), 1);
EXPECT_TRUE(diff.indexes_to_load.count(FieldId(101)) > 0);
EXPECT_EQ(diff.indexes_to_load[FieldId(101)].size(), 2);
}
TEST_F(SegmentLoadInfoTest, GetLoadDiffWithMultipleBinlogGroups) {
// Test with multiple binlog groups
proto::segcore::SegmentLoadInfo test_proto;
test_proto.set_segmentid(100);
test_proto.set_num_of_rows(1000);
// Add first binlog group
auto* binlog1 = test_proto.add_binlog_paths();
binlog1->set_fieldid(200);
binlog1->add_child_fields(201);
binlog1->add_child_fields(202);
auto* log1 = binlog1->add_binlogs();
log1->set_log_path("/path/to/binlog1");
log1->set_entries_num(500);
// Add second binlog group
auto* binlog2 = test_proto.add_binlog_paths();
binlog2->set_fieldid(300);
binlog2->add_child_fields(301);
auto* log2 = binlog2->add_binlogs();
log2->set_log_path("/path/to/binlog2");
log2->set_entries_num(500);
SegmentLoadInfo info(test_proto);
auto diff = info.GetLoadDiff();
EXPECT_TRUE(diff.HasChanges());
// Both binlog groups should be in binlogs_to_load
EXPECT_EQ(diff.binlogs_to_load.size(), 2);
// Check first group
EXPECT_EQ(diff.binlogs_to_load[0].first.size(), 2);
// Check second group
EXPECT_EQ(diff.binlogs_to_load[1].first.size(), 1);
}