From 0a208d72243b02110ee71818c4ed30d91497a8d2 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 14 Nov 2025 11:21:37 +0800 Subject: [PATCH] enhance: Move segment loading logic from Go layer to segcore for self-managed loading (#45488) Related to #45060 Refactor segment loading architecture to make segments autonomously manage their own loading process, moving the orchestration logic from Go (segment_loader.go) to C++ (segcore). **C++ Layer (segcore):** - Added `SetLoadInfo()` and `Load()` methods to `SegmentInterface` and implementations - Implemented `ChunkedSegmentSealedImpl::Load()` with parallel loading strategy: - Separates indexed fields from non-indexed fields - Loads indexes concurrently using thread pools - Loads field data for non-indexed fields in parallel - Implemented `SegmentGrowingImpl::Load()` to convert and load field data - Extracted `LoadIndexData()` as a reusable utility function in `Utils.cpp` - Added `SegmentLoad()` C binding in `segment_c.cpp` **Go Layer:** - Added `Load()` method to segment interfaces - Updated mock implementations and test interfaces - Integrated new C++ `SegmentLoad()` binding in Go segment wrapper --------- Signed-off-by: Congqi Xia --- internal/core/src/common/Types.h | 2 + .../src/segcore/ChunkedSegmentSealedImpl.cpp | 243 ++++++++++++++++++ .../src/segcore/ChunkedSegmentSealedImpl.h | 13 + .../core/src/segcore/SegmentGrowingImpl.cpp | 42 +++ .../core/src/segcore/SegmentGrowingImpl.h | 3 + internal/core/src/segcore/SegmentInterface.h | 3 + internal/core/src/segcore/Utils.cpp | 115 +++++++++ internal/core/src/segcore/Utils.h | 5 + internal/core/src/segcore/load_index_c.cpp | 110 +------- internal/core/src/segcore/segment_c.cpp | 17 ++ internal/core/src/segcore/segment_c.h | 36 ++- internal/querynodev2/segments/mock_segment.go | 46 ++++ internal/querynodev2/segments/segment.go | 4 + .../querynodev2/segments/segment_interface.go | 1 + internal/querynodev2/segments/segment_l0.go | 4 + .../querynodev2/segments/segment_loader.go | 86 +++---- internal/util/segcore/segment.go | 10 +- internal/util/segcore/segment_interface.go | 3 + 18 files changed, 567 insertions(+), 176 deletions(-) diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index 208d58a668..c92a52dcaf 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -205,6 +205,8 @@ ToProtoDataType(DataType data_type) { return proto::schema::DataType::Int8Vector; case DataType::VECTOR_ARRAY: return proto::schema::DataType::ArrayOfVector; + case DataType::GEOMETRY: + return proto::schema::DataType::Geometry; // Internal-only or unsupported mappings case DataType::ROW: diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 8b6ffad2fe..98472a3d81 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -243,6 +243,77 @@ ChunkedSegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) { request.has_raw_data); } +LoadIndexInfo +ChunkedSegmentSealedImpl::ConvertFieldIndexInfoToLoadIndexInfo( + const milvus::proto::segcore::FieldIndexInfo* field_index_info) const { + LoadIndexInfo load_index_info; + + load_index_info.segment_id = id_; + // Extract field ID + auto field_id = FieldId(field_index_info->fieldid()); + load_index_info.field_id = field_id.get(); + + // Get field type from schema + const auto& field_meta = get_schema()[field_id]; + load_index_info.field_type = 
field_meta.get_data_type(); + load_index_info.element_type = field_meta.get_element_type(); + + // Set index metadata + load_index_info.index_id = field_index_info->indexid(); + load_index_info.index_build_id = field_index_info->buildid(); + load_index_info.index_version = field_index_info->index_version(); + load_index_info.index_store_version = + field_index_info->index_store_version(); + load_index_info.index_engine_version = + static_cast(field_index_info->current_index_version()); + load_index_info.index_size = field_index_info->index_size(); + load_index_info.num_rows = field_index_info->num_rows(); + load_index_info.schema = field_meta.ToProto(); + + // Copy index file paths, excluding indexParams file + for (const auto& file_path : field_index_info->index_file_paths()) { + size_t last_slash = file_path.find_last_of('/'); + std::string filename = (last_slash != std::string::npos) + ? file_path.substr(last_slash + 1) + : file_path; + + if (filename != "indexParams") { + load_index_info.index_files.push_back(file_path); + } + } + + bool mmap_enabled = false; + // Set index params + for (const auto& kv_pair : field_index_info->index_params()) { + if (kv_pair.key() == "mmap.enable") { + std::string lower; + std::transform(kv_pair.value().begin(), + kv_pair.value().end(), + std::back_inserter(lower), + ::tolower); + mmap_enabled = lower == "true"; + } + load_index_info.index_params[kv_pair.key()] = kv_pair.value(); + } + + size_t dim = + IsVectorDataType(field_meta.get_data_type()) && + !IsSparseFloatVectorDataType(field_meta.get_data_type()) + ? field_meta.get_dim() + : 1; + load_index_info.dim = dim; + auto remote_chunk_manager = + milvus::storage::RemoteChunkManagerSingleton::GetInstance() + .GetRemoteChunkManager(); + load_index_info.mmap_dir_path = + milvus::storage::LocalChunkManagerSingleton::GetInstance() + .GetChunkManager() + ->GetRootPath(); + load_index_info.enable_mmap = mmap_enabled; + + return load_index_info; +} + void ChunkedSegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) { switch (load_info.storage_version) { @@ -2745,4 +2816,176 @@ ChunkedSegmentSealedImpl::LoadGeometryCache( } } +void +ChunkedSegmentSealedImpl::SetLoadInfo( + const proto::segcore::SegmentLoadInfo& load_info) { + std::unique_lock lck(mutex_); + segment_load_info_ = load_info; + LOG_INFO( + "SetLoadInfo for segment {}, num_rows: {}, index count: {}, " + "storage_version: {}", + id_, + load_info.num_of_rows(), + load_info.index_infos_size(), + load_info.storageversion()); +} + +void +ChunkedSegmentSealedImpl::Load(milvus::tracer::TraceContext& trace_ctx) { + // Get load info from segment_load_info_ + auto num_rows = segment_load_info_.num_of_rows(); + LOG_INFO("Loading segment {} with {} rows", id_, num_rows); + + // Step 1: Separate indexed and non-indexed fields + std::map + field_id_to_index_info; + std::set indexed_fields; + + for (int i = 0; i < segment_load_info_.index_infos_size(); i++) { + const auto& index_info = segment_load_info_.index_infos(i); + if (index_info.index_file_paths_size() == 0) { + continue; + } + auto field_id = FieldId(index_info.fieldid()); + field_id_to_index_info[field_id] = &index_info; + indexed_fields.insert(field_id); + } + + // Step 2: Load indexes in parallel using thread pool + auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::LOW); + std::vector> load_index_futures; + + for (const auto& pair : field_id_to_index_info) { + auto field_id = pair.first; + auto index_info_ptr = pair.second; + auto future = pool.Submit( + [this, 
trace_ctx, field_id, index_info_ptr, num_rows]() mutable + -> void { + // Convert proto FieldIndexInfo to LoadIndexInfo + auto load_index_info = + ConvertFieldIndexInfoToLoadIndexInfo(index_info_ptr); + + LOG_INFO("Loading index for segment {} field {} with {} files", + id_, + field_id.get(), + load_index_info.index_files.size()); + + // Download & compose index + LoadIndexData(trace_ctx, &load_index_info); + + // Load index into segment + LoadIndex(load_index_info); + }); + + load_index_futures.push_back(std::move(future)); + } + + // Wait for all index loading to complete and collect exceptions + std::vector index_exceptions; + for (auto& future : load_index_futures) { + try { + future.get(); + } catch (...) { + index_exceptions.push_back(std::current_exception()); + } + } + + // If any exceptions occurred during index loading, handle them + if (!index_exceptions.empty()) { + LOG_ERROR("Failed to load {} out of {} indexes for segment {}", + index_exceptions.size(), + load_index_futures.size(), + id_); + + // Rethrow the first exception + std::rethrow_exception(index_exceptions[0]); + } + + LOG_INFO("Finished loading {} indexes for segment {}", + field_id_to_index_info.size(), + id_); + + // Step 3: Prepare field data info for non-indexed fields + std::map field_data_to_load; + for (int i = 0; i < segment_load_info_.binlog_paths_size(); i++) { + LoadFieldDataInfo load_field_data_info; + load_field_data_info.storage_version = + segment_load_info_.storageversion(); + + const auto& field_binlog = segment_load_info_.binlog_paths(i); + auto field_id = FieldId(field_binlog.fieldid()); + + // Skip if this field has an index with raw data + auto iter = index_has_raw_data_.find(field_id); + if (iter != index_has_raw_data_.end() && iter->second) { + LOG_INFO( + "Skip loading binlog for segment {} field {} because index " + "has raw data", + id_, + field_id.get()); + continue; + } + + // Build FieldBinlogInfo + FieldBinlogInfo field_binlog_info; + field_binlog_info.field_id = field_id.get(); + + // Calculate total row count and collect binlog paths + int64_t total_entries = 0; + for (const auto& binlog : field_binlog.binlogs()) { + field_binlog_info.insert_files.push_back(binlog.log_path()); + field_binlog_info.entries_nums.push_back(binlog.entries_num()); + field_binlog_info.memory_sizes.push_back(binlog.memory_size()); + total_entries += binlog.entries_num(); + } + field_binlog_info.row_count = total_entries; + + // Store in map + load_field_data_info.field_infos[field_id.get()] = field_binlog_info; + + field_data_to_load[field_id] = load_field_data_info; + } + + // Step 4: Load field data for non-indexed fields + if (!field_data_to_load.empty()) { + LOG_INFO("Loading field data for {} fields in segment {}", + field_data_to_load.size(), + id_); + std::vector> load_field_futures; + + for (const auto& [field_id, load_field_data_info] : + field_data_to_load) { + // Create a local copy to capture in lambda (C++17 compatible) + const auto field_data = load_field_data_info; + auto future = pool.Submit( + [this, field_data]() -> void { LoadFieldData(field_data); }); + + load_field_futures.push_back(std::move(future)); + } + + // Wait for all field data loading to complete and collect exceptions + std::vector field_exceptions; + for (auto& future : load_field_futures) { + try { + future.get(); + } catch (...) 
{ + field_exceptions.push_back(std::current_exception()); + } + } + + // If any exceptions occurred during field data loading, handle them + if (!field_exceptions.empty()) { + LOG_ERROR("Failed to load {} out of {} field data for segment {}", + field_exceptions.size(), + load_field_futures.size(), + id_); + + // Rethrow the first exception + std::rethrow_exception(field_exceptions[0]); + } + } + + LOG_INFO("Successfully loaded segment {} with {} rows", id_, num_rows); +} + } // namespace milvus::segcore diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h index 7323587764..49bf90e7af 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h @@ -186,6 +186,13 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { void FinishLoad() override; + void + SetLoadInfo( + const milvus::proto::segcore::SegmentLoadInfo& load_info) override; + + void + Load(milvus::tracer::TraceContext& trace_ctx) override; + public: size_t GetMemoryUsageInBytes() const override { @@ -505,6 +512,11 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { bool is_proxy_column, std::optional statistics = {}); + // Convert proto::segcore::FieldIndexInfo to LoadIndexInfo + LoadIndexInfo + ConvertFieldIndexInfoToLoadIndexInfo( + const milvus::proto::segcore::FieldIndexInfo* field_index_info) const; + std::shared_ptr get_column(FieldId field_id) const { std::shared_ptr res; @@ -558,6 +570,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { mutable DeletedRecord deleted_record_; LoadFieldDataInfo field_data_info_; + milvus::proto::segcore::SegmentLoadInfo segment_load_info_; SchemaPtr schema_; int64_t id_; diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index a6980e3944..71bf3a501e 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -1326,6 +1326,48 @@ SegmentGrowingImpl::Reopen(SchemaPtr sch) { } } +void +SegmentGrowingImpl::Load(milvus::tracer::TraceContext& trace_ctx) { + // Convert load_info_ (SegmentLoadInfo) to LoadFieldDataInfo + LoadFieldDataInfo field_data_info; + + // Set storage version + field_data_info.storage_version = load_info_.storageversion(); + + // Set load priority + field_data_info.load_priority = load_info_.priority(); + + // Convert binlog_paths to field_infos + for (const auto& field_binlog : load_info_.binlog_paths()) { + FieldBinlogInfo binlog_info; + binlog_info.field_id = field_binlog.fieldid(); + + // Process each binlog + int64_t total_row_count = 0; + for (const auto& binlog : field_binlog.binlogs()) { + binlog_info.entries_nums.push_back(binlog.entries_num()); + binlog_info.insert_files.push_back(binlog.log_path()); + binlog_info.memory_sizes.push_back(binlog.memory_size()); + total_row_count += binlog.entries_num(); + } + binlog_info.row_count = total_row_count; + + // Set child field ids + for (const auto& child_field : field_binlog.child_fields()) { + binlog_info.child_field_ids.push_back(child_field); + } + + // Add to field_infos map + field_data_info.field_infos[binlog_info.field_id] = + std::move(binlog_info); + } + + // Call LoadFieldData with the converted info + if (!field_data_info.field_infos.empty()) { + LoadFieldData(field_data_info); + } +} + void SegmentGrowingImpl::FinishLoad() { for (const auto& [field_id, field_meta] : schema_->get_fields()) { diff --git 
a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index a39baf950c..a639a427f8 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -106,6 +106,9 @@ class SegmentGrowingImpl : public SegmentGrowing { void FinishLoad() override; + void + Load(milvus::tracer::TraceContext& trace_ctx) override; + private: // Build geometry cache for inserted data void diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index dee343088b..4602587de3 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -200,6 +200,9 @@ class SegmentInterface { virtual void SetLoadInfo(const milvus::proto::segcore::SegmentLoadInfo& load_info) = 0; + + virtual void + Load(milvus::tracer::TraceContext& trace_ctx) = 0; }; // internal API for DSL calculation diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index f4095f15cc..bba1326d82 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -26,6 +26,9 @@ #include "common/Utils.h" #include "index/ScalarIndex.h" #include "log/Log.h" +#include "segcore/storagev1translator/SealedIndexTranslator.h" +#include "segcore/storagev1translator/V1SealedIndexTranslator.h" +#include "segcore/Types.h" #include "storage/DataCodec.h" #include "storage/RemoteChunkManagerSingleton.h" #include "storage/ThreadPools.h" @@ -1200,4 +1203,116 @@ getCellDataType(bool is_vector, bool is_index) { } } +void +LoadIndexData(milvus::tracer::TraceContext& ctx, + milvus::segcore::LoadIndexInfo* load_index_info) { + auto& index_params = load_index_info->index_params; + auto field_type = load_index_info->field_type; + auto engine_version = load_index_info->index_engine_version; + + milvus::index::CreateIndexInfo index_info; + index_info.field_type = load_index_info->field_type; + index_info.index_engine_version = engine_version; + + auto config = milvus::index::ParseConfigFromIndexParams( + load_index_info->index_params); + auto load_priority_str = config[milvus::LOAD_PRIORITY].get(); + auto priority_for_load = milvus::PriorityForLoad(load_priority_str); + config[milvus::LOAD_PRIORITY] = priority_for_load; + + // Config should have value for milvus::index::SCALAR_INDEX_ENGINE_VERSION for production calling chain. 
+ // Use value_or(1) for unit test without setting this value + index_info.scalar_index_engine_version = + milvus::index::GetValueFromConfig( + config, milvus::index::SCALAR_INDEX_ENGINE_VERSION) + .value_or(1); + + index_info.tantivy_index_version = + milvus::index::GetValueFromConfig( + config, milvus::index::TANTIVY_INDEX_VERSION) + .value_or(milvus::index::TANTIVY_INDEX_LATEST_VERSION); + + LOG_INFO( + "[collection={}][segment={}][field={}][enable_mmap={}][load_" + "priority={}] load index {}, " + "mmap_dir_path={}", + load_index_info->collection_id, + load_index_info->segment_id, + load_index_info->field_id, + load_index_info->enable_mmap, + load_priority_str, + load_index_info->index_id, + load_index_info->mmap_dir_path); + // get index type + AssertInfo(index_params.find("index_type") != index_params.end(), + "index type is empty"); + index_info.index_type = index_params.at("index_type"); + + // get metric type + if (milvus::IsVectorDataType(field_type)) { + AssertInfo(index_params.find("metric_type") != index_params.end(), + "metric type is empty for vector index"); + index_info.metric_type = index_params.at("metric_type"); + } + + if (index_info.index_type == milvus::index::NGRAM_INDEX_TYPE) { + AssertInfo( + index_params.find(milvus::index::MIN_GRAM) != index_params.end(), + "min_gram is empty for ngram index"); + AssertInfo( + index_params.find(milvus::index::MAX_GRAM) != index_params.end(), + "max_gram is empty for ngram index"); + + // get min_gram and max_gram and convert to uintptr_t + milvus::index::NgramParams ngram_params{}; + ngram_params.loading_index = true; + ngram_params.min_gram = + std::stoul(milvus::index::GetValueFromConfig( + config, milvus::index::MIN_GRAM) + .value()); + ngram_params.max_gram = + std::stoul(milvus::index::GetValueFromConfig( + config, milvus::index::MAX_GRAM) + .value()); + index_info.ngram_params = std::make_optional(ngram_params); + } + + // init file manager + milvus::storage::FieldDataMeta field_meta{load_index_info->collection_id, + load_index_info->partition_id, + load_index_info->segment_id, + load_index_info->field_id, + load_index_info->schema}; + milvus::storage::IndexMeta index_meta{load_index_info->segment_id, + load_index_info->field_id, + load_index_info->index_build_id, + load_index_info->index_version}; + config[milvus::index::INDEX_FILES] = load_index_info->index_files; + + if (load_index_info->field_type == milvus::DataType::JSON) { + index_info.json_cast_type = milvus::JsonCastType::FromString( + config.at(JSON_CAST_TYPE).get()); + index_info.json_path = config.at(JSON_PATH).get(); + } + auto remote_chunk_manager = + milvus::storage::RemoteChunkManagerSingleton::GetInstance() + .GetRemoteChunkManager(); + auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() + .GetArrowFileSystem(); + AssertInfo(fs != nullptr, "arrow file system is nullptr"); + milvus::storage::FileManagerContext file_manager_context( + field_meta, index_meta, remote_chunk_manager, fs); + file_manager_context.set_for_loading_index(true); + + // use cache layer to load vector/scalar index + std::unique_ptr> + translator = std::make_unique< + milvus::segcore::storagev1translator::SealedIndexTranslator>( + index_info, load_index_info, ctx, file_manager_context, config); + + load_index_info->cache_index = + milvus::cachinglayer::Manager::GetInstance().CreateCacheSlot( + std::move(translator)); +} + } // namespace milvus::segcore diff --git a/internal/core/src/segcore/Utils.h b/internal/core/src/segcore/Utils.h index d551e88d71..91170b8448 100644 --- 
a/internal/core/src/segcore/Utils.h +++ b/internal/core/src/segcore/Utils.h @@ -22,6 +22,7 @@ #include "index/Index.h" #include "cachinglayer/Utils.h" #include "segcore/ConcurrentVector.h" +#include "segcore/Types.h" namespace milvus::segcore { @@ -143,4 +144,8 @@ getCacheWarmupPolicy(bool is_vector, bool is_index, bool in_load_list = true); milvus::cachinglayer::CellDataType getCellDataType(bool is_vector, bool is_index); +void +LoadIndexData(milvus::tracer::TraceContext& ctx, + milvus::segcore::LoadIndexInfo* load_index_info); + } // namespace milvus::segcore diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp index 54e7e3de72..116087711c 100644 --- a/internal/core/src/segcore/load_index_c.cpp +++ b/internal/core/src/segcore/load_index_c.cpp @@ -34,6 +34,7 @@ #include "cachinglayer/Manager.h" #include "segcore/storagev1translator/SealedIndexTranslator.h" #include "segcore/storagev1translator/V1SealedIndexTranslator.h" +#include "segcore/Utils.h" #include "monitor/scope_metric.h" bool @@ -239,121 +240,14 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) { try { auto load_index_info = static_cast(c_load_index_info); - auto& index_params = load_index_info->index_params; - auto field_type = load_index_info->field_type; - auto engine_version = load_index_info->index_engine_version; - - milvus::index::CreateIndexInfo index_info; - index_info.field_type = load_index_info->field_type; - index_info.index_engine_version = engine_version; - - auto config = milvus::index::ParseConfigFromIndexParams( - load_index_info->index_params); - auto load_priority_str = - config[milvus::LOAD_PRIORITY].get(); - auto priority_for_load = milvus::PriorityForLoad(load_priority_str); - config[milvus::LOAD_PRIORITY] = priority_for_load; - - // Config should have value for milvus::index::SCALAR_INDEX_ENGINE_VERSION for production calling chain. 
- // Use value_or(1) for unit test without setting this value - index_info.scalar_index_engine_version = - milvus::index::GetValueFromConfig( - config, milvus::index::SCALAR_INDEX_ENGINE_VERSION) - .value_or(1); - - index_info.tantivy_index_version = - milvus::index::GetValueFromConfig( - config, milvus::index::TANTIVY_INDEX_VERSION) - .value_or(milvus::index::TANTIVY_INDEX_LATEST_VERSION); auto ctx = milvus::tracer::TraceContext{ c_trace.traceID, c_trace.spanID, c_trace.traceFlags}; auto span = milvus::tracer::StartSpan("SegCoreLoadIndex", &ctx); milvus::tracer::SetRootSpan(span); - LOG_INFO( - "[collection={}][segment={}][field={}][enable_mmap={}][load_" - "priority={}] load index {}, " - "mmap_dir_path={}", - load_index_info->collection_id, - load_index_info->segment_id, - load_index_info->field_id, - load_index_info->enable_mmap, - load_priority_str, - load_index_info->index_id, - load_index_info->mmap_dir_path); + LoadIndexData(ctx, load_index_info); - // get index type - AssertInfo(index_params.find("index_type") != index_params.end(), - "index type is empty"); - index_info.index_type = index_params.at("index_type"); - - // get metric type - if (milvus::IsVectorDataType(field_type)) { - AssertInfo(index_params.find("metric_type") != index_params.end(), - "metric type is empty for vector index"); - index_info.metric_type = index_params.at("metric_type"); - } - - if (index_info.index_type == milvus::index::NGRAM_INDEX_TYPE) { - AssertInfo(index_params.find(milvus::index::MIN_GRAM) != - index_params.end(), - "min_gram is empty for ngram index"); - AssertInfo(index_params.find(milvus::index::MAX_GRAM) != - index_params.end(), - "max_gram is empty for ngram index"); - - // get min_gram and max_gram and convert to uintptr_t - milvus::index::NgramParams ngram_params{}; - ngram_params.loading_index = true; - ngram_params.min_gram = - std::stoul(milvus::index::GetValueFromConfig( - config, milvus::index::MIN_GRAM) - .value()); - ngram_params.max_gram = - std::stoul(milvus::index::GetValueFromConfig( - config, milvus::index::MAX_GRAM) - .value()); - index_info.ngram_params = std::make_optional(ngram_params); - } - - // init file manager - milvus::storage::FieldDataMeta field_meta{ - load_index_info->collection_id, - load_index_info->partition_id, - load_index_info->segment_id, - load_index_info->field_id, - load_index_info->schema}; - milvus::storage::IndexMeta index_meta{load_index_info->segment_id, - load_index_info->field_id, - load_index_info->index_build_id, - load_index_info->index_version}; - config[milvus::index::INDEX_FILES] = load_index_info->index_files; - - if (load_index_info->field_type == milvus::DataType::JSON) { - index_info.json_cast_type = milvus::JsonCastType::FromString( - config.at(JSON_CAST_TYPE).get()); - index_info.json_path = config.at(JSON_PATH).get(); - } - auto remote_chunk_manager = - milvus::storage::RemoteChunkManagerSingleton::GetInstance() - .GetRemoteChunkManager(); - auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() - .GetArrowFileSystem(); - AssertInfo(fs != nullptr, "arrow file system is nullptr"); - milvus::storage::FileManagerContext fileManagerContext( - field_meta, index_meta, remote_chunk_manager, fs); - fileManagerContext.set_for_loading_index(true); - - // use cache layer to load vector/scalar index - std::unique_ptr< - milvus::cachinglayer::Translator> - translator = std::make_unique< - milvus::segcore::storagev1translator::SealedIndexTranslator>( - index_info, load_index_info, ctx, fileManagerContext, config); - 
load_index_info->cache_index = - milvus::cachinglayer::Manager::GetInstance().CreateCacheSlot( - std::move(translator)); span->End(); milvus::tracer::CloseRootSpan(); diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index 414a8f04d6..1b6b93a964 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -139,6 +139,23 @@ NewSegmentWithLoadInfo(CCollection collection, } } +CStatus +SegmentLoad(CTraceContext c_trace, CSegmentInterface c_segment) { + SCOPE_CGO_CALL_METRIC(); + + try { + auto segment = + static_cast(c_segment); + // TODO unify trace context to op context after supported + auto trace_ctx = milvus::tracer::TraceContext{ + c_trace.traceID, c_trace.spanID, c_trace.traceFlags}; + segment->Load(trace_ctx); + return milvus::SuccessCStatus(); + } catch (std::exception& e) { + return milvus::FailureCStatus(&e); + } +} + void DeleteSegment(CSegmentInterface c_segment) { SCOPE_CGO_CALL_METRIC(); diff --git a/internal/core/src/segcore/segment_c.h b/internal/core/src/segcore/segment_c.h index ed324d6e02..12f484afb5 100644 --- a/internal/core/src/segcore/segment_c.h +++ b/internal/core/src/segcore/segment_c.h @@ -36,18 +36,20 @@ NewSegment(CCollection collection, CSegmentInterface* newSegment, bool is_sorted_by_pk); -// Create a new segment with pre-loaded segment information. -// This function creates a segment and initializes it with serialized load info, -// which can include precomputed metadata, statistics, or configuration data. -// -// @param collection: The collection that this segment belongs to -// @param seg_type: Type of the segment (growing, sealed, etc.) -// @param segment_id: Unique identifier for this segment -// @param newSegment: Output parameter for the created segment interface -// @param is_sorted_by_pk: Whether the segment data is sorted by primary key -// @param load_info_blob: Serialized load information blob -// @param load_info_length: Length of the load_info_blob in bytes -// @return CStatus indicating success or failure +/** + * @brief Create a new segment with pre-loaded segment information + * This function creates a segment and initializes it with serialized load info, + * which can include precomputed metadata, statistics, or configuration data + * + * @param collection: The collection that this segment belongs to + * @param seg_type: Type of the segment (growing, sealed, etc.) + * @param segment_id: Unique identifier for this segment + * @param newSegment: Output parameter for the created segment interface + * @param is_sorted_by_pk: Whether the segment data is sorted by primary key + * @param load_info_blob: Serialized load information blob + * @param load_info_length: Length of the load_info_blob in bytes + * @return CStatus indicating success or failure + */ CStatus NewSegmentWithLoadInfo(CCollection collection, SegmentType seg_type, @@ -56,6 +58,16 @@ NewSegmentWithLoadInfo(CCollection collection, bool is_sorted_by_pk, const uint8_t* load_info_blob, const int64_t load_info_length); +/** + * @brief Dispatch a segment manage load task. + * This function make segment itself load index & field data according to load info previously set. 
+ * + * @param c_trace: tracing context param + * @param c_segment: segment handle indicate which segment to load + * @return CStatus indicating success or failure + */ +CStatus +SegmentLoad(CTraceContext c_trace, CSegmentInterface c_segment); void DeleteSegment(CSegmentInterface c_segment); diff --git a/internal/querynodev2/segments/mock_segment.go b/internal/querynodev2/segments/mock_segment.go index f8a0cf6769..4ac128847b 100644 --- a/internal/querynodev2/segments/mock_segment.go +++ b/internal/querynodev2/segments/mock_segment.go @@ -1151,6 +1151,52 @@ func (_c *MockSegment_Level_Call) RunAndReturn(run func() datapb.SegmentLevel) * return _c } +// Load provides a mock function with given fields: ctx +func (_m *MockSegment) Load(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Load") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockSegment_Load_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Load' +type MockSegment_Load_Call struct { + *mock.Call +} + +// Load is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockSegment_Expecter) Load(ctx interface{}) *MockSegment_Load_Call { + return &MockSegment_Load_Call{Call: _e.mock.On("Load", ctx)} +} + +func (_c *MockSegment_Load_Call) Run(run func(ctx context.Context)) *MockSegment_Load_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockSegment_Load_Call) Return(_a0 error) *MockSegment_Load_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSegment_Load_Call) RunAndReturn(run func(context.Context) error) *MockSegment_Load_Call { + _c.Call.Return(run) + return _c +} + // LoadDeltaData provides a mock function with given fields: ctx, deltaData func (_m *MockSegment) LoadDeltaData(ctx context.Context, deltaData *storage.DeltaData) error { ret := _m.Called(ctx, deltaData) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 4ccfbc86e4..237650ee3c 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -1368,6 +1368,10 @@ func (s *LocalSegment) FinishLoad() error { return nil } +func (s *LocalSegment) Load(ctx context.Context) error { + return s.csegment.Load(ctx) +} + type ReleaseScope int const ( diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go index 43473d0135..5604d51c47 100644 --- a/internal/querynodev2/segments/segment_interface.go +++ b/internal/querynodev2/segments/segment_interface.go @@ -85,6 +85,7 @@ type Segment interface { Delete(ctx context.Context, primaryKeys storage.PrimaryKeys, timestamps []typeutil.Timestamp) error LoadDeltaData(ctx context.Context, deltaData *storage.DeltaData) error LastDeltaTimestamp() uint64 + Load(ctx context.Context) error FinishLoad() error Release(ctx context.Context, opts ...releaseOption) diff --git a/internal/querynodev2/segments/segment_l0.go b/internal/querynodev2/segments/segment_l0.go index 9775e0a314..4827e947fe 100644 --- a/internal/querynodev2/segments/segment_l0.go +++ b/internal/querynodev2/segments/segment_l0.go @@ -182,6 +182,10 @@ func (s *L0Segment) FinishLoad() error { return nil } +func (s *L0Segment) Load(ctx context.Context) error { + return nil +} + func (s *L0Segment) Release(ctx context.Context, 
opts ...releaseOption) { s.dataGuard.Lock() defer s.dataGuard.Unlock() diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 54e897fe1a..e1a46a71d4 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -46,6 +46,7 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storagecommon" + "github.com/milvus-io/milvus/internal/util/indexparamcheck" "github.com/milvus-io/milvus/internal/util/vecindexmgr" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/log" @@ -56,6 +57,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/contextutil" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/hardware" + "github.com/milvus-io/milvus/pkg/v2/util/indexparams" "github.com/milvus-io/milvus/pkg/v2/util/logutil" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/metric" @@ -314,6 +316,28 @@ func (loader *segmentLoader) Load(ctx context.Context, for _, info := range infos { loadInfo := info + for _, indexInfo := range loadInfo.IndexInfos { + indexParams := funcutil.KeyValuePair2Map(indexInfo.IndexParams) + + // some build params also exist in indexParams, which are useless during loading process + if vecindexmgr.GetVecIndexMgrInstance().IsDiskANN(indexParams["index_type"]) { + if err := indexparams.SetDiskIndexLoadParams(paramtable.Get(), indexParams, indexInfo.GetNumRows()); err != nil { + return nil, err + } + } + + // set whether enable offset cache for bitmap index + if indexParams["index_type"] == indexparamcheck.IndexBitmap { + indexparams.SetBitmapIndexLoadParams(paramtable.Get(), indexParams) + } + + if err := indexparams.AppendPrepareLoadParams(paramtable.Get(), indexParams); err != nil { + return nil, err + } + + indexInfo.IndexParams = funcutil.Map2KeyValuePair(indexParams) + } + segment, err := NewSegment( ctx, collection, @@ -893,7 +917,7 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu collection := segment.GetCollection() schemaHelper, _ := typeutil.CreateSchemaHelper(collection.Schema()) - indexedFieldInfos, fieldBinlogs, textIndexes, unindexedTextFields, jsonKeyStats := separateLoadInfoV2(loadInfo, collection.Schema()) + indexedFieldInfos, _, textIndexes, unindexedTextFields, jsonKeyStats := separateLoadInfoV2(loadInfo, collection.Schema()) if err := segment.AddFieldDataInfo(ctx, loadInfo.GetNumOfRows(), loadInfo.GetBinlogPaths()); err != nil { return err } @@ -906,58 +930,10 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu zap.Int64s("unindexed text fields", lo.Keys(unindexedTextFields)), zap.Int64s("indexed json key fields", lo.Keys(jsonKeyStats)), ) - if err := loader.loadFieldsIndex(ctx, schemaHelper, segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil { - return err - } - loadFieldsIndexSpan := tr.RecordSpan() - metrics.QueryNodeLoadIndexLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(loadFieldsIndexSpan.Milliseconds())) - // 2. 
complement raw data for the scalar fields without raw data - for _, info := range indexedFieldInfos { - fieldID := info.IndexInfo.FieldID - field, err := schemaHelper.GetFieldFromID(fieldID) - if err != nil { - return err - } - if !segment.HasRawData(fieldID) || field.GetIsPrimaryKey() { - // Skip loading raw data for fields in column group when using storage v2 - if loadInfo.GetStorageVersion() == storage.StorageV2 && - !storagecommon.IsVectorDataType(field.GetDataType()) && - field.GetDataType() != schemapb.DataType_Text { - log.Info("skip loading raw data for field in short column group", - zap.Int64("fieldID", fieldID), - zap.String("index", info.IndexInfo.GetIndexName()), - ) - continue - } - - log.Info("field index doesn't include raw data, load binlog...", - zap.Int64("fieldID", fieldID), - zap.String("index", info.IndexInfo.GetIndexName()), - ) - // for scalar index's raw data, only load to mmap not memory - if err = segment.LoadFieldData(ctx, fieldID, loadInfo.GetNumOfRows(), info.FieldBinlog); err != nil { - log.Warn("load raw data failed", zap.Int64("fieldID", fieldID), zap.Error(err)) - return err - } - } - - if !storagecommon.IsVectorDataType(field.GetDataType()) && - !segment.HasFieldData(fieldID) && - loadInfo.GetStorageVersion() != storage.StorageV2 { - // Lazy load raw data to avoid search failure after dropping index. - // storage v2 will load all scalar fields so we don't need to load raw data for them. - if err = segment.LoadFieldData(ctx, fieldID, loadInfo.GetNumOfRows(), info.FieldBinlog, "disable"); err != nil { - log.Warn("load raw data failed", zap.Int64("fieldID", fieldID), zap.Error(err)) - return err - } - } + if err = segment.Load(ctx); err != nil { + return errors.Wrap(err, "At Load") } - complementScalarDataSpan := tr.RecordSpan() - if err := loadSealedSegmentFields(ctx, collection, segment, fieldBinlogs, loadInfo.GetNumOfRows()); err != nil { - return err - } - loadRawDataSpan := tr.RecordSpan() if err = segment.FinishLoad(); err != nil { return errors.Wrap(err, "At FinishLoad") @@ -993,9 +969,9 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu } patchEntryNumberSpan := tr.RecordSpan() log.Info("Finish loading segment", - zap.Duration("loadFieldsIndexSpan", loadFieldsIndexSpan), - zap.Duration("complementScalarDataSpan", complementScalarDataSpan), - zap.Duration("loadRawDataSpan", loadRawDataSpan), + // zap.Duration("loadFieldsIndexSpan", loadFieldsIndexSpan), + // zap.Duration("complementScalarDataSpan", complementScalarDataSpan), + // zap.Duration("loadRawDataSpan", loadRawDataSpan), zap.Duration("patchEntryNumberSpan", patchEntryNumberSpan), zap.Duration("loadTextIndexesSpan", loadTextIndexesSpan), zap.Duration("loadJsonKeyIndexSpan", loadJSONKeyIndexesSpan), @@ -1040,7 +1016,7 @@ func (loader *segmentLoader) LoadSegment(ctx context.Context, return err } } else { - if err := segment.LoadMultiFieldData(ctx); err != nil { + if err := segment.Load(ctx); err != nil { return err } if err := segment.FinishLoad(); err != nil { diff --git a/internal/util/segcore/segment.go b/internal/util/segcore/segment.go index 119fee7959..beb691f1c9 100644 --- a/internal/util/segcore/segment.go +++ b/internal/util/segcore/segment.go @@ -66,7 +66,8 @@ func CreateCSegment(req *CreateCSegmentRequest) (CSegment, error) { var ptr C.CSegmentInterface var status C.CStatus if req.LoadInfo != nil { - loadInfoBlob, err := proto.Marshal(req.LoadInfo) + segLoadInfo := ConvertToSegcoreSegmentLoadInfo(req.LoadInfo) + loadInfoBlob, err := 
proto.Marshal(segLoadInfo) if err != nil { return nil, err } @@ -311,6 +312,13 @@ func (s *cSegmentImpl) FinishLoad() error { return nil } +func (s *cSegmentImpl) Load(ctx context.Context) error { + traceCtx := ParseCTraceContext(ctx) + defer runtime.KeepAlive(traceCtx) + status := C.SegmentLoad(traceCtx.ctx, s.ptr) + return ConsumeCStatusIntoError(&status) +} + func (s *cSegmentImpl) DropIndex(ctx context.Context, fieldID int64) error { status := C.DropSealedSegmentIndex(s.ptr, C.int64_t(fieldID)) if err := ConsumeCStatusIntoError(&status); err != nil { diff --git a/internal/util/segcore/segment_interface.go b/internal/util/segcore/segment_interface.go index 6dce7dcf0a..d9f6052d2d 100644 --- a/internal/util/segcore/segment_interface.go +++ b/internal/util/segcore/segment_interface.go @@ -80,6 +80,9 @@ type basicSegmentMethodSet interface { // FinishLoad wraps up the load process and let segcore do the leftover jobs. FinishLoad() error + // Load invokes segment managed loading. + Load(ctx context.Context) error + // Release releases the segment. Release() }
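
Note (not part of the diff): the heart of ChunkedSegmentSealedImpl::Load() above is a fan-out pattern — submit one task per indexed field (and one per non-indexed field's binlogs), join every future, collect a std::exception_ptr for each failure, and rethrow only the first one. Below is a minimal standalone sketch of that pattern only; it uses std::async in place of the internal ThreadPools::GetThreadPool helper, and the field IDs and the simulated failure are invented for illustration, not taken from this patch.

// Sketch: fan out per-field load tasks, join all of them, surface one error.
// std::async stands in for the milvus thread pool; field ids / failure are fake.
#include <exception>
#include <future>
#include <iostream>
#include <stdexcept>
#include <vector>

void LoadOneField(int field_id) {
    // Placeholder for ConvertFieldIndexInfoToLoadIndexInfo + LoadIndexData
    // + LoadIndex (or LoadFieldData for a non-indexed field).
    if (field_id == 102) {
        throw std::runtime_error("simulated load failure for field 102");
    }
}

void LoadAllFields(const std::vector<int>& field_ids) {
    std::vector<std::future<void>> futures;
    futures.reserve(field_ids.size());
    for (int field_id : field_ids) {
        futures.push_back(
            std::async(std::launch::async, LoadOneField, field_id));
    }

    // Join every task and remember every failure, so no future is left
    // un-waited even when an earlier task already threw.
    std::vector<std::exception_ptr> exceptions;
    for (auto& fut : futures) {
        try {
            fut.get();
        } catch (...) {
            exceptions.push_back(std::current_exception());
        }
    }
    if (!exceptions.empty()) {
        std::cerr << exceptions.size() << " of " << futures.size()
                  << " load task(s) failed" << std::endl;
        std::rethrow_exception(exceptions.front());
    }
}

int main() {
    try {
        LoadAllFields({100, 101, 102, 103});
    } catch (const std::exception& e) {
        std::cerr << "segment load failed: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}

Collecting every exception before rethrowing matters because each task is still joined via get() even after an earlier one has failed, which mirrors how Load() in this patch waits for all index and field-data futures before reporting the first error.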