// Licensed to the LF AI & Data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include #include #include #include #include #include #include #include #include #include #include #include #include "common/Common.h" #include "common/Consts.h" #include "common/EasyAssert.h" #include "common/FieldData.h" #include "common/FieldDataInterface.h" #include "common/File.h" #include "common/Slice.h" #include "common/Types.h" #include "index/Utils.h" #include "log/Log.h" #include "storage/DiskFileManagerImpl.h" #include "storage/FileManager.h" #include "storage/IndexData.h" #include "storage/LocalChunkManagerSingleton.h" #include "storage/ThreadPools.h" #include "storage/Types.h" #include "storage/Util.h" namespace milvus::storage { DiskFileManagerImpl::DiskFileManagerImpl( const FileManagerContext& fileManagerContext) : FileManagerImpl(fileManagerContext.fieldDataMeta, fileManagerContext.indexMeta) { rcm_ = fileManagerContext.chunkManagerPtr; } DiskFileManagerImpl::~DiskFileManagerImpl() { auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); local_chunk_manager->RemoveDir(GetIndexPathPrefixWithBuildID( local_chunk_manager, index_meta_.build_id)); local_chunk_manager->RemoveDir(GetTextIndexPathPrefixWithBuildID( local_chunk_manager, index_meta_.build_id)); local_chunk_manager->RemoveDir(GetJsonKeyIndexPathPrefixWithBuildID( local_chunk_manager, index_meta_.build_id)); } bool DiskFileManagerImpl::LoadFile(const std::string& file) noexcept { return true; } std::string DiskFileManagerImpl::GetRemoteIndexPath(const std::string& file_name, int64_t slice_num) const { std::string remote_prefix; remote_prefix = GetRemoteIndexObjectPrefix(); return remote_prefix + "/" + file_name + "_" + std::to_string(slice_num); } std::string DiskFileManagerImpl::GetRemoteTextLogPath(const std::string& file_name, int64_t slice_num) const { auto remote_prefix = GetRemoteTextLogPrefix(); return remote_prefix + "/" + file_name + "_" + std::to_string(slice_num); } std::string DiskFileManagerImpl::GetRemoteJsonKeyIndexPath(const std::string& file_name, int64_t slice_num) { auto remote_prefix = GetRemoteJsonKeyLogPrefix(); return remote_prefix + "/" + file_name + "_" + std::to_string(slice_num); } bool DiskFileManagerImpl::AddFileInternal( const std::string& file, const std::function& get_remote_path) noexcept { auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); FILEMANAGER_TRY if (!local_chunk_manager->Exist(file)) { LOG_ERROR("local file {} not exists", file); return false; } // record local file path local_paths_.emplace_back(file); auto fileName = GetFileName(file); auto fileSize = local_chunk_manager->Size(file); added_total_file_size_ += fileSize; std::vector batch_remote_files; std::vector remote_file_sizes; std::vector local_file_offsets; int slice_num = 0; auto parallel_degree = uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); for (int64_t offset = 0; offset < fileSize; slice_num++) { if (batch_remote_files.size() >= parallel_degree) { AddBatchIndexFiles(file, local_file_offsets, batch_remote_files, remote_file_sizes); batch_remote_files.clear(); remote_file_sizes.clear(); local_file_offsets.clear(); } auto batch_size = std::min(FILE_SLICE_SIZE, int64_t(fileSize) - offset); batch_remote_files.emplace_back(get_remote_path(fileName, slice_num)); remote_file_sizes.emplace_back(batch_size); local_file_offsets.emplace_back(offset); offset += batch_size; } if (batch_remote_files.size() > 0) { AddBatchIndexFiles( file, local_file_offsets, batch_remote_files, remote_file_sizes); } FILEMANAGER_CATCH FILEMANAGER_END return true; } // namespace knowhere bool DiskFileManagerImpl::AddFile(const std::string& file) noexcept { return AddFileInternal(file, [this](const std::string& file_name, int slice_num) { return GetRemoteIndexPath(file_name, slice_num); }); } bool DiskFileManagerImpl::AddJsonKeyIndexLog(const std::string& file) noexcept { return AddFileInternal( file, [this](const std::string& file_name, int slice_num) { return GetRemoteJsonKeyIndexPath(file_name, slice_num); }); } bool DiskFileManagerImpl::AddTextLog(const std::string& file) noexcept { return AddFileInternal( file, [this](const std::string& file_name, int slice_num) { return GetRemoteTextLogPath(file_name, slice_num); }); } void DiskFileManagerImpl::AddBatchIndexFiles( const std::string& local_file_name, const std::vector& local_file_offsets, const std::vector& remote_files, const std::vector& remote_file_sizes) { auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH); std::vector>> futures; futures.reserve(remote_file_sizes.size()); AssertInfo(local_file_offsets.size() == remote_files.size(), "inconsistent size of offset slices with file slices"); AssertInfo(remote_files.size() == remote_file_sizes.size(), "inconsistent size of file slices with size slices"); for (int64_t i = 0; i < remote_files.size(); ++i) { futures.push_back(pool.Submit( [&](const std::string& file, const int64_t offset, const int64_t data_size) -> std::shared_ptr { auto buf = std::shared_ptr(new uint8_t[data_size]); local_chunk_manager->Read(file, offset, buf.get(), data_size); return buf; }, local_file_name, local_file_offsets[i], remote_file_sizes[i])); } // hold index data util upload index file done std::vector> index_datas; std::vector data_slices; for (auto& future : futures) { auto res = future.get(); index_datas.emplace_back(res); data_slices.emplace_back(res.get()); } std::map res; res = PutIndexData(rcm_.get(), data_slices, remote_file_sizes, remote_files, field_meta_, index_meta_); for (auto& re : res) { remote_paths_to_size_[re.first] = re.second; } } void DiskFileManagerImpl::CacheIndexToDiskInternal( const std::vector& remote_files, const std::function& get_local_index_prefix, milvus::proto::common::LoadPriority priority) { auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); std::map> index_slices; for (auto& file_path : remote_files) { auto pos = file_path.find_last_of('_'); AssertInfo(pos > 0, "invalided index file path:{}", file_path); try { auto idx = std::stoi(file_path.substr(pos + 1)); index_slices[file_path.substr(0, pos)].emplace_back(idx); } catch (const std::logic_error& e) { auto err_message = fmt::format( "invalided index file path:{}, error:{}", file_path, e.what()); LOG_ERROR(err_message); throw std::logic_error(err_message); } } for (auto& slices : index_slices) { std::sort(slices.second.begin(), slices.second.end()); } for (auto& slices : index_slices) { auto prefix = slices.first; auto local_index_file_name = get_local_index_prefix() + prefix.substr(prefix.find_last_of('/') + 1); local_chunk_manager->CreateFile(local_index_file_name); auto file = File::Open(local_index_file_name, O_CREAT | O_RDWR | O_TRUNC); // Get the remote files std::vector batch_remote_files; batch_remote_files.reserve(slices.second.size()); uint64_t max_parallel_degree = uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); auto appendIndexFiles = [&]() { auto index_chunks_futures = GetObjectData(rcm_.get(), batch_remote_files, milvus::PriorityForLoad(priority)); for (auto& chunk_future : index_chunks_futures) { auto chunk_codec = chunk_future.get(); file.Write(chunk_codec->PayloadData(), chunk_codec->PayloadSize()); } batch_remote_files.clear(); }; for (int& iter : slices.second) { auto origin_file = prefix + "_" + std::to_string(iter); batch_remote_files.push_back(origin_file); if (batch_remote_files.size() == max_parallel_degree) { appendIndexFiles(); } } if (batch_remote_files.size() > 0) { appendIndexFiles(); } local_paths_.emplace_back(local_index_file_name); } } void DiskFileManagerImpl::CacheIndexToDisk( const std::vector& remote_files, milvus::proto::common::LoadPriority priority) { return CacheIndexToDiskInternal( remote_files, [this]() { return GetLocalIndexObjectPrefix(); }, priority); } void DiskFileManagerImpl::CacheTextLogToDisk( const std::vector& remote_files, milvus::proto::common::LoadPriority priority) { return CacheIndexToDiskInternal( remote_files, [this]() { return GetLocalTextIndexPrefix(); }, priority); } void DiskFileManagerImpl::CacheJsonKeyIndexToDisk( const std::vector& remote_files, milvus::proto::common::LoadPriority priority) { return CacheIndexToDiskInternal( remote_files, [this]() { return GetLocalJsonKeyIndexPrefix(); }, priority); } template std::string DiskFileManagerImpl::CacheRawDataToDisk(const Config& config) { auto storage_version = index::GetValueFromConfig(config, STORAGE_VERSION_KEY) .value_or(0); if (storage_version == STORAGE_V2) { return cache_raw_data_to_disk_storage_v2(config); } return cache_raw_data_to_disk_internal(config); } template std::string DiskFileManagerImpl::cache_raw_data_to_disk_internal(const Config& config) { auto insert_files = index::GetValueFromConfig>( config, INSERT_FILES_KEY); AssertInfo(insert_files.has_value(), "insert file paths is empty when build index"); auto remote_files = insert_files.value(); SortByPath(remote_files); auto segment_id = GetFieldDataMeta().segment_id; auto field_id = GetFieldDataMeta().field_id; auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); std::string local_data_path; bool file_created = false; // get batch raw data from s3 and write batch data to disk file // TODO: load and write of different batches at the same time std::vector batch_files; // file format // num_rows(uint32) | dim(uint32) | index_data ([]uint8_t) uint32_t num_rows = 0; uint32_t dim = 0; int64_t write_offset = sizeof(num_rows) + sizeof(dim); auto FetchRawData = [&]() { auto field_datas = GetObjectData(rcm_.get(), batch_files); int batch_size = batch_files.size(); for (int i = 0; i < batch_size; i++) { auto field_data = field_datas[i].get()->GetFieldData(); num_rows += uint32_t(field_data->get_num_rows()); cache_raw_data_to_disk_common(field_data, local_chunk_manager, local_data_path, file_created, dim, write_offset); } }; auto parallel_degree = uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); for (auto& file : remote_files) { if (batch_files.size() >= parallel_degree) { FetchRawData(); batch_files.clear(); } batch_files.emplace_back(file); } if (batch_files.size() > 0) { FetchRawData(); } // write num_rows and dim value to file header write_offset = 0; local_chunk_manager->Write( local_data_path, write_offset, &num_rows, sizeof(num_rows)); write_offset += sizeof(num_rows); local_chunk_manager->Write( local_data_path, write_offset, &dim, sizeof(dim)); return local_data_path; } template void DiskFileManagerImpl::cache_raw_data_to_disk_common( const FieldDataPtr& field_data, const std::shared_ptr& local_chunk_manager, std::string& local_data_path, bool& file_created, uint32_t& dim, int64_t& write_offset) { auto data_type = field_data->get_data_type(); if (!file_created) { auto init_file_info = [&](milvus::DataType dt) { local_data_path = storage::GenFieldRawDataPathPrefix( local_chunk_manager, GetFieldDataMeta().segment_id, GetFieldDataMeta().field_id) + "raw_data"; if (dt == milvus::DataType::VECTOR_SPARSE_FLOAT) { local_data_path += ".sparse_u32_f32"; } local_chunk_manager->CreateFile(local_data_path); }; init_file_info(data_type); file_created = true; } if (data_type == milvus::DataType::VECTOR_SPARSE_FLOAT) { dim = (uint32_t)(std::dynamic_pointer_cast>( field_data) ->Dim()); auto sparse_rows = static_cast*>( field_data->Data()); for (size_t i = 0; i < field_data->Length(); ++i) { auto row = sparse_rows[i]; auto row_byte_size = row.data_byte_size(); uint32_t nnz = row.size(); local_chunk_manager->Write(local_data_path, write_offset, const_cast(&nnz), sizeof(nnz)); write_offset += sizeof(nnz); local_chunk_manager->Write( local_data_path, write_offset, row.data(), row_byte_size); write_offset += row_byte_size; } } else { dim = field_data->get_dim(); auto data_size = field_data->get_num_rows() * milvus::GetVecRowSize(dim); local_chunk_manager->Write(local_data_path, write_offset, const_cast(field_data->Data()), data_size); write_offset += data_size; } } template std::string DiskFileManagerImpl::cache_raw_data_to_disk_storage_v2(const Config& config) { auto data_type = index::GetValueFromConfig(config, DATA_TYPE_KEY); AssertInfo(data_type.has_value(), "data type is empty when build index"); auto dim = index::GetValueFromConfig(config, DIM_KEY).value_or(0); auto segment_insert_files = index::GetValueFromConfig>>( config, SEGMENT_INSERT_FILES_KEY); AssertInfo(segment_insert_files.has_value(), "segment insert files is empty when build index"); auto all_remote_files = segment_insert_files.value(); for (auto& remote_files : all_remote_files) { SortByPath(remote_files); } auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); std::string local_data_path; bool file_created = false; // file format // num_rows(uint32) | dim(uint32) | index_data ([]uint8_t) uint32_t num_rows = 0; uint32_t var_dim = 0; int64_t write_offset = sizeof(num_rows) + sizeof(var_dim); auto field_datas = GetFieldDatasFromStorageV2( all_remote_files, GetFieldDataMeta().field_id, data_type.value(), dim); for (auto& field_data : field_datas) { num_rows += uint32_t(field_data->get_num_rows()); cache_raw_data_to_disk_common(field_data, local_chunk_manager, local_data_path, file_created, var_dim, write_offset); } // write num_rows and dim value to file header write_offset = 0; local_chunk_manager->Write( local_data_path, write_offset, &num_rows, sizeof(num_rows)); write_offset += sizeof(num_rows); local_chunk_manager->Write( local_data_path, write_offset, &var_dim, sizeof(var_dim)); return local_data_path; } template bool WriteOptFieldIvfDataImpl( const int64_t field_id, const std::shared_ptr& local_chunk_manager, const std::string& local_data_path, const std::vector& field_datas, uint64_t& write_offset) { using FieldDataT = DataTypeNativeOrVoid; using OffsetT = uint32_t; std::unordered_map> mp; OffsetT offset = 0; for (const auto& field_data : field_datas) { for (int64_t i = 0; i < field_data->get_num_rows(); ++i) { auto val = *reinterpret_cast(field_data->RawValue(i)); mp[val].push_back(offset++); } } // Do not write to disk if there is only one value if (mp.size() <= 1) { return false; } local_chunk_manager->Write(local_data_path, write_offset, const_cast(&field_id), sizeof(field_id)); write_offset += sizeof(field_id); const uint32_t num_of_unique_field_data = mp.size(); local_chunk_manager->Write(local_data_path, write_offset, const_cast(&num_of_unique_field_data), sizeof(num_of_unique_field_data)); write_offset += sizeof(num_of_unique_field_data); for (const auto& [val, offsets] : mp) { const uint32_t offsets_cnt = offsets.size(); local_chunk_manager->Write(local_data_path, write_offset, const_cast(&offsets_cnt), sizeof(offsets_cnt)); write_offset += sizeof(offsets_cnt); const size_t data_size = offsets_cnt * sizeof(OffsetT); local_chunk_manager->Write(local_data_path, write_offset, const_cast(offsets.data()), data_size); write_offset += data_size; } return true; } #define GENERATE_OPT_FIELD_IVF_IMPL(DT) \ WriteOptFieldIvfDataImpl
(field_id, \ local_chunk_manager, \ local_data_path, \ field_datas, \ write_offset) bool WriteOptFieldIvfData( const DataType& dt, const int64_t field_id, const std::shared_ptr& local_chunk_manager, const std::string& local_data_path, const std::vector& field_datas, uint64_t& write_offset) { switch (dt) { case DataType::BOOL: return GENERATE_OPT_FIELD_IVF_IMPL(DataType::BOOL); case DataType::INT8: return GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT8); case DataType::INT16: return GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT16); case DataType::INT32: return GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT32); case DataType::INT64: return GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT64); case DataType::FLOAT: return GENERATE_OPT_FIELD_IVF_IMPL(DataType::FLOAT); case DataType::DOUBLE: return GENERATE_OPT_FIELD_IVF_IMPL(DataType::DOUBLE); case DataType::STRING: return GENERATE_OPT_FIELD_IVF_IMPL(DataType::STRING); case DataType::VARCHAR: return GENERATE_OPT_FIELD_IVF_IMPL(DataType::VARCHAR); default: LOG_WARN("Unsupported data type in optional scalar field: ", dt); return false; } return true; } #undef GENERATE_OPT_FIELD_IVF_IMPL void WriteOptFieldsIvfMeta( const std::shared_ptr& local_chunk_manager, const std::string& local_data_path, const uint32_t num_of_fields, uint64_t& write_offset) { const uint8_t kVersion = 0; local_chunk_manager->Write(local_data_path, write_offset, const_cast(&kVersion), sizeof(kVersion)); write_offset += sizeof(kVersion); local_chunk_manager->Write(local_data_path, write_offset, const_cast(&num_of_fields), sizeof(num_of_fields)); write_offset += sizeof(num_of_fields); } std::string DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) { const uint32_t num_of_fields = fields_map.size(); if (0 == num_of_fields) { return ""; } else if (num_of_fields > 1) { PanicInfo( ErrorCode::NotImplemented, "vector index build with multiple fields is not supported yet"); } auto segment_id = GetFieldDataMeta().segment_id; auto vec_field_id = GetFieldDataMeta().field_id; auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); auto local_data_path = storage::GenFieldRawDataPathPrefix( local_chunk_manager, segment_id, vec_field_id) + std::string(VEC_OPT_FIELDS); local_chunk_manager->CreateFile(local_data_path); uint64_t write_offset = 0; WriteOptFieldsIvfMeta( local_chunk_manager, local_data_path, num_of_fields, write_offset); std::unordered_set actual_field_ids; for (auto& [field_id, tup] : fields_map) { const auto& field_type = std::get<1>(tup); auto& field_paths = std::get<2>(tup); if (0 == field_paths.size()) { LOG_WARN("optional field {} has no data", field_id); return ""; } SortByPath(field_paths); std::vector field_datas = FetchFieldData(rcm_.get(), field_paths); if (WriteOptFieldIvfData(field_type, field_id, local_chunk_manager, local_data_path, field_datas, write_offset)) { actual_field_ids.insert(field_id); } } if (actual_field_ids.size() != num_of_fields) { write_offset = 0; WriteOptFieldsIvfMeta(local_chunk_manager, local_data_path, actual_field_ids.size(), write_offset); if (actual_field_ids.empty()) { return ""; } } return local_data_path; } std::string DiskFileManagerImpl::GetFileName(const std::string& localfile) { boost::filesystem::path localPath(localfile); return localPath.filename().string(); } std::string DiskFileManagerImpl::GetIndexIdentifier() { return GenIndexPathIdentifier(index_meta_.build_id, index_meta_.index_version); } std::string DiskFileManagerImpl::GetLocalIndexObjectPrefix() { auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); return GenIndexPathPrefix( local_chunk_manager, index_meta_.build_id, index_meta_.index_version); } std::string DiskFileManagerImpl::GetTextIndexIdentifier() { return std::to_string(index_meta_.build_id) + "/" + std::to_string(index_meta_.index_version) + "/" + std::to_string(field_meta_.segment_id) + "/" + std::to_string(field_meta_.field_id); } std::string DiskFileManagerImpl::GetLocalTextIndexPrefix() { auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); return GenTextIndexPathPrefix(local_chunk_manager, index_meta_.build_id, index_meta_.index_version, field_meta_.segment_id, field_meta_.field_id); } std::string DiskFileManagerImpl::GetJsonKeyIndexIdentifier() { return GenJsonKeyIndexPathIdentifier(index_meta_.build_id, index_meta_.index_version, field_meta_.collection_id, field_meta_.partition_id, field_meta_.segment_id, field_meta_.field_id); } std::string DiskFileManagerImpl::GetLocalJsonKeyIndexPrefix() { auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); return GenJsonKeyIndexPathPrefix(local_chunk_manager, index_meta_.build_id, index_meta_.index_version, field_meta_.collection_id, field_meta_.partition_id, field_meta_.segment_id, field_meta_.field_id); } std::string DiskFileManagerImpl::GetRemoteJsonKeyLogPrefix() { return GenJsonKeyIndexPathPrefix(rcm_, index_meta_.build_id, index_meta_.index_version, field_meta_.collection_id, field_meta_.partition_id, field_meta_.segment_id, field_meta_.field_id); } std::string DiskFileManagerImpl::GetLocalRawDataObjectPrefix() { auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); return GenFieldRawDataPathPrefix( local_chunk_manager, field_meta_.segment_id, field_meta_.field_id); } bool DiskFileManagerImpl::RemoveFile(const std::string& file) noexcept { // TODO: implement this interface return false; } std::optional DiskFileManagerImpl::IsExisted(const std::string& file) noexcept { bool isExist = false; auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); try { isExist = local_chunk_manager->Exist(file); } catch (std::exception& e) { // LOG_DEBUG("Exception:{}", e).what(); return std::nullopt; } return isExist; } template std::string DiskFileManagerImpl::CacheRawDataToDisk(const Config& config); template std::string DiskFileManagerImpl::CacheRawDataToDisk(const Config& config); template std::string DiskFileManagerImpl::CacheRawDataToDisk(const Config& config); template std::string DiskFileManagerImpl::CacheRawDataToDisk(const Config& config); } // namespace milvus::storage