diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 820671f125..a023219bc8 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -2629,7 +2629,14 @@ ChunkedSegmentSealedImpl::Reopen( // compute load diff auto diff = current.ComputeDiff(new_seg_load_info); + ApplyLoadDiff(new_seg_load_info, diff); + LOG_INFO("Reopen segment {} done", id_); +} + +void +ChunkedSegmentSealedImpl::ApplyLoadDiff(SegmentLoadInfo& segment_load_info, + LoadDiff& diff) { milvus::tracer::TraceContext trace_ctx; if (!diff.indexes_to_load.empty()) { LoadBatchIndexes(trace_ctx, diff.indexes_to_load); @@ -2647,9 +2654,11 @@ ChunkedSegmentSealedImpl::Reopen( auto properties = milvus::storage::LoonFFIPropertiesSingleton::GetInstance() .GetProperties(); - LoadColumnGroups(new_seg_load_info.GetColumnGroups(), - properties, - diff.column_groups_to_load); + auto column_groups = segment_load_info.GetColumnGroups(); + auto arrow_schema = schema_->ConvertToArrowSchema(); + reader_ = milvus_storage::api::Reader::create( + column_groups, arrow_schema, nullptr, *properties); + LoadColumnGroups(column_groups, properties, diff.column_groups_to_load); } // load field binlog @@ -2663,8 +2672,6 @@ ChunkedSegmentSealedImpl::Reopen( DropFieldData(field_id); } } - - LOG_INFO("Reopen segment {} done", id_); } void @@ -2794,7 +2801,7 @@ ChunkedSegmentSealedImpl::LoadManifest(const std::string& manifest_path) { reader_ = milvus_storage::api::Reader::create( column_groups, arrow_schema, nullptr, *properties); - std::map> cg_field_ids_map; + std::vector>> cg_field_ids; for (int i = 0; i < column_groups->size(); ++i) { auto column_group = column_groups->get_column_group(i); std::vector milvus_field_ids; @@ -2802,22 +2809,22 @@ ChunkedSegmentSealedImpl::LoadManifest(const std::string& manifest_path) { auto field_id = std::stoll(column); milvus_field_ids.emplace_back(field_id); } - cg_field_ids_map[i] = milvus_field_ids; + cg_field_ids.emplace_back(i, std::move(milvus_field_ids)); } - LoadColumnGroups(column_groups, properties, cg_field_ids_map); + LoadColumnGroups(column_groups, properties, cg_field_ids); } void ChunkedSegmentSealedImpl::LoadColumnGroups( const std::shared_ptr& column_groups, const std::shared_ptr& properties, - std::map>& cg_field_ids_map) { + std::vector>>& cg_field_ids) { auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::LOW); std::vector> load_group_futures; - for (const auto& kv : cg_field_ids_map) { - auto cg_index = kv.first; - const auto& field_ids = kv.second; + for (const auto& pair : cg_field_ids) { + auto cg_index = pair.first; + const auto& field_ids = pair.second; auto future = pool.Submit([this, column_groups, properties, cg_index, field_ids] { LoadColumnGroup(column_groups, properties, cg_index, field_ids); @@ -3172,62 +3179,8 @@ ChunkedSegmentSealedImpl::Load(milvus::tracer::TraceContext& trace_ctx) { auto num_rows = segment_load_info_.GetNumOfRows(); LOG_INFO("Loading segment {} with {} rows", id_, num_rows); - // Step 1: Separate indexed and non-indexed fields - std::map> - field_id_to_index_info; - std::set indexed_fields; - - for (int i = 0; i < segment_load_info_.GetIndexInfoCount(); i++) { - const auto& index_info = segment_load_info_.GetIndexInfo(i); - if (index_info.index_file_paths_size() == 0) { - continue; - } - auto field_id = FieldId(index_info.fieldid()); - field_id_to_index_info[field_id].push_back(&index_info); - indexed_fields.insert(field_id); - } - - // Step 2: Load indexes in parallel using thread pool - LoadBatchIndexes(trace_ctx, field_id_to_index_info); - - LOG_INFO("Finished loading {} indexes for segment {}", - field_id_to_index_info.size(), - id_); - - // Step 3.a: Load with manifest - auto manifest_path = segment_load_info_.GetManifestPath(); - if (manifest_path != "") { - LoadManifest(manifest_path); - return; - } - - // Step 3.b: Load with field binlog - std::vector, proto::segcore::FieldBinlog>> - field_binlog_to_load; - for (int i = 0; i < segment_load_info_.GetBinlogPathCount(); i++) { - LoadFieldDataInfo load_field_data_info; - load_field_data_info.storage_version = - segment_load_info_.GetStorageVersion(); - - const auto& field_binlog = segment_load_info_.GetBinlogPath(i); - - std::vector field_ids; - // when child fields specified, field id is group id, child field ids are actual id values here - if (field_binlog.child_fields_size() > 0) { - field_ids.reserve(field_binlog.child_fields_size()); - for (auto field_id : field_binlog.child_fields()) { - field_ids.emplace_back(field_id); - } - } else { - field_ids.emplace_back(field_binlog.fieldid()); - } - - field_binlog_to_load.emplace_back(field_ids, field_binlog); - } - - if (!field_binlog_to_load.empty()) { - LoadBatchFieldData(trace_ctx, field_binlog_to_load); - } + auto diff = segment_load_info_.GetLoadDiff(); + ApplyLoadDiff(segment_load_info_, diff); LOG_INFO("Successfully loaded segment {} with {} rows", id_, num_rows); } diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h index 4b311045f0..bc663379ef 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h @@ -987,7 +987,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { LoadColumnGroups( const std::shared_ptr& column_groups, const std::shared_ptr& properties, - std::map>& cg_field_ids_map); + std::vector>>& cg_field_ids); /** * @brief Load a single column group at the specified index @@ -1006,6 +1006,19 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { int64_t index, const std::vector& milvus_field_ids); + /** + * @brief Apply load differences to update segment load information + * + * This method processes the differences between current and new load states, + * updating the segment's loaded fields and indexes accordingly. It handles + * incremental updates during segment reopen operations. + * + * @param segment_load_info The segment load information to be updated + * @param load_diff The differences to apply, containing fields and indexes to add/remove + */ + void + ApplyLoadDiff(SegmentLoadInfo& segment_load_info, LoadDiff& load_diff); + void load_field_data_common( FieldId field_id, diff --git a/internal/core/src/segcore/SegmentLoadInfo.cpp b/internal/core/src/segcore/SegmentLoadInfo.cpp index c491b9100e..86b12d875a 100644 --- a/internal/core/src/segcore/SegmentLoadInfo.cpp +++ b/internal/core/src/segcore/SegmentLoadInfo.cpp @@ -13,8 +13,10 @@ #include #include +#include #include "common/FieldMeta.h" +#include "milvus-storage/column_groups.h" #include "storage/LocalChunkManagerSingleton.h" #include "storage/loon_ffi/property_singleton.h" #include "storage/MmapManager.h" @@ -152,12 +154,18 @@ SegmentLoadInfo::ComputeDiffIndexes(LoadDiff& diff, SegmentLoadInfo& new_info) { // Get current indexed field IDs std::set current_index_ids; for (auto const& index_info : GetIndexInfos()) { + if (index_info.index_file_paths_size() == 0) { + continue; + } current_index_ids.insert(index_info.indexid()); } std::set new_index_ids; // Find indexes to load: indexes in new_info but not in current for (const auto& new_index_info : new_info.GetIndexInfos()) { + if (new_index_info.index_file_paths_size() == 0) { + continue; + } new_index_ids.insert(new_index_info.indexid()); if (current_index_ids.find(new_index_info.indexid()) == current_index_ids.end()) { @@ -168,6 +176,9 @@ SegmentLoadInfo::ComputeDiffIndexes(LoadDiff& diff, SegmentLoadInfo& new_info) { // Find indexes to drop: fields that have indexes in current but not in new_info for (const auto& index_info : GetIndexInfos()) { + if (index_info.index_file_paths_size() == 0) { + continue; + } if (new_index_ids.find(index_info.indexid()) == new_index_ids.end()) { diff.indexes_to_drop.insert(FieldId(index_info.fieldid())); } @@ -234,6 +245,7 @@ SegmentLoadInfo::ComputeDiffColumnGroups(LoadDiff& diff, std::map new_field_ids; for (int i = 0; i < new_column_group->size(); i++) { auto cg = new_column_group->get_column_group(i); + std::vector fields; for (const auto& column : cg->columns) { auto field_id = std::stoll(column); new_field_ids.emplace(field_id, i); @@ -241,9 +253,12 @@ SegmentLoadInfo::ComputeDiffColumnGroups(LoadDiff& diff, auto iter = cur_field_ids.find(field_id); // If this field doesn't exist in current, mark the column group for loading if (iter == cur_field_ids.end() || iter->second != i) { - diff.column_groups_to_load[i].emplace_back(field_id); + fields.emplace_back(field_id); } } + if (!fields.empty()) { + diff.column_groups_to_load.emplace_back(i, fields); + } } // Find field data to drop: fields in current but not in new @@ -280,4 +295,31 @@ SegmentLoadInfo::ComputeDiff(SegmentLoadInfo& new_info) { return diff; } +LoadDiff +SegmentLoadInfo::GetLoadDiff() { + LoadDiff diff; + + SegmentLoadInfo empty_info; + + // Handle index changes + empty_info.ComputeDiffIndexes(diff, *this); + + // Handle field data changes + // Note: Updates can only happen within the same category: + // - binlog -> binlog + // - manifest -> manifest + // Cross-category changes are not supported. + if (HasManifestPath()) { + // set mock path for null check + empty_info.info_.set_manifest_path("mocked manifest path"); + empty_info.column_groups_ = + std::make_shared(); + empty_info.ComputeDiffColumnGroups(diff, *this); + } else { + empty_info.ComputeDiffBinlogs(diff, *this); + } + + return diff; +} + } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentLoadInfo.h b/internal/core/src/segcore/SegmentLoadInfo.h index 0d2957188f..cd68615674 100644 --- a/internal/core/src/segcore/SegmentLoadInfo.h +++ b/internal/core/src/segcore/SegmentLoadInfo.h @@ -44,8 +44,9 @@ struct LoadDiff { std::vector, proto::segcore::FieldBinlog>> binlogs_to_load; - // Field id to column group index to be loaded - std::map> column_groups_to_load; + // list of column group indices and related field ids to load + // same index could appear multiple times if same group using different setups + std::vector>> column_groups_to_load; // Indexes that need to be dropped (field_id set) std::set indexes_to_drop; @@ -577,6 +578,17 @@ class SegmentLoadInfo { [[nodiscard]] LoadDiff ComputeDiff(SegmentLoadInfo& new_info); + /** + * @brief Get the LoadDiff from the current SegmentLoadInfo + * + * This method produces a LoadDiff that describes what needs to be loaded when + * load with current load info. + * + * @return LoadDiff containing the differences + */ + [[nodiscard]] LoadDiff + GetLoadDiff(); + // ==================== Underlying Proto Access ==================== /** diff --git a/internal/core/src/segcore/SegmentLoadInfoTest.cpp b/internal/core/src/segcore/SegmentLoadInfoTest.cpp index 16296439a9..cc45ff9db7 100644 --- a/internal/core/src/segcore/SegmentLoadInfoTest.cpp +++ b/internal/core/src/segcore/SegmentLoadInfoTest.cpp @@ -326,3 +326,213 @@ TEST_F(SegmentLoadInfoTest, IndexWithoutFiles) { EXPECT_FALSE(info.HasIndexInfo(FieldId(101))); EXPECT_EQ(info.GetIndexInfoCount(), 1); // Proto still has it } + +// ==================== GetLoadDiff Tests ==================== + +TEST_F(SegmentLoadInfoTest, GetLoadDiffWithEmptyInfo) { + // Empty SegmentLoadInfo should return empty diff + SegmentLoadInfo empty_info; + auto diff = empty_info.GetLoadDiff(); + + EXPECT_FALSE(diff.HasChanges()); + EXPECT_TRUE(diff.indexes_to_load.empty()); + EXPECT_TRUE(diff.binlogs_to_load.empty()); + EXPECT_TRUE(diff.column_groups_to_load.empty()); + EXPECT_TRUE(diff.indexes_to_drop.empty()); + EXPECT_TRUE(diff.field_data_to_drop.empty()); + EXPECT_FALSE(diff.manifest_updated); +} + +TEST_F(SegmentLoadInfoTest, GetLoadDiffWithIndexesOnly) { + // Create info with only indexes (no binlogs, no manifest) + proto::segcore::SegmentLoadInfo test_proto; + test_proto.set_segmentid(100); + test_proto.set_num_of_rows(1000); + + // Add two indexes + auto* index1 = test_proto.add_index_infos(); + index1->set_fieldid(101); + index1->set_indexid(1001); + index1->add_index_file_paths("/path/to/index1"); + + auto* index2 = test_proto.add_index_infos(); + index2->set_fieldid(102); + index2->set_indexid(1002); + index2->add_index_file_paths("/path/to/index2"); + + SegmentLoadInfo info(test_proto); + auto diff = info.GetLoadDiff(); + + EXPECT_TRUE(diff.HasChanges()); + // Both indexes should be in indexes_to_load + EXPECT_EQ(diff.indexes_to_load.size(), 2); + EXPECT_TRUE(diff.indexes_to_load.count(FieldId(101)) > 0); + EXPECT_TRUE(diff.indexes_to_load.count(FieldId(102)) > 0); + EXPECT_EQ(diff.indexes_to_load[FieldId(101)].size(), 1); + EXPECT_EQ(diff.indexes_to_load[FieldId(102)].size(), 1); + + // No drops or other changes + EXPECT_TRUE(diff.indexes_to_drop.empty()); + EXPECT_TRUE(diff.binlogs_to_load.empty()); + EXPECT_TRUE(diff.field_data_to_drop.empty()); +} + +TEST_F(SegmentLoadInfoTest, GetLoadDiffWithBinlogsOnly) { + // Create info with only binlogs (no indexes, no manifest) + proto::segcore::SegmentLoadInfo test_proto; + test_proto.set_segmentid(100); + test_proto.set_num_of_rows(1000); + + // Add binlog with child fields + auto* binlog = test_proto.add_binlog_paths(); + binlog->set_fieldid(200); + binlog->add_child_fields(201); + binlog->add_child_fields(202); + auto* log = binlog->add_binlogs(); + log->set_log_path("/path/to/binlog"); + log->set_entries_num(500); + + SegmentLoadInfo info(test_proto); + auto diff = info.GetLoadDiff(); + + EXPECT_TRUE(diff.HasChanges()); + // Binlogs should be in binlogs_to_load + EXPECT_EQ(diff.binlogs_to_load.size(), 1); + EXPECT_EQ(diff.binlogs_to_load[0].first.size(), 2); // 2 child fields + EXPECT_EQ(diff.binlogs_to_load[0].first[0].get(), 201); + EXPECT_EQ(diff.binlogs_to_load[0].first[1].get(), 202); + + // No index changes + EXPECT_TRUE(diff.indexes_to_load.empty()); + EXPECT_TRUE(diff.indexes_to_drop.empty()); + EXPECT_TRUE(diff.field_data_to_drop.empty()); +} + +TEST_F(SegmentLoadInfoTest, GetLoadDiffWithIndexesAndBinlogs) { + // Create info with both indexes and binlogs + proto::segcore::SegmentLoadInfo test_proto; + test_proto.set_segmentid(100); + test_proto.set_num_of_rows(1000); + + // Add index + auto* index = test_proto.add_index_infos(); + index->set_fieldid(101); + index->set_indexid(1001); + index->add_index_file_paths("/path/to/index"); + + // Add binlog with child fields + auto* binlog = test_proto.add_binlog_paths(); + binlog->set_fieldid(200); + binlog->add_child_fields(201); + auto* log = binlog->add_binlogs(); + log->set_log_path("/path/to/binlog"); + log->set_entries_num(500); + + SegmentLoadInfo info(test_proto); + auto diff = info.GetLoadDiff(); + + EXPECT_TRUE(diff.HasChanges()); + // Index should be in indexes_to_load + EXPECT_EQ(diff.indexes_to_load.size(), 1); + EXPECT_TRUE(diff.indexes_to_load.count(FieldId(101)) > 0); + + // Binlog should be in binlogs_to_load + EXPECT_EQ(diff.binlogs_to_load.size(), 1); + EXPECT_EQ(diff.binlogs_to_load[0].first.size(), 1); + EXPECT_EQ(diff.binlogs_to_load[0].first[0].get(), 201); + + // No drops + EXPECT_TRUE(diff.indexes_to_drop.empty()); + EXPECT_TRUE(diff.field_data_to_drop.empty()); +} + +TEST_F(SegmentLoadInfoTest, GetLoadDiffIgnoresIndexesWithoutFiles) { + // Indexes without files should be ignored in GetLoadDiff + proto::segcore::SegmentLoadInfo test_proto; + test_proto.set_segmentid(100); + test_proto.set_num_of_rows(1000); + + // Add index with files + auto* index1 = test_proto.add_index_infos(); + index1->set_fieldid(101); + index1->set_indexid(1001); + index1->add_index_file_paths("/path/to/index"); + + // Add index without files - should be ignored + auto* index2 = test_proto.add_index_infos(); + index2->set_fieldid(102); + index2->set_indexid(1002); + // No index_file_paths added + + SegmentLoadInfo info(test_proto); + auto diff = info.GetLoadDiff(); + + EXPECT_TRUE(diff.HasChanges()); + // Only index with files should be in indexes_to_load + EXPECT_EQ(diff.indexes_to_load.size(), 1); + EXPECT_TRUE(diff.indexes_to_load.count(FieldId(101)) > 0); + EXPECT_FALSE(diff.indexes_to_load.count(FieldId(102)) > 0); +} + +TEST_F(SegmentLoadInfoTest, GetLoadDiffWithMultipleIndexesPerField) { + // A field can have multiple indexes (e.g., JSON field with multiple paths) + proto::segcore::SegmentLoadInfo test_proto; + test_proto.set_segmentid(100); + test_proto.set_num_of_rows(1000); + + // Add two indexes for the same field + auto* index1 = test_proto.add_index_infos(); + index1->set_fieldid(101); + index1->set_indexid(1001); + index1->add_index_file_paths("/path/to/index1"); + + auto* index2 = test_proto.add_index_infos(); + index2->set_fieldid(101); + index2->set_indexid(1002); + index2->add_index_file_paths("/path/to/index2"); + + SegmentLoadInfo info(test_proto); + auto diff = info.GetLoadDiff(); + + EXPECT_TRUE(diff.HasChanges()); + // Both indexes should be in indexes_to_load for the same field + EXPECT_EQ(diff.indexes_to_load.size(), 1); + EXPECT_TRUE(diff.indexes_to_load.count(FieldId(101)) > 0); + EXPECT_EQ(diff.indexes_to_load[FieldId(101)].size(), 2); +} + +TEST_F(SegmentLoadInfoTest, GetLoadDiffWithMultipleBinlogGroups) { + // Test with multiple binlog groups + proto::segcore::SegmentLoadInfo test_proto; + test_proto.set_segmentid(100); + test_proto.set_num_of_rows(1000); + + // Add first binlog group + auto* binlog1 = test_proto.add_binlog_paths(); + binlog1->set_fieldid(200); + binlog1->add_child_fields(201); + binlog1->add_child_fields(202); + auto* log1 = binlog1->add_binlogs(); + log1->set_log_path("/path/to/binlog1"); + log1->set_entries_num(500); + + // Add second binlog group + auto* binlog2 = test_proto.add_binlog_paths(); + binlog2->set_fieldid(300); + binlog2->add_child_fields(301); + auto* log2 = binlog2->add_binlogs(); + log2->set_log_path("/path/to/binlog2"); + log2->set_entries_num(500); + + SegmentLoadInfo info(test_proto); + auto diff = info.GetLoadDiff(); + + EXPECT_TRUE(diff.HasChanges()); + // Both binlog groups should be in binlogs_to_load + EXPECT_EQ(diff.binlogs_to_load.size(), 2); + + // Check first group + EXPECT_EQ(diff.binlogs_to_load[0].first.size(), 2); + // Check second group + EXPECT_EQ(diff.binlogs_to_load[1].first.size(), 1); +}