From a77693aa1979fb4b9ec7a5aae2a0c1e443b73d13 Mon Sep 17 00:00:00 2001 From: yah01 Date: Mon, 22 Jan 2024 19:20:57 +0800 Subject: [PATCH] enhance: convert the `GetObject` util to async (#30166) This makes it much easier to use Signed-off-by: yah01 --- .../core/src/storage/DiskFileManagerImpl.cpp | 60 +++++++++---------- .../core/src/storage/MemFileManagerImpl.cpp | 5 +- internal/core/src/storage/Util.cpp | 12 +--- internal/core/src/storage/Util.h | 3 +- 4 files changed, 35 insertions(+), 45 deletions(-) diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index 060c2110ee..2ee03543f3 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -26,10 +26,10 @@ #include "storage/DiskFileManagerImpl.h" #include "storage/FileManager.h" -#include "storage/LocalChunkManagerSingleton.h" #include "storage/IndexData.h" -#include "storage/Util.h" +#include "storage/LocalChunkManagerSingleton.h" #include "storage/ThreadPools.h" +#include "storage/Util.h" namespace milvus::storage { @@ -81,20 +81,16 @@ DiskFileManagerImpl::AddFileUsingSpace( const std::vector& remote_file_sizes) { auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); - auto LoadIndexFromDisk = [&]( - const std::string& file, - const int64_t offset, - const int64_t data_size) -> std::shared_ptr { - auto buf = std::shared_ptr(new uint8_t[data_size]); - local_chunk_manager->Read(file, offset, buf.get(), data_size); - return buf; - }; - for (int64_t i = 0; i < remote_files.size(); ++i) { - auto data = LoadIndexFromDisk( - local_file_name, local_file_offsets[i], remote_file_sizes[i]); - auto status = space_->WriteBlob( - remote_files[i], data.get(), remote_file_sizes[i]); + auto buf = + std::shared_ptr(new uint8_t[remote_file_sizes[i]]); + local_chunk_manager->Read(local_file_name, + local_file_offsets[i], + buf.get(), + remote_file_sizes[i]); + + auto status = + space_->WriteBlob(remote_files[i], buf.get(), remote_file_sizes[i]); if (!status.ok()) { return false; } @@ -162,16 +158,7 @@ DiskFileManagerImpl::AddBatchIndexFiles( const std::vector& remote_file_sizes) { auto local_chunk_manager = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); - auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE); - - auto LoadIndexFromDisk = [&]( - const std::string& file, - const int64_t offset, - const int64_t data_size) -> std::shared_ptr { - auto buf = std::shared_ptr(new uint8_t[data_size]); - local_chunk_manager->Read(file, offset, buf.get(), data_size); - return buf; - }; + auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH); std::vector>> futures; AssertInfo(local_file_offsets.size() == remote_files.size(), @@ -180,10 +167,17 @@ DiskFileManagerImpl::AddBatchIndexFiles( "inconsistent size of file slices with size slices"); for (int64_t i = 0; i < remote_files.size(); ++i) { - futures.push_back(pool.Submit(LoadIndexFromDisk, - local_file_name, - local_file_offsets[i], - remote_file_sizes[i])); + futures.push_back(pool.Submit( + [&](const std::string& file, + const int64_t offset, + const int64_t data_size) -> std::shared_ptr { + auto buf = std::shared_ptr(new uint8_t[data_size]); + local_chunk_manager->Read(file, offset, buf.get(), data_size); + return buf; + }, + local_file_name, + local_file_offsets[i], + remote_file_sizes[i])); } // hold index data util upload index file done @@ -211,8 +205,8 @@ DiskFileManagerImpl::AddBatchIndexFiles( field_meta_, index_meta_); } - for (auto iter = res.begin(); iter != res.end(); ++iter) { - remote_paths_to_size_[iter->first] = iter->second; + for (auto& re : res) { + remote_paths_to_size_[re.first] = re.second; } } @@ -344,7 +338,7 @@ DiskFileManagerImpl::CacheBatchIndexFilesToDisk( uint64_t offset = local_file_init_offfset; for (int i = 0; i < batch_size; ++i) { - auto index_data = index_datas[i]; + auto index_data = index_datas[i].get()->GetFieldData(); auto index_size = index_data->Size(); auto uint8_data = reinterpret_cast(const_cast(index_data->Data())); @@ -473,7 +467,7 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector remote_files) { auto field_datas = GetObjectData(rcm_.get(), batch_files); int batch_size = batch_files.size(); for (int i = 0; i < batch_size; ++i) { - auto field_data = field_datas[i]; + auto field_data = field_datas[i].get()->GetFieldData(); num_rows += uint32_t(field_data->get_num_rows()); AssertInfo(dim == 0 || dim == field_data->get_dim(), "inconsistent dim value in multi binlogs!"); diff --git a/internal/core/src/storage/MemFileManagerImpl.cpp b/internal/core/src/storage/MemFileManagerImpl.cpp index 13639d6963..80bc90bb2e 100644 --- a/internal/core/src/storage/MemFileManagerImpl.cpp +++ b/internal/core/src/storage/MemFileManagerImpl.cpp @@ -153,7 +153,8 @@ MemFileManagerImpl::LoadIndexToMemory( for (size_t idx = 0; idx < batch_files.size(); ++idx) { auto file_name = batch_files[idx].substr(batch_files[idx].find_last_of('/') + 1); - file_to_index_data[file_name] = index_datas[idx]; + file_to_index_data[file_name] = + index_datas[idx].get()->GetFieldData(); } }; @@ -192,7 +193,7 @@ MemFileManagerImpl::CacheRawDataToMemory( auto FetchRawData = [&]() { auto raw_datas = GetObjectData(rcm_.get(), batch_files); for (auto& data : raw_datas) { - field_datas.emplace_back(data); + field_datas.emplace_back(data.get()->GetFieldData()); } }; diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index c725d80297..6038e55e0e 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -504,23 +504,17 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager, return std::make_pair(std::move(object_key), serialized_index_size); } -std::vector +std::vector>> GetObjectData(ChunkManager* remote_chunk_manager, const std::vector& remote_files) { auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH); std::vector>> futures; + futures.reserve(remote_files.size()); for (auto& file : remote_files) { futures.emplace_back(pool.Submit( DownloadAndDecodeRemoteFile, remote_chunk_manager, file)); } - - std::vector datas; - for (int i = 0; i < futures.size(); ++i) { - auto res = futures[i].get(); - datas.emplace_back(res->GetFieldData()); - } - ReleaseArrowUnused(); - return datas; + return futures; } std::vector diff --git a/internal/core/src/storage/Util.h b/internal/core/src/storage/Util.h index fbd5fcb541..acb6d233c0 100644 --- a/internal/core/src/storage/Util.h +++ b/internal/core/src/storage/Util.h @@ -19,6 +19,7 @@ #include #include #include +#include #include "common/FieldData.h" #include "common/LoadInfo.h" @@ -115,7 +116,7 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager, const FieldMeta& field_meta, std::string object_key); -std::vector +std::vector>> GetObjectData(ChunkManager* remote_chunk_manager, const std::vector& remote_files);