diff --git a/internal/core/src/index/VectorMemIndex.cpp b/internal/core/src/index/VectorMemIndex.cpp index c8f76f6b0b..51db032634 100644 --- a/internal/core/src/index/VectorMemIndex.cpp +++ b/internal/core/src/index/VectorMemIndex.cpp @@ -37,6 +37,7 @@ #include "storage/FieldData.h" #include "storage/MemFileManagerImpl.h" #include "storage/ThreadPools.h" +#include "storage/Util.h" namespace milvus::index { @@ -104,32 +105,78 @@ VectorMemIndex::Load(const Config& config) { AssertInfo(index_files.has_value(), "index file paths is empty when load index"); + LOG_SEGCORE_INFO_ << "load index files: " << index_files.value().size(); + auto parallel_degree = static_cast(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); + std::map index_datas{}; - std::map channels; - for (const auto& file : index_files.value()) { - auto key = file.substr(file.find_last_of('/') + 1); - LOG_SEGCORE_INFO_ << "loading index file " << key; - if (channels.find(key) == channels.end()) { - channels.emplace(std::move(key), - std::make_shared( - parallel_degree * 2)); + // try to read slice meta first + std::string slice_meta_filepath; + for (auto& file : index_files.value()) { + auto file_name = file.substr(file.find_last_of('/') + 1); + if (file_name == INDEX_FILE_SLICE_META) { + slice_meta_filepath = file; + break; } } - auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE); - auto future = pool.Submit( - [&] { file_manager_->LoadFileStream(index_files.value(), channels); }); + if (slice_meta_filepath + .empty()) { // no slice meta, we could simply load all these files + index_datas = file_manager_->LoadIndexToMemory(index_files.value()); + AssembleIndexDatas(index_datas); + } else { // load with the slice meta info, then we can load batch by batch + std::string index_file_prefix = slice_meta_filepath.substr( + 0, slice_meta_filepath.find_last_of('/') + 1); + std::vector batch{}; + batch.reserve(parallel_degree); - LOG_SEGCORE_INFO_ << "assemble index data..."; - std::unordered_map result; - AssembleIndexDatas(channels, result); - LOG_SEGCORE_INFO_ << "assemble index data done"; + auto result = file_manager_->LoadIndexToMemory({slice_meta_filepath}); + auto raw_slice_meta = result[INDEX_FILE_SLICE_META]; + Config meta_data = Config::parse( + std::string(static_cast(raw_slice_meta->Data()), + raw_slice_meta->Size())); + + for (auto& item : meta_data[META]) { + std::string prefix = item[NAME]; + int slice_num = item[SLICE_NUM]; + auto total_len = static_cast(item[TOTAL_LEN]); + + auto new_field_data = + milvus::storage::CreateFieldData(DataType::INT8, 1, total_len); + auto HandleBatch = [&](int index) { + auto batch_data = file_manager_->LoadIndexToMemory(batch); + for (int j = index - batch.size() + 1; j <= index; j++) { + std::string file_name = GenSlicedFileName(prefix, j); + AssertInfo(batch_data.find(file_name) != batch_data.end(), + "lost index slice data"); + auto data = batch_data[file_name]; + new_field_data->FillFieldData(data->Data(), data->Size()); + } + batch.clear(); + }; + + for (auto i = 0; i < slice_num; ++i) { + std::string file_name = GenSlicedFileName(prefix, i); + batch.push_back(index_file_prefix + file_name); + if (batch.size() >= parallel_degree) { + HandleBatch(i); + } + } + if (batch.size() > 0) { + HandleBatch(slice_num - 1); + } + + AssertInfo( + new_field_data->IsFull(), + "index len is inconsistent after disassemble and assemble"); + index_datas[prefix] = new_field_data; + } + } LOG_SEGCORE_INFO_ << "construct binary set..."; BinarySet binary_set; - for (auto& [key, data] : result) { + for (auto& [key, data] : index_datas) { LOG_SEGCORE_INFO_ << "add index data to binary set: " << key; auto size = data->Size(); auto deleter = [&](uint8_t*) {}; // avoid repeated deconstruction diff --git a/internal/core/src/storage/MemFileManagerImpl.cpp b/internal/core/src/storage/MemFileManagerImpl.cpp index ddb4dfe2e7..def1f351a1 100644 --- a/internal/core/src/storage/MemFileManagerImpl.cpp +++ b/internal/core/src/storage/MemFileManagerImpl.cpp @@ -119,41 +119,6 @@ MemFileManagerImpl::LoadIndexToMemory( return file_to_index_data; } -void -MemFileManagerImpl::LoadFileStream( - const std::vector& remote_files, - std::map& channels) { - auto parallel_degree = - static_cast(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); - - std::vector batch_files; - auto LoadBatchIndexFiles = [&]() { - auto index_datas = GetObjectData(rcm_.get(), batch_files); - for (auto i = 0; i < index_datas.size(); i++) { - auto file_name = - batch_files[i].substr(batch_files[i].find_last_of('/') + 1); - auto& channel = channels[file_name]; - channel->push(index_datas[i]); - } - }; - - for (auto& file : remote_files) { - if (batch_files.size() >= parallel_degree) { - LoadBatchIndexFiles(); - batch_files.clear(); - } - batch_files.emplace_back(file); - } - - if (batch_files.size() > 0) { - LoadBatchIndexFiles(); - } - - for (auto& [_, channel] : channels) { - channel->close(); - } -} - std::vector MemFileManagerImpl::CacheRawDataToMemory( std::vector remote_files) { diff --git a/internal/core/src/storage/MemFileManagerImpl.h b/internal/core/src/storage/MemFileManagerImpl.h index 49705bb492..44f2054e59 100644 --- a/internal/core/src/storage/MemFileManagerImpl.h +++ b/internal/core/src/storage/MemFileManagerImpl.h @@ -55,11 +55,6 @@ class MemFileManagerImpl : public FileManagerImpl { std::map LoadIndexToMemory(const std::vector& remote_files); - void - LoadFileStream( - const std::vector& remote_files, - std::map& channels); - std::vector CacheRawDataToMemory(std::vector remote_files);