mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-28 14:35:27 +08:00
fix: Handle legacy binlog format (v1) in segment load diff computation (#46598)
When computing load diff, binlogs in v1/legacy format have empty child_fields. In this case, the field_id itself should be used as the child_id (group_id == field_id for legacy format). Without this fix, legacy format binlogs are not recognized during diff computation, causing segments to fail loading and TestProxy to timeout. Changes: - Add fallback to use fieldid as child_id when child_fields is empty - Add LoadDiff::ToString() for debugging - Add logging for diff in Load/Reopen operations - Add comprehensive unit tests for legacy format handling Related to #46594 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> - Core invariant: load-diff computation must enumerate every binlog child group for a field so current vs new segment state comparisons include all column-group/binlog groups; for legacy (v1) binlogs that have empty child_fields, the code must treat group_id == field_id to preserve that mapping. - Bug fix (resolves #46594): SegmentLoadInfo now normalizes field_binlog.child_fields() into a vector and falls back to using field_id as the single child group when child_fields is empty; the same normalization is applied for both current and new-info paths, ensuring legacy v1 binlogs are discovered and included in Load/ComputeDiff results so segments load correctly. - Logic simplified: removed the implicit assumption that child_fields is always present by centralizing a single normalization/fallback step used symmetrically for both diff paths, avoiding ad-hoc special-casing and unifying iteration over child groups. - No data loss / no behavior regression: the fallback only activates when child_fields is empty — non-legacy binlogs continue to use their child_fields unchanged. Add/drop semantics are preserved because the same normalization is applied to both sides of the diff. Unit tests (v1-only, v4-only, mixed cases) were added to validate correctness; LoadDiff::ToString() and extra logging are diagnostic only. <!-- end of auto-generated comment: release notes by coderabbit.ai --> Co-authored-by: Cai Zhang <cai.zhang@zilliz.com> --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
85486df8c9
commit
6f94d8c41a
@ -2629,6 +2629,7 @@ ChunkedSegmentSealedImpl::Reopen(
|
||||
|
||||
// compute load diff
|
||||
auto diff = current.ComputeDiff(new_seg_load_info);
|
||||
LOG_INFO("Reopen segment {} with diff {}", id_, diff.ToString());
|
||||
ApplyLoadDiff(new_seg_load_info, diff);
|
||||
|
||||
LOG_INFO("Reopen segment {} done", id_);
|
||||
@ -3180,6 +3181,7 @@ ChunkedSegmentSealedImpl::Load(milvus::tracer::TraceContext& trace_ctx) {
|
||||
LOG_INFO("Loading segment {} with {} rows", id_, num_rows);
|
||||
|
||||
auto diff = segment_load_info_.GetLoadDiff();
|
||||
LOG_WARN("Load segment {} with diff {}", id_, diff.ToString());
|
||||
ApplyLoadDiff(segment_load_info_, diff);
|
||||
|
||||
LOG_INFO("Successfully loaded segment {} with {} rows", id_, num_rows);
|
||||
|
||||
@ -191,7 +191,14 @@ SegmentLoadInfo::ComputeDiffBinlogs(LoadDiff& diff, SegmentLoadInfo& new_info) {
|
||||
std::map<int64_t, int64_t> current_fields;
|
||||
for (int i = 0; i < GetBinlogPathCount(); i++) {
|
||||
auto& field_binlog = GetBinlogPath(i);
|
||||
for (auto child_id : field_binlog.child_fields()) {
|
||||
std::vector<int64_t> child_fields(field_binlog.child_fields().begin(),
|
||||
field_binlog.child_fields().end());
|
||||
// v1 or legacy, group id == field id
|
||||
if (child_fields.empty()) {
|
||||
child_fields.emplace_back(field_binlog.fieldid());
|
||||
}
|
||||
|
||||
for (auto child_id : child_fields) {
|
||||
current_fields[child_id] = field_binlog.fieldid();
|
||||
}
|
||||
}
|
||||
@ -200,7 +207,14 @@ SegmentLoadInfo::ComputeDiffBinlogs(LoadDiff& diff, SegmentLoadInfo& new_info) {
|
||||
for (int i = 0; i < new_info.GetBinlogPathCount(); i++) {
|
||||
auto& new_field_binlog = new_info.GetBinlogPath(i);
|
||||
std::vector<FieldId> ids_to_load;
|
||||
for (auto child_id : new_field_binlog.child_fields()) {
|
||||
std::vector<int64_t> child_fields(
|
||||
new_field_binlog.child_fields().begin(),
|
||||
new_field_binlog.child_fields().end());
|
||||
// v1 or legacy, group id == field id
|
||||
if (child_fields.empty()) {
|
||||
child_fields.emplace_back(new_field_binlog.fieldid());
|
||||
}
|
||||
for (auto child_id : child_fields) {
|
||||
new_binlog_fields[child_id] = new_field_binlog.fieldid();
|
||||
auto iter = current_fields.find(new_field_binlog.fieldid());
|
||||
// Find binlogs to load: fields in new_info match current
|
||||
|
||||
@ -14,6 +14,7 @@
|
||||
#include <map>
|
||||
#include <optional>
|
||||
#include <set>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
@ -72,6 +73,88 @@ struct LoadDiff {
|
||||
HasManifestChange() const {
|
||||
return manifest_updated;
|
||||
}
|
||||
|
||||
[[nodiscard]] std::string
|
||||
ToString() const {
|
||||
std::ostringstream oss;
|
||||
oss << "LoadDiff{";
|
||||
|
||||
// indexes_to_load
|
||||
oss << "indexes_to_load=[";
|
||||
bool first = true;
|
||||
for (const auto& [field_id, infos] : indexes_to_load) {
|
||||
if (!first)
|
||||
oss << ", ";
|
||||
first = false;
|
||||
oss << field_id.get() << ":" << infos.size() << " indexes";
|
||||
}
|
||||
oss << "], ";
|
||||
|
||||
// binlogs_to_load
|
||||
oss << "binlogs_to_load=[";
|
||||
first = true;
|
||||
for (const auto& [field_ids, binlog] : binlogs_to_load) {
|
||||
if (!first)
|
||||
oss << ", ";
|
||||
first = false;
|
||||
oss << "[";
|
||||
for (size_t i = 0; i < field_ids.size(); ++i) {
|
||||
if (i > 0)
|
||||
oss << ",";
|
||||
oss << field_ids[i].get();
|
||||
}
|
||||
oss << "]";
|
||||
}
|
||||
oss << "], ";
|
||||
|
||||
// column_groups_to_load
|
||||
oss << "column_groups_to_load=[";
|
||||
first = true;
|
||||
for (const auto& [group_idx, field_ids] : column_groups_to_load) {
|
||||
if (!first)
|
||||
oss << ", ";
|
||||
first = false;
|
||||
oss << "group" << group_idx << ":[";
|
||||
for (size_t i = 0; i < field_ids.size(); ++i) {
|
||||
if (i > 0)
|
||||
oss << ",";
|
||||
oss << field_ids[i].get();
|
||||
}
|
||||
oss << "]";
|
||||
}
|
||||
oss << "], ";
|
||||
|
||||
// indexes_to_drop
|
||||
oss << "indexes_to_drop=[";
|
||||
first = true;
|
||||
for (const auto& field_id : indexes_to_drop) {
|
||||
if (!first)
|
||||
oss << ", ";
|
||||
first = false;
|
||||
oss << field_id.get();
|
||||
}
|
||||
oss << "], ";
|
||||
|
||||
// field_data_to_drop
|
||||
oss << "field_data_to_drop=[";
|
||||
first = true;
|
||||
for (const auto& field_id : field_data_to_drop) {
|
||||
if (!first)
|
||||
oss << ", ";
|
||||
first = false;
|
||||
oss << field_id.get();
|
||||
}
|
||||
oss << "], ";
|
||||
|
||||
// manifest_updated and new_manifest_path
|
||||
oss << "manifest_updated=" << (manifest_updated ? "true" : "false");
|
||||
if (manifest_updated) {
|
||||
oss << ", new_manifest_path=" << new_manifest_path;
|
||||
}
|
||||
|
||||
oss << "}";
|
||||
return oss.str();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@ -362,6 +362,7 @@ TEST_F(SegmentLoadInfoTest, GetLoadDiffWithIndexesOnly) {
|
||||
|
||||
SegmentLoadInfo info(test_proto);
|
||||
auto diff = info.GetLoadDiff();
|
||||
std::cout << diff.ToString() << "\n";
|
||||
|
||||
EXPECT_TRUE(diff.HasChanges());
|
||||
// Both indexes should be in indexes_to_load
|
||||
@ -394,6 +395,7 @@ TEST_F(SegmentLoadInfoTest, GetLoadDiffWithBinlogsOnly) {
|
||||
|
||||
SegmentLoadInfo info(test_proto);
|
||||
auto diff = info.GetLoadDiff();
|
||||
std::cout << diff.ToString() << "\n";
|
||||
|
||||
EXPECT_TRUE(diff.HasChanges());
|
||||
// Binlogs should be in binlogs_to_load
|
||||
@ -430,6 +432,7 @@ TEST_F(SegmentLoadInfoTest, GetLoadDiffWithIndexesAndBinlogs) {
|
||||
|
||||
SegmentLoadInfo info(test_proto);
|
||||
auto diff = info.GetLoadDiff();
|
||||
std::cout << diff.ToString() << "\n";
|
||||
|
||||
EXPECT_TRUE(diff.HasChanges());
|
||||
// Index should be in indexes_to_load
|
||||
@ -466,6 +469,7 @@ TEST_F(SegmentLoadInfoTest, GetLoadDiffIgnoresIndexesWithoutFiles) {
|
||||
|
||||
SegmentLoadInfo info(test_proto);
|
||||
auto diff = info.GetLoadDiff();
|
||||
std::cout << diff.ToString() << "\n";
|
||||
|
||||
EXPECT_TRUE(diff.HasChanges());
|
||||
// Only index with files should be in indexes_to_load
|
||||
@ -536,3 +540,241 @@ TEST_F(SegmentLoadInfoTest, GetLoadDiffWithMultipleBinlogGroups) {
|
||||
// Check second group
|
||||
EXPECT_EQ(diff.binlogs_to_load[1].first.size(), 1);
|
||||
}
|
||||
|
||||
// ==================== Legacy Format (v1) Tests ====================
|
||||
// Test binlogs without child_fields (v1/legacy format where group_id == field_id)
|
||||
|
||||
TEST_F(SegmentLoadInfoTest, GetLoadDiffWithBinlogsLegacyFormat) {
|
||||
// Create info with binlogs without child_fields (legacy/v1 format)
|
||||
proto::segcore::SegmentLoadInfo test_proto;
|
||||
test_proto.set_segmentid(100);
|
||||
test_proto.set_num_of_rows(1000);
|
||||
|
||||
// Add binlog WITHOUT child_fields - this is the legacy format
|
||||
auto* binlog = test_proto.add_binlog_paths();
|
||||
binlog->set_fieldid(101);
|
||||
// Note: no child_fields added - this triggers the legacy handling
|
||||
auto* log = binlog->add_binlogs();
|
||||
log->set_log_path("/path/to/binlog");
|
||||
log->set_entries_num(500);
|
||||
|
||||
SegmentLoadInfo info(test_proto);
|
||||
auto diff = info.GetLoadDiff();
|
||||
std::cout << "Legacy format diff: " << diff.ToString() << "\n";
|
||||
|
||||
EXPECT_TRUE(diff.HasChanges());
|
||||
// In legacy format, field_id itself is used as the child_id
|
||||
EXPECT_EQ(diff.binlogs_to_load.size(), 1);
|
||||
EXPECT_EQ(diff.binlogs_to_load[0].first.size(), 1);
|
||||
EXPECT_EQ(diff.binlogs_to_load[0].first[0].get(),
|
||||
101); // field_id as child_id
|
||||
|
||||
// No index changes
|
||||
EXPECT_TRUE(diff.indexes_to_load.empty());
|
||||
EXPECT_TRUE(diff.indexes_to_drop.empty());
|
||||
EXPECT_TRUE(diff.field_data_to_drop.empty());
|
||||
}
|
||||
|
||||
TEST_F(SegmentLoadInfoTest, GetLoadDiffWithMultipleBinlogsLegacyFormat) {
|
||||
// Test multiple binlogs in legacy format
|
||||
proto::segcore::SegmentLoadInfo test_proto;
|
||||
test_proto.set_segmentid(100);
|
||||
test_proto.set_num_of_rows(1000);
|
||||
|
||||
// Add first binlog without child_fields
|
||||
auto* binlog1 = test_proto.add_binlog_paths();
|
||||
binlog1->set_fieldid(101);
|
||||
auto* log1 = binlog1->add_binlogs();
|
||||
log1->set_log_path("/path/to/binlog1");
|
||||
log1->set_entries_num(500);
|
||||
|
||||
// Add second binlog without child_fields
|
||||
auto* binlog2 = test_proto.add_binlog_paths();
|
||||
binlog2->set_fieldid(102);
|
||||
auto* log2 = binlog2->add_binlogs();
|
||||
log2->set_log_path("/path/to/binlog2");
|
||||
log2->set_entries_num(500);
|
||||
|
||||
SegmentLoadInfo info(test_proto);
|
||||
auto diff = info.GetLoadDiff();
|
||||
|
||||
EXPECT_TRUE(diff.HasChanges());
|
||||
EXPECT_EQ(diff.binlogs_to_load.size(), 2);
|
||||
EXPECT_EQ(diff.binlogs_to_load[0].first.size(), 1);
|
||||
EXPECT_EQ(diff.binlogs_to_load[0].first[0].get(), 101);
|
||||
EXPECT_EQ(diff.binlogs_to_load[1].first.size(), 1);
|
||||
EXPECT_EQ(diff.binlogs_to_load[1].first[0].get(), 102);
|
||||
}
|
||||
|
||||
TEST_F(SegmentLoadInfoTest, ComputeDiffWithBinlogsLegacyFormat) {
|
||||
// Test ComputeDiff between two SegmentLoadInfos using legacy format
|
||||
|
||||
// Current: has field 101
|
||||
proto::segcore::SegmentLoadInfo current_proto;
|
||||
current_proto.set_segmentid(100);
|
||||
current_proto.set_num_of_rows(1000);
|
||||
auto* binlog1 = current_proto.add_binlog_paths();
|
||||
binlog1->set_fieldid(101);
|
||||
auto* log1 = binlog1->add_binlogs();
|
||||
log1->set_log_path("/path/to/binlog1");
|
||||
log1->set_entries_num(500);
|
||||
|
||||
// New: has field 101 and 102
|
||||
proto::segcore::SegmentLoadInfo new_proto;
|
||||
new_proto.set_segmentid(100);
|
||||
new_proto.set_num_of_rows(1000);
|
||||
auto* new_binlog1 = new_proto.add_binlog_paths();
|
||||
new_binlog1->set_fieldid(101);
|
||||
auto* new_log1 = new_binlog1->add_binlogs();
|
||||
new_log1->set_log_path("/path/to/binlog1");
|
||||
new_log1->set_entries_num(500);
|
||||
|
||||
auto* new_binlog2 = new_proto.add_binlog_paths();
|
||||
new_binlog2->set_fieldid(102);
|
||||
auto* new_log2 = new_binlog2->add_binlogs();
|
||||
new_log2->set_log_path("/path/to/binlog2");
|
||||
new_log2->set_entries_num(500);
|
||||
|
||||
SegmentLoadInfo current_info(current_proto);
|
||||
SegmentLoadInfo new_info(new_proto);
|
||||
auto diff = current_info.ComputeDiff(new_info);
|
||||
std::cout << "ComputeDiff legacy format: " << diff.ToString() << "\n";
|
||||
|
||||
EXPECT_TRUE(diff.HasChanges());
|
||||
// Field 102 should be added
|
||||
EXPECT_EQ(diff.binlogs_to_load.size(), 1);
|
||||
EXPECT_EQ(diff.binlogs_to_load[0].first.size(), 1);
|
||||
EXPECT_EQ(diff.binlogs_to_load[0].first[0].get(), 102);
|
||||
|
||||
// No fields should be dropped
|
||||
EXPECT_TRUE(diff.field_data_to_drop.empty());
|
||||
}
|
||||
|
||||
TEST_F(SegmentLoadInfoTest, ComputeDiffDropFieldLegacyFormat) {
|
||||
// Test ComputeDiff when dropping a field in legacy format
|
||||
|
||||
// Current: has fields 101 and 102
|
||||
proto::segcore::SegmentLoadInfo current_proto;
|
||||
current_proto.set_segmentid(100);
|
||||
current_proto.set_num_of_rows(1000);
|
||||
|
||||
auto* binlog1 = current_proto.add_binlog_paths();
|
||||
binlog1->set_fieldid(101);
|
||||
auto* log1 = binlog1->add_binlogs();
|
||||
log1->set_log_path("/path/to/binlog1");
|
||||
log1->set_entries_num(500);
|
||||
|
||||
auto* binlog2 = current_proto.add_binlog_paths();
|
||||
binlog2->set_fieldid(102);
|
||||
auto* log2 = binlog2->add_binlogs();
|
||||
log2->set_log_path("/path/to/binlog2");
|
||||
log2->set_entries_num(500);
|
||||
|
||||
// New: only has field 101 (102 is dropped)
|
||||
proto::segcore::SegmentLoadInfo new_proto;
|
||||
new_proto.set_segmentid(100);
|
||||
new_proto.set_num_of_rows(1000);
|
||||
|
||||
auto* new_binlog1 = new_proto.add_binlog_paths();
|
||||
new_binlog1->set_fieldid(101);
|
||||
auto* new_log1 = new_binlog1->add_binlogs();
|
||||
new_log1->set_log_path("/path/to/binlog1");
|
||||
new_log1->set_entries_num(500);
|
||||
|
||||
SegmentLoadInfo current_info(current_proto);
|
||||
SegmentLoadInfo new_info(new_proto);
|
||||
auto diff = current_info.ComputeDiff(new_info);
|
||||
|
||||
EXPECT_TRUE(diff.HasChanges());
|
||||
// No new binlogs to load
|
||||
EXPECT_TRUE(diff.binlogs_to_load.empty());
|
||||
// Field 102 should be dropped
|
||||
EXPECT_EQ(diff.field_data_to_drop.size(), 1);
|
||||
EXPECT_TRUE(diff.field_data_to_drop.count(FieldId(102)) > 0);
|
||||
}
|
||||
|
||||
TEST_F(SegmentLoadInfoTest, ComputeDiffMixedFormats) {
|
||||
// Test ComputeDiff with mixed formats: legacy and column group
|
||||
|
||||
// Current: has field 101 (legacy) and column group 200 with child fields 201, 202
|
||||
proto::segcore::SegmentLoadInfo current_proto;
|
||||
current_proto.set_segmentid(100);
|
||||
current_proto.set_num_of_rows(1000);
|
||||
|
||||
// Legacy format binlog
|
||||
auto* binlog1 = current_proto.add_binlog_paths();
|
||||
binlog1->set_fieldid(101);
|
||||
auto* log1 = binlog1->add_binlogs();
|
||||
log1->set_log_path("/path/to/binlog1");
|
||||
log1->set_entries_num(500);
|
||||
|
||||
// Column group format binlog
|
||||
auto* binlog2 = current_proto.add_binlog_paths();
|
||||
binlog2->set_fieldid(200);
|
||||
binlog2->add_child_fields(201);
|
||||
binlog2->add_child_fields(202);
|
||||
auto* log2 = binlog2->add_binlogs();
|
||||
log2->set_log_path("/path/to/group_binlog");
|
||||
log2->set_entries_num(1000);
|
||||
|
||||
// New: has all existing + new field 103 (legacy) + new field 203 in column group
|
||||
proto::segcore::SegmentLoadInfo new_proto;
|
||||
new_proto.set_segmentid(100);
|
||||
new_proto.set_num_of_rows(1000);
|
||||
|
||||
auto* new_binlog1 = new_proto.add_binlog_paths();
|
||||
new_binlog1->set_fieldid(101);
|
||||
auto* new_log1 = new_binlog1->add_binlogs();
|
||||
new_log1->set_log_path("/path/to/binlog1");
|
||||
new_log1->set_entries_num(500);
|
||||
|
||||
// New legacy field
|
||||
auto* new_binlog3 = new_proto.add_binlog_paths();
|
||||
new_binlog3->set_fieldid(103);
|
||||
auto* new_log3 = new_binlog3->add_binlogs();
|
||||
new_log3->set_log_path("/path/to/binlog3");
|
||||
new_log3->set_entries_num(500);
|
||||
|
||||
// Updated column group with additional field
|
||||
auto* new_binlog2 = new_proto.add_binlog_paths();
|
||||
new_binlog2->set_fieldid(200);
|
||||
new_binlog2->add_child_fields(201);
|
||||
new_binlog2->add_child_fields(202);
|
||||
new_binlog2->add_child_fields(203); // New field added to group
|
||||
auto* new_log2 = new_binlog2->add_binlogs();
|
||||
new_log2->set_log_path("/path/to/group_binlog_new");
|
||||
new_log2->set_entries_num(1500);
|
||||
|
||||
SegmentLoadInfo current_info(current_proto);
|
||||
SegmentLoadInfo new_info(new_proto);
|
||||
auto diff = current_info.ComputeDiff(new_info);
|
||||
std::cout << "Mixed format diff: " << diff.ToString() << "\n";
|
||||
|
||||
EXPECT_TRUE(diff.HasChanges());
|
||||
// Field 103 (legacy) and field 203 (in column group) should be loaded
|
||||
EXPECT_EQ(diff.binlogs_to_load.size(), 2);
|
||||
|
||||
// No fields should be dropped
|
||||
EXPECT_TRUE(diff.field_data_to_drop.empty());
|
||||
}
|
||||
|
||||
TEST_F(SegmentLoadInfoTest, ComputeDiffNoChangesLegacyFormat) {
|
||||
// Test ComputeDiff when there are no changes (same binlogs)
|
||||
proto::segcore::SegmentLoadInfo proto;
|
||||
proto.set_segmentid(100);
|
||||
proto.set_num_of_rows(1000);
|
||||
|
||||
auto* binlog = proto.add_binlog_paths();
|
||||
binlog->set_fieldid(101);
|
||||
auto* log = binlog->add_binlogs();
|
||||
log->set_log_path("/path/to/binlog");
|
||||
log->set_entries_num(500);
|
||||
|
||||
SegmentLoadInfo current_info(proto);
|
||||
SegmentLoadInfo new_info(proto);
|
||||
auto diff = current_info.ComputeDiff(new_info);
|
||||
|
||||
EXPECT_FALSE(diff.HasChanges());
|
||||
EXPECT_TRUE(diff.binlogs_to_load.empty());
|
||||
EXPECT_TRUE(diff.field_data_to_drop.empty());
|
||||
}
|
||||
|
||||
@ -87,6 +87,7 @@ func TestPauseConsumption(t *testing.T) {
|
||||
pendingQueue: utility.NewPendingQueue(),
|
||||
cleanup: func() {},
|
||||
ScannerHelper: helper.NewScannerHelper("test"),
|
||||
metrics: metricsutil.NewScanMetrics(types.PChannelInfo{}).NewScannerMetrics(),
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
@ -28,8 +28,8 @@ pytest-parallel
|
||||
pytest-random-order
|
||||
|
||||
# pymilvus
|
||||
pymilvus==2.7.0rc93
|
||||
pymilvus[bulk_writer]==2.7.0rc93
|
||||
pymilvus==2.7.0rc95
|
||||
pymilvus[bulk_writer]==2.7.0rc95
|
||||
# for protobuf
|
||||
protobuf>=5.29.5
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user