enhance: convert the GetObject util to async (#30166)

This makes it much easier to use.

Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent 0ac37a4a0c
commit a77693aa19
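The diff below changes GetObjectData to return unresolved futures instead of eagerly downloaded field data. For orientation before the hunks, here is a minimal, hypothetical call-site sketch (not part of the commit; the helper name LoadBatch is invented, and the milvus::storage / milvus namespaces are assumed) showing the pattern the changed call sites now follow:

// Hypothetical helper, for illustration only: download a batch of remote
// files and collect the decoded field data, resolving each future at the
// point of consumption.
#include <string>
#include <vector>

#include "common/FieldData.h"
#include "storage/Util.h"

std::vector<milvus::FieldDataPtr>
LoadBatch(milvus::storage::ChunkManager* rcm,
          const std::vector<std::string>& batch_files) {
    // After this commit, GetObjectData only schedules the downloads on the
    // HIGH-priority thread pool and returns the futures immediately.
    auto futures = milvus::storage::GetObjectData(rcm, batch_files);

    std::vector<milvus::FieldDataPtr> field_datas;
    field_datas.reserve(futures.size());
    for (auto& future : futures) {
        // Blocking happens here, per file; the resolved DataCodec exposes
        // the decoded field data.
        field_datas.emplace_back(future.get()->GetFieldData());
    }
    return field_datas;
}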
@@ -26,10 +26,10 @@
 #include "storage/DiskFileManagerImpl.h"
 #include "storage/FileManager.h"
-#include "storage/LocalChunkManagerSingleton.h"
-#include "storage/IndexData.h"
-#include "storage/Util.h"
+#include "storage/LocalChunkManagerSingleton.h"
+#include "storage/ThreadPools.h"
+#include "storage/Util.h"

 namespace milvus::storage {
@@ -81,20 +81,16 @@ DiskFileManagerImpl::AddFileUsingSpace(
     const std::vector<int64_t>& remote_file_sizes) {
     auto local_chunk_manager =
         LocalChunkManagerSingleton::GetInstance().GetChunkManager();
-    auto LoadIndexFromDisk = [&](
-        const std::string& file,
-        const int64_t offset,
-        const int64_t data_size) -> std::shared_ptr<uint8_t[]> {
-        auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[data_size]);
-        local_chunk_manager->Read(file, offset, buf.get(), data_size);
-        return buf;
-    };
-
     for (int64_t i = 0; i < remote_files.size(); ++i) {
-        auto data = LoadIndexFromDisk(
-            local_file_name, local_file_offsets[i], remote_file_sizes[i]);
-        auto status = space_->WriteBlob(
-            remote_files[i], data.get(), remote_file_sizes[i]);
+        auto buf =
+            std::shared_ptr<uint8_t[]>(new uint8_t[remote_file_sizes[i]]);
+        local_chunk_manager->Read(local_file_name,
+                                  local_file_offsets[i],
+                                  buf.get(),
+                                  remote_file_sizes[i]);
+
+        auto status =
+            space_->WriteBlob(remote_files[i], buf.get(), remote_file_sizes[i]);
         if (!status.ok()) {
             return false;
         }
@@ -162,16 +158,7 @@ DiskFileManagerImpl::AddBatchIndexFiles(
     const std::vector<int64_t>& remote_file_sizes) {
     auto local_chunk_manager =
         LocalChunkManagerSingleton::GetInstance().GetChunkManager();
-    auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
-
-    auto LoadIndexFromDisk = [&](
-        const std::string& file,
-        const int64_t offset,
-        const int64_t data_size) -> std::shared_ptr<uint8_t[]> {
-        auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[data_size]);
-        local_chunk_manager->Read(file, offset, buf.get(), data_size);
-        return buf;
-    };
+    auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);

     std::vector<std::future<std::shared_ptr<uint8_t[]>>> futures;
     AssertInfo(local_file_offsets.size() == remote_files.size(),
@@ -180,10 +167,17 @@ DiskFileManagerImpl::AddBatchIndexFiles(
                "inconsistent size of file slices with size slices");

     for (int64_t i = 0; i < remote_files.size(); ++i) {
-        futures.push_back(pool.Submit(LoadIndexFromDisk,
-                                      local_file_name,
-                                      local_file_offsets[i],
-                                      remote_file_sizes[i]));
+        futures.push_back(pool.Submit(
+            [&](const std::string& file,
+                const int64_t offset,
+                const int64_t data_size) -> std::shared_ptr<uint8_t[]> {
+                auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[data_size]);
+                local_chunk_manager->Read(file, offset, buf.get(), data_size);
+                return buf;
+            },
+            local_file_name,
+            local_file_offsets[i],
+            remote_file_sizes[i]));
     }

     // hold index data util upload index file done
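Side note, not from the commit: Submit forwards the lambda and its trailing arguments to the pool and yields a future of the lambda's return type, which is why the declaration in the hunk above is std::vector<std::future<std::shared_ptr<uint8_t[]>>>. A self-contained sketch of the same pattern, with std::async standing in for the milvus ThreadPools (whose implementation is not part of this diff):

#include <algorithm>
#include <cstdint>
#include <future>
#include <memory>
#include <vector>

int
main() {
    std::vector<std::future<std::shared_ptr<uint8_t[]>>> futures;
    for (int64_t size : {16, 32, 64}) {
        // Each task allocates a buffer and returns it; std::async plays the
        // role of ThreadPools::GetThreadPool(...).Submit(...).
        futures.push_back(std::async(
            std::launch::async,
            [](const int64_t data_size) -> std::shared_ptr<uint8_t[]> {
                auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[data_size]);
                std::fill_n(buf.get(), data_size, 0);
                return buf;
            },
            size));
    }
    // Hold every buffer until all tasks are done, mirroring how the index
    // data above is kept alive until the upload finishes.
    for (auto& f : futures) {
        auto buf = f.get();
    }
    return 0;
}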
@@ -211,8 +205,8 @@ DiskFileManagerImpl::AddBatchIndexFiles(
                                       field_meta_,
                                       index_meta_);
     }
-    for (auto iter = res.begin(); iter != res.end(); ++iter) {
-        remote_paths_to_size_[iter->first] = iter->second;
+    for (auto& re : res) {
+        remote_paths_to_size_[re.first] = re.second;
     }
 }
@@ -344,7 +338,7 @@ DiskFileManagerImpl::CacheBatchIndexFilesToDisk(

     uint64_t offset = local_file_init_offfset;
     for (int i = 0; i < batch_size; ++i) {
-        auto index_data = index_datas[i];
+        auto index_data = index_datas[i].get()->GetFieldData();
         auto index_size = index_data->Size();
         auto uint8_data =
             reinterpret_cast<uint8_t*>(const_cast<void*>(index_data->Data()));
@@ -473,7 +467,7 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
         auto field_datas = GetObjectData(rcm_.get(), batch_files);
         int batch_size = batch_files.size();
         for (int i = 0; i < batch_size; ++i) {
-            auto field_data = field_datas[i];
+            auto field_data = field_datas[i].get()->GetFieldData();
             num_rows += uint32_t(field_data->get_num_rows());
             AssertInfo(dim == 0 || dim == field_data->get_dim(),
                        "inconsistent dim value in multi binlogs!");
@@ -153,7 +153,8 @@ MemFileManagerImpl::LoadIndexToMemory(
         for (size_t idx = 0; idx < batch_files.size(); ++idx) {
             auto file_name =
                 batch_files[idx].substr(batch_files[idx].find_last_of('/') + 1);
-            file_to_index_data[file_name] = index_datas[idx];
+            file_to_index_data[file_name] =
+                index_datas[idx].get()->GetFieldData();
         }
     };
@@ -192,7 +193,7 @@ MemFileManagerImpl::CacheRawDataToMemory(
     auto FetchRawData = [&]() {
         auto raw_datas = GetObjectData(rcm_.get(), batch_files);
         for (auto& data : raw_datas) {
-            field_datas.emplace_back(data);
+            field_datas.emplace_back(data.get()->GetFieldData());
         }
     };
@@ -504,23 +504,17 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
     return std::make_pair(std::move(object_key), serialized_index_size);
 }

-std::vector<FieldDataPtr>
+std::vector<std::future<std::unique_ptr<DataCodec>>>
 GetObjectData(ChunkManager* remote_chunk_manager,
               const std::vector<std::string>& remote_files) {
     auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
     std::vector<std::future<std::unique_ptr<DataCodec>>> futures;
+    futures.reserve(remote_files.size());
     for (auto& file : remote_files) {
         futures.emplace_back(pool.Submit(
             DownloadAndDecodeRemoteFile, remote_chunk_manager, file));
     }
-
-    std::vector<FieldDataPtr> datas;
-    for (int i = 0; i < futures.size(); ++i) {
-        auto res = futures[i].get();
-        datas.emplace_back(res->GetFieldData());
-    }
-    ReleaseArrowUnused();
-    return datas;
+    return futures;
 }

 std::vector<FieldDataPtr>
@@ -19,6 +19,7 @@
 #include <memory>
 #include <string>
 #include <vector>
+#include <future>

 #include "common/FieldData.h"
 #include "common/LoadInfo.h"
@@ -115,7 +116,7 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
                          const FieldMeta& field_meta,
                          std::string object_key);

-std::vector<FieldDataPtr>
+std::vector<std::future<std::unique_ptr<DataCodec>>>
 GetObjectData(ChunkManager* remote_chunk_manager,
               const std::vector<std::string>& remote_files);