From 6f94d8c41a251c33550b1ff8365a1232d8c66676 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 25 Dec 2025 23:33:19 +0800 Subject: [PATCH] fix: Handle legacy binlog format (v1) in segment load diff computation (#46598) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When computing load diff, binlogs in v1/legacy format have empty child_fields. In this case, the field_id itself should be used as the child_id (group_id == field_id for legacy format). Without this fix, legacy format binlogs are not recognized during diff computation, causing segments to fail loading and TestProxy to timeout. Changes: - Add fallback to use fieldid as child_id when child_fields is empty - Add LoadDiff::ToString() for debugging - Add logging for diff in Load/Reopen operations - Add comprehensive unit tests for legacy format handling Related to #46594 - Core invariant: load-diff computation must enumerate every binlog child group for a field so current vs new segment state comparisons include all column-group/binlog groups; for legacy (v1) binlogs that have empty child_fields, the code must treat group_id == field_id to preserve that mapping. - Bug fix (resolves #46594): SegmentLoadInfo now normalizes field_binlog.child_fields() into a vector and falls back to using field_id as the single child group when child_fields is empty; the same normalization is applied for both current and new-info paths, ensuring legacy v1 binlogs are discovered and included in Load/ComputeDiff results so segments load correctly. - Logic simplified: removed the implicit assumption that child_fields is always present by centralizing a single normalization/fallback step used symmetrically for both diff paths, avoiding ad-hoc special-casing and unifying iteration over child groups. - No data loss / no behavior regression: the fallback only activates when child_fields is empty — non-legacy binlogs continue to use their child_fields unchanged. Add/drop semantics are preserved because the same normalization is applied to both sides of the diff. Unit tests (v1-only, v4-only, mixed cases) were added to validate correctness; LoadDiff::ToString() and extra logging are diagnostic only. Co-authored-by: Cai Zhang --------- Signed-off-by: Congqi Xia --- .../src/segcore/ChunkedSegmentSealedImpl.cpp | 2 + internal/core/src/segcore/SegmentLoadInfo.cpp | 18 +- internal/core/src/segcore/SegmentLoadInfo.h | 83 ++++++ .../core/src/segcore/SegmentLoadInfoTest.cpp | 242 ++++++++++++++++++ .../wal/adaptor/scanner_adaptor_test.go | 1 + tests/python_client/requirements.txt | 4 +- 6 files changed, 346 insertions(+), 4 deletions(-) diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index a023219bc8..dad2690103 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -2629,6 +2629,7 @@ ChunkedSegmentSealedImpl::Reopen( // compute load diff auto diff = current.ComputeDiff(new_seg_load_info); + LOG_INFO("Reopen segment {} with diff {}", id_, diff.ToString()); ApplyLoadDiff(new_seg_load_info, diff); LOG_INFO("Reopen segment {} done", id_); @@ -3180,6 +3181,7 @@ ChunkedSegmentSealedImpl::Load(milvus::tracer::TraceContext& trace_ctx) { LOG_INFO("Loading segment {} with {} rows", id_, num_rows); auto diff = segment_load_info_.GetLoadDiff(); + LOG_WARN("Load segment {} with diff {}", id_, diff.ToString()); ApplyLoadDiff(segment_load_info_, diff); LOG_INFO("Successfully loaded segment {} with {} rows", id_, num_rows); diff --git a/internal/core/src/segcore/SegmentLoadInfo.cpp b/internal/core/src/segcore/SegmentLoadInfo.cpp index 86b12d875a..3b5b3e912f 100644 --- a/internal/core/src/segcore/SegmentLoadInfo.cpp +++ b/internal/core/src/segcore/SegmentLoadInfo.cpp @@ -191,7 +191,14 @@ SegmentLoadInfo::ComputeDiffBinlogs(LoadDiff& diff, SegmentLoadInfo& new_info) { std::map current_fields; for (int i = 0; i < GetBinlogPathCount(); i++) { auto& field_binlog = GetBinlogPath(i); - for (auto child_id : field_binlog.child_fields()) { + std::vector child_fields(field_binlog.child_fields().begin(), + field_binlog.child_fields().end()); + // v1 or legacy, group id == field id + if (child_fields.empty()) { + child_fields.emplace_back(field_binlog.fieldid()); + } + + for (auto child_id : child_fields) { current_fields[child_id] = field_binlog.fieldid(); } } @@ -200,7 +207,14 @@ SegmentLoadInfo::ComputeDiffBinlogs(LoadDiff& diff, SegmentLoadInfo& new_info) { for (int i = 0; i < new_info.GetBinlogPathCount(); i++) { auto& new_field_binlog = new_info.GetBinlogPath(i); std::vector ids_to_load; - for (auto child_id : new_field_binlog.child_fields()) { + std::vector child_fields( + new_field_binlog.child_fields().begin(), + new_field_binlog.child_fields().end()); + // v1 or legacy, group id == field id + if (child_fields.empty()) { + child_fields.emplace_back(new_field_binlog.fieldid()); + } + for (auto child_id : child_fields) { new_binlog_fields[child_id] = new_field_binlog.fieldid(); auto iter = current_fields.find(new_field_binlog.fieldid()); // Find binlogs to load: fields in new_info match current diff --git a/internal/core/src/segcore/SegmentLoadInfo.h b/internal/core/src/segcore/SegmentLoadInfo.h index cd68615674..99b53481ac 100644 --- a/internal/core/src/segcore/SegmentLoadInfo.h +++ b/internal/core/src/segcore/SegmentLoadInfo.h @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -72,6 +73,88 @@ struct LoadDiff { HasManifestChange() const { return manifest_updated; } + + [[nodiscard]] std::string + ToString() const { + std::ostringstream oss; + oss << "LoadDiff{"; + + // indexes_to_load + oss << "indexes_to_load=["; + bool first = true; + for (const auto& [field_id, infos] : indexes_to_load) { + if (!first) + oss << ", "; + first = false; + oss << field_id.get() << ":" << infos.size() << " indexes"; + } + oss << "], "; + + // binlogs_to_load + oss << "binlogs_to_load=["; + first = true; + for (const auto& [field_ids, binlog] : binlogs_to_load) { + if (!first) + oss << ", "; + first = false; + oss << "["; + for (size_t i = 0; i < field_ids.size(); ++i) { + if (i > 0) + oss << ","; + oss << field_ids[i].get(); + } + oss << "]"; + } + oss << "], "; + + // column_groups_to_load + oss << "column_groups_to_load=["; + first = true; + for (const auto& [group_idx, field_ids] : column_groups_to_load) { + if (!first) + oss << ", "; + first = false; + oss << "group" << group_idx << ":["; + for (size_t i = 0; i < field_ids.size(); ++i) { + if (i > 0) + oss << ","; + oss << field_ids[i].get(); + } + oss << "]"; + } + oss << "], "; + + // indexes_to_drop + oss << "indexes_to_drop=["; + first = true; + for (const auto& field_id : indexes_to_drop) { + if (!first) + oss << ", "; + first = false; + oss << field_id.get(); + } + oss << "], "; + + // field_data_to_drop + oss << "field_data_to_drop=["; + first = true; + for (const auto& field_id : field_data_to_drop) { + if (!first) + oss << ", "; + first = false; + oss << field_id.get(); + } + oss << "], "; + + // manifest_updated and new_manifest_path + oss << "manifest_updated=" << (manifest_updated ? "true" : "false"); + if (manifest_updated) { + oss << ", new_manifest_path=" << new_manifest_path; + } + + oss << "}"; + return oss.str(); + } }; /** diff --git a/internal/core/src/segcore/SegmentLoadInfoTest.cpp b/internal/core/src/segcore/SegmentLoadInfoTest.cpp index cc45ff9db7..39017b8b60 100644 --- a/internal/core/src/segcore/SegmentLoadInfoTest.cpp +++ b/internal/core/src/segcore/SegmentLoadInfoTest.cpp @@ -362,6 +362,7 @@ TEST_F(SegmentLoadInfoTest, GetLoadDiffWithIndexesOnly) { SegmentLoadInfo info(test_proto); auto diff = info.GetLoadDiff(); + std::cout << diff.ToString() << "\n"; EXPECT_TRUE(diff.HasChanges()); // Both indexes should be in indexes_to_load @@ -394,6 +395,7 @@ TEST_F(SegmentLoadInfoTest, GetLoadDiffWithBinlogsOnly) { SegmentLoadInfo info(test_proto); auto diff = info.GetLoadDiff(); + std::cout << diff.ToString() << "\n"; EXPECT_TRUE(diff.HasChanges()); // Binlogs should be in binlogs_to_load @@ -430,6 +432,7 @@ TEST_F(SegmentLoadInfoTest, GetLoadDiffWithIndexesAndBinlogs) { SegmentLoadInfo info(test_proto); auto diff = info.GetLoadDiff(); + std::cout << diff.ToString() << "\n"; EXPECT_TRUE(diff.HasChanges()); // Index should be in indexes_to_load @@ -466,6 +469,7 @@ TEST_F(SegmentLoadInfoTest, GetLoadDiffIgnoresIndexesWithoutFiles) { SegmentLoadInfo info(test_proto); auto diff = info.GetLoadDiff(); + std::cout << diff.ToString() << "\n"; EXPECT_TRUE(diff.HasChanges()); // Only index with files should be in indexes_to_load @@ -536,3 +540,241 @@ TEST_F(SegmentLoadInfoTest, GetLoadDiffWithMultipleBinlogGroups) { // Check second group EXPECT_EQ(diff.binlogs_to_load[1].first.size(), 1); } + +// ==================== Legacy Format (v1) Tests ==================== +// Test binlogs without child_fields (v1/legacy format where group_id == field_id) + +TEST_F(SegmentLoadInfoTest, GetLoadDiffWithBinlogsLegacyFormat) { + // Create info with binlogs without child_fields (legacy/v1 format) + proto::segcore::SegmentLoadInfo test_proto; + test_proto.set_segmentid(100); + test_proto.set_num_of_rows(1000); + + // Add binlog WITHOUT child_fields - this is the legacy format + auto* binlog = test_proto.add_binlog_paths(); + binlog->set_fieldid(101); + // Note: no child_fields added - this triggers the legacy handling + auto* log = binlog->add_binlogs(); + log->set_log_path("/path/to/binlog"); + log->set_entries_num(500); + + SegmentLoadInfo info(test_proto); + auto diff = info.GetLoadDiff(); + std::cout << "Legacy format diff: " << diff.ToString() << "\n"; + + EXPECT_TRUE(diff.HasChanges()); + // In legacy format, field_id itself is used as the child_id + EXPECT_EQ(diff.binlogs_to_load.size(), 1); + EXPECT_EQ(diff.binlogs_to_load[0].first.size(), 1); + EXPECT_EQ(diff.binlogs_to_load[0].first[0].get(), + 101); // field_id as child_id + + // No index changes + EXPECT_TRUE(diff.indexes_to_load.empty()); + EXPECT_TRUE(diff.indexes_to_drop.empty()); + EXPECT_TRUE(diff.field_data_to_drop.empty()); +} + +TEST_F(SegmentLoadInfoTest, GetLoadDiffWithMultipleBinlogsLegacyFormat) { + // Test multiple binlogs in legacy format + proto::segcore::SegmentLoadInfo test_proto; + test_proto.set_segmentid(100); + test_proto.set_num_of_rows(1000); + + // Add first binlog without child_fields + auto* binlog1 = test_proto.add_binlog_paths(); + binlog1->set_fieldid(101); + auto* log1 = binlog1->add_binlogs(); + log1->set_log_path("/path/to/binlog1"); + log1->set_entries_num(500); + + // Add second binlog without child_fields + auto* binlog2 = test_proto.add_binlog_paths(); + binlog2->set_fieldid(102); + auto* log2 = binlog2->add_binlogs(); + log2->set_log_path("/path/to/binlog2"); + log2->set_entries_num(500); + + SegmentLoadInfo info(test_proto); + auto diff = info.GetLoadDiff(); + + EXPECT_TRUE(diff.HasChanges()); + EXPECT_EQ(diff.binlogs_to_load.size(), 2); + EXPECT_EQ(diff.binlogs_to_load[0].first.size(), 1); + EXPECT_EQ(diff.binlogs_to_load[0].first[0].get(), 101); + EXPECT_EQ(diff.binlogs_to_load[1].first.size(), 1); + EXPECT_EQ(diff.binlogs_to_load[1].first[0].get(), 102); +} + +TEST_F(SegmentLoadInfoTest, ComputeDiffWithBinlogsLegacyFormat) { + // Test ComputeDiff between two SegmentLoadInfos using legacy format + + // Current: has field 101 + proto::segcore::SegmentLoadInfo current_proto; + current_proto.set_segmentid(100); + current_proto.set_num_of_rows(1000); + auto* binlog1 = current_proto.add_binlog_paths(); + binlog1->set_fieldid(101); + auto* log1 = binlog1->add_binlogs(); + log1->set_log_path("/path/to/binlog1"); + log1->set_entries_num(500); + + // New: has field 101 and 102 + proto::segcore::SegmentLoadInfo new_proto; + new_proto.set_segmentid(100); + new_proto.set_num_of_rows(1000); + auto* new_binlog1 = new_proto.add_binlog_paths(); + new_binlog1->set_fieldid(101); + auto* new_log1 = new_binlog1->add_binlogs(); + new_log1->set_log_path("/path/to/binlog1"); + new_log1->set_entries_num(500); + + auto* new_binlog2 = new_proto.add_binlog_paths(); + new_binlog2->set_fieldid(102); + auto* new_log2 = new_binlog2->add_binlogs(); + new_log2->set_log_path("/path/to/binlog2"); + new_log2->set_entries_num(500); + + SegmentLoadInfo current_info(current_proto); + SegmentLoadInfo new_info(new_proto); + auto diff = current_info.ComputeDiff(new_info); + std::cout << "ComputeDiff legacy format: " << diff.ToString() << "\n"; + + EXPECT_TRUE(diff.HasChanges()); + // Field 102 should be added + EXPECT_EQ(diff.binlogs_to_load.size(), 1); + EXPECT_EQ(diff.binlogs_to_load[0].first.size(), 1); + EXPECT_EQ(diff.binlogs_to_load[0].first[0].get(), 102); + + // No fields should be dropped + EXPECT_TRUE(diff.field_data_to_drop.empty()); +} + +TEST_F(SegmentLoadInfoTest, ComputeDiffDropFieldLegacyFormat) { + // Test ComputeDiff when dropping a field in legacy format + + // Current: has fields 101 and 102 + proto::segcore::SegmentLoadInfo current_proto; + current_proto.set_segmentid(100); + current_proto.set_num_of_rows(1000); + + auto* binlog1 = current_proto.add_binlog_paths(); + binlog1->set_fieldid(101); + auto* log1 = binlog1->add_binlogs(); + log1->set_log_path("/path/to/binlog1"); + log1->set_entries_num(500); + + auto* binlog2 = current_proto.add_binlog_paths(); + binlog2->set_fieldid(102); + auto* log2 = binlog2->add_binlogs(); + log2->set_log_path("/path/to/binlog2"); + log2->set_entries_num(500); + + // New: only has field 101 (102 is dropped) + proto::segcore::SegmentLoadInfo new_proto; + new_proto.set_segmentid(100); + new_proto.set_num_of_rows(1000); + + auto* new_binlog1 = new_proto.add_binlog_paths(); + new_binlog1->set_fieldid(101); + auto* new_log1 = new_binlog1->add_binlogs(); + new_log1->set_log_path("/path/to/binlog1"); + new_log1->set_entries_num(500); + + SegmentLoadInfo current_info(current_proto); + SegmentLoadInfo new_info(new_proto); + auto diff = current_info.ComputeDiff(new_info); + + EXPECT_TRUE(diff.HasChanges()); + // No new binlogs to load + EXPECT_TRUE(diff.binlogs_to_load.empty()); + // Field 102 should be dropped + EXPECT_EQ(diff.field_data_to_drop.size(), 1); + EXPECT_TRUE(diff.field_data_to_drop.count(FieldId(102)) > 0); +} + +TEST_F(SegmentLoadInfoTest, ComputeDiffMixedFormats) { + // Test ComputeDiff with mixed formats: legacy and column group + + // Current: has field 101 (legacy) and column group 200 with child fields 201, 202 + proto::segcore::SegmentLoadInfo current_proto; + current_proto.set_segmentid(100); + current_proto.set_num_of_rows(1000); + + // Legacy format binlog + auto* binlog1 = current_proto.add_binlog_paths(); + binlog1->set_fieldid(101); + auto* log1 = binlog1->add_binlogs(); + log1->set_log_path("/path/to/binlog1"); + log1->set_entries_num(500); + + // Column group format binlog + auto* binlog2 = current_proto.add_binlog_paths(); + binlog2->set_fieldid(200); + binlog2->add_child_fields(201); + binlog2->add_child_fields(202); + auto* log2 = binlog2->add_binlogs(); + log2->set_log_path("/path/to/group_binlog"); + log2->set_entries_num(1000); + + // New: has all existing + new field 103 (legacy) + new field 203 in column group + proto::segcore::SegmentLoadInfo new_proto; + new_proto.set_segmentid(100); + new_proto.set_num_of_rows(1000); + + auto* new_binlog1 = new_proto.add_binlog_paths(); + new_binlog1->set_fieldid(101); + auto* new_log1 = new_binlog1->add_binlogs(); + new_log1->set_log_path("/path/to/binlog1"); + new_log1->set_entries_num(500); + + // New legacy field + auto* new_binlog3 = new_proto.add_binlog_paths(); + new_binlog3->set_fieldid(103); + auto* new_log3 = new_binlog3->add_binlogs(); + new_log3->set_log_path("/path/to/binlog3"); + new_log3->set_entries_num(500); + + // Updated column group with additional field + auto* new_binlog2 = new_proto.add_binlog_paths(); + new_binlog2->set_fieldid(200); + new_binlog2->add_child_fields(201); + new_binlog2->add_child_fields(202); + new_binlog2->add_child_fields(203); // New field added to group + auto* new_log2 = new_binlog2->add_binlogs(); + new_log2->set_log_path("/path/to/group_binlog_new"); + new_log2->set_entries_num(1500); + + SegmentLoadInfo current_info(current_proto); + SegmentLoadInfo new_info(new_proto); + auto diff = current_info.ComputeDiff(new_info); + std::cout << "Mixed format diff: " << diff.ToString() << "\n"; + + EXPECT_TRUE(diff.HasChanges()); + // Field 103 (legacy) and field 203 (in column group) should be loaded + EXPECT_EQ(diff.binlogs_to_load.size(), 2); + + // No fields should be dropped + EXPECT_TRUE(diff.field_data_to_drop.empty()); +} + +TEST_F(SegmentLoadInfoTest, ComputeDiffNoChangesLegacyFormat) { + // Test ComputeDiff when there are no changes (same binlogs) + proto::segcore::SegmentLoadInfo proto; + proto.set_segmentid(100); + proto.set_num_of_rows(1000); + + auto* binlog = proto.add_binlog_paths(); + binlog->set_fieldid(101); + auto* log = binlog->add_binlogs(); + log->set_log_path("/path/to/binlog"); + log->set_entries_num(500); + + SegmentLoadInfo current_info(proto); + SegmentLoadInfo new_info(proto); + auto diff = current_info.ComputeDiff(new_info); + + EXPECT_FALSE(diff.HasChanges()); + EXPECT_TRUE(diff.binlogs_to_load.empty()); + EXPECT_TRUE(diff.field_data_to_drop.empty()); +} diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go index 935c805e8a..82cf2da5a9 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go @@ -87,6 +87,7 @@ func TestPauseConsumption(t *testing.T) { pendingQueue: utility.NewPendingQueue(), cleanup: func() {}, ScannerHelper: helper.NewScannerHelper("test"), + metrics: metricsutil.NewScanMetrics(types.PChannelInfo{}).NewScannerMetrics(), } done := make(chan struct{}) diff --git a/tests/python_client/requirements.txt b/tests/python_client/requirements.txt index c3e7761091..c7f604d6de 100644 --- a/tests/python_client/requirements.txt +++ b/tests/python_client/requirements.txt @@ -28,8 +28,8 @@ pytest-parallel pytest-random-order # pymilvus -pymilvus==2.7.0rc93 -pymilvus[bulk_writer]==2.7.0rc93 +pymilvus==2.7.0rc95 +pymilvus[bulk_writer]==2.7.0rc95 # for protobuf protobuf>=5.29.5