MS-603 Add BuildIndex to scheduler

Former-commit-id: f962874aac6dd92b4cdf121ca8a645185bc5882f
This commit is contained in:
Yu Kun 2019-10-08 16:05:57 +08:00
parent bc10937bcf
commit bdbc1375d7
8 changed files with 106 additions and 82 deletions

View File

@ -898,24 +898,22 @@ DBImpl::BackgroundBuildIndex() {
meta_ptr_->FilesToIndex(to_index_files);
Status status;
scheduler::BuildIndexJobPtr
job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_);
// step 2: put build index task to scheduler
// for (auto &file : to_index_files) {
// std::cout << "get to index file" << std::endl;
// scheduler::BuildIndexJobPtr
// job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_, options_);
//
// scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
// job->AddToIndexFiles(file_ptr);
//
// job->AddToIndexFiles(file_ptr);
// scheduler::JobMgrInst::GetInstance()->Put(job);
// job->WaitBuildIndexFinish();
// if (!job->GetStatus().ok()) {
// Status status = job->GetStatus();
// ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
// }
//
// }
// scheduler::JobMgrInst::GetInstance()->Put(job);
// job->WaitBuildIndexFinish();
for (auto &file : to_index_files) {
std::cout << "get to index file" << std::endl;

View File

@ -66,6 +66,9 @@ class ExecutionEngine {
virtual Status
CopyToGpu(uint64_t device_id) = 0;
virtual Status
CopyToIndexFileToGpu(uint64_t device_id) = 0;
virtual Status
CopyToCpu() = 0;

View File

@ -133,7 +133,6 @@ ExecutionEngineImpl::Serialize() {
Status
ExecutionEngineImpl::Load(bool to_cache) {
std::cout << "load" << std::endl;
index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
if (!already_in_cache) {
@ -162,7 +161,7 @@ ExecutionEngineImpl::Load(bool to_cache) {
Status
ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
std::cout << "copy2gpu" << std::endl;
std::cout << "copytogpu" << std::endl;
auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index != nullptr);
if (already_in_cache) {
@ -189,6 +188,17 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
return Status::OK();
}
Status
ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) {
auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index != nullptr);
if (!already_in_cache) {
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(nullptr, PhysicalSize());
milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_, obj);
}
return Status::OK();
}
Status
ExecutionEngineImpl::CopyToCpu() {
auto index = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);

View File

@ -58,6 +58,9 @@ class ExecutionEngineImpl : public ExecutionEngine {
Status
CopyToGpu(uint64_t device_id) override;
Status
CopyToIndexFileToGpu(uint64_t device_id) override;
Status
CopyToCpu() override;

View File

@ -22,13 +22,14 @@
#include "src/cache/GpuCacheMgr.h"
#include "src/server/Config.h"
namespace milvus {
namespace scheduler {
std::vector<ResourcePtr>
get_neighbours(const ResourcePtr& self) {
get_neighbours(const ResourcePtr &self) {
std::vector<ResourcePtr> neighbours;
for (auto& neighbour_node : self->GetNeighbours()) {
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node)
continue;
@ -42,9 +43,9 @@ get_neighbours(const ResourcePtr& self) {
}
std::vector<std::pair<ResourcePtr, Connection>>
get_neighbours_with_connetion(const ResourcePtr& self) {
get_neighbours_with_connetion(const ResourcePtr &self) {
std::vector<std::pair<ResourcePtr, Connection>> neighbours;
for (auto& neighbour_node : self->GetNeighbours()) {
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node)
continue;
@ -58,12 +59,12 @@ get_neighbours_with_connetion(const ResourcePtr& self) {
}
void
Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self) {
Action::PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self) {
auto neighbours = get_neighbours_with_connetion(self);
if (not neighbours.empty()) {
std::vector<uint64_t> speeds;
uint64_t total_speed = 0;
for (auto& neighbour : neighbours) {
for (auto &neighbour : neighbours) {
uint64_t speed = neighbour.second.speed();
speeds.emplace_back(speed);
total_speed += speed;
@ -88,15 +89,15 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self
}
void
Action::PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self) {
Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) {
auto neighbours = get_neighbours(self);
for (auto& neighbour : neighbours) {
for (auto &neighbour : neighbours) {
neighbour->task_table().Put(task);
}
}
void
Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
Action::PushTaskToResource(const TaskPtr &task, const ResourcePtr &dest) {
dest->task_table().Put(task);
}
@ -138,7 +139,7 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
auto compute_resources = res_mgr.lock()->GetComputeResources();
std::vector<std::vector<std::string>> paths;
std::vector<uint64_t> transport_costs;
for (auto& res : compute_resources) {
for (auto &res : compute_resources) {
std::vector<std::string> path;
uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path);
transport_costs.push_back(transport_cost);
@ -171,14 +172,21 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
int32_t build_index_gpu;
Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu);
bool find_gpu_res = false;
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->name()
== res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
Path task_path(paths[i], paths[i].size() - 1);
task->path() = task_path;
break;
if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
if (compute_resources[i]->name()
== res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
find_gpu_res = true;
Path task_path(paths[i], paths[i].size() - 1);
task->path() = task_path;
break;
}
}
}
if (not find_gpu_res) {
task->path() = Path(paths[0], paths[0].size() - 1);
}
}
}

View File

@ -22,8 +22,8 @@
namespace milvus {
namespace scheduler {
BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr)
: Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)) {
BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options)
: Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) {
}

View File

@ -42,7 +42,7 @@ using Id2ToTableFileMap = std::unordered_map<size_t, TableFileSchema>;
class BuildIndexJob : public Job {
public:
explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr);
explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options);
public:
bool
@ -55,16 +55,6 @@ class BuildIndexJob : public Job {
BuildIndexDone(size_t to_index_id);
public:
// std::string
// location() const {
// return location_;
// }
//
// EngineType
// engine_type() const {
// return engine_type_;
// }
Status &
GetStatus() {
return status_;
@ -80,9 +70,15 @@ class BuildIndexJob : public Job {
return meta_ptr_;
}
engine::DBOptions
options() const {
return options_;
}
private:
Id2ToIndexMap to_index_files_;
engine::meta::MetaPtr meta_ptr_;
engine::DBOptions options_;
Status status_;
std::mutex mutex_;

View File

@ -44,53 +44,57 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
std::string error_msg;
std::string type_str;
try {
if (type == LoadType::DISK2CPU) {
stat = to_index_engine_->Load();
type_str = "DISK2CPU";
} else if (type == LoadType::CPU2GPU) {
// stat = to_index_engine_->CopyToGpu(device_id);
type_str = "CPU2GPU";
} else if (type == LoadType::GPU2CPU) {
stat = to_index_engine_->CopyToCpu();
type_str = "GPU2CPU";
} else {
error_msg = "Wrong load type";
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
auto options = build_index_job->options();
try {
if (type == LoadType::DISK2CPU) {
stat = to_index_engine_->Load(options.insert_cache_immediately_);
type_str = "DISK2CPU";
} else if (type == LoadType::CPU2GPU) {
stat = to_index_engine_->CopyToIndexFileToGpu(device_id);
type_str = "CPU2GPU";
} else if (type == LoadType::GPU2CPU) {
stat = to_index_engine_->CopyToCpu();
type_str = "GPU2CPU";
} else {
error_msg = "Wrong load type";
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
} catch (std::exception& ex) {
// typical error: out of disk space or permition denied
error_msg = "Failed to load to_index file: " + std::string(ex.what());
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
} catch (std::exception& ex) {
// typical error: out of disk space or permition denied
error_msg = "Failed to load to_index file: " + std::string(ex.what());
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
if (!stat.ok()) {
Status s;
if(stat.ToString().find("out of memory") != std::string::npos) {
error_msg = "out of memory: " + type_str;
s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
} else {
error_msg = "Failed to load to_index file: " + type_str;
s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
if (!stat.ok()) {
Status s;
if(stat.ToString().find("out of memory") != std::string::npos) {
error_msg = "out of memory: " + type_str;
s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
} else {
error_msg = "Failed to load to_index file: " + type_str;
s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
build_index_job->BuildIndexDone(file_->id_);
}
return;
}
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
build_index_job->BuildIndexDone(file_->id_);
}
size_t file_size = to_index_engine_->PhysicalSize();
return;
std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" +
std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
" bytes from location: " + file_->location_ + " totally cost";
double span = rc.ElapseFromBegin(info);
to_index_id_ = file_->id_;
to_index_type_ = file_->file_type_;
}
size_t file_size = to_index_engine_->PhysicalSize();
std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" +
std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
" bytes from location: " + file_->location_ + " totally cost";
double span = rc.ElapseFromBegin(info);
to_index_id_ = file_->id_;
to_index_type_ = file_->file_type_;
}
void
@ -119,12 +123,13 @@ XBuildIndexTask::Execute() {
if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
build_index_job->BuildIndexDone(to_index_id_);
//TODO: return status
build_index_job->GetStatus() = status;
return;
}
// step 3: build index
try {
index = to_index_engine_->BuildIndex(location, engine_type);
index = to_index_engine_->BuildIndex(table_file.location_, (EngineType) table_file.engine_type_);
if (index == nullptr) {
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
status = meta_ptr->UpdateTableFile(table_file);
@ -153,7 +158,7 @@ XBuildIndexTask::Execute() {
meta_ptr->HasTable(file_->table_id_, has_table);
if (!has_table) {
meta_ptr->DeleteTableFiles(file_->table_id_);
// return Status::OK();
return;
}
// step 5: save index file
@ -171,7 +176,8 @@ XBuildIndexTask::Execute() {
std::cout << "ERROR: failed to persist index file: " << table_file.location_
<< ", possible out of disk space" << std::endl;
// return Status(DB_ERROR, msg);
build_index_job->GetStatus() = Status(DB_ERROR, msg);
return;
}
// step 6: update meta