diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 22904bed2d..1847e30017 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -898,24 +898,22 @@ DBImpl::BackgroundBuildIndex() { meta_ptr_->FilesToIndex(to_index_files); Status status; - scheduler::BuildIndexJobPtr - job = std::make_shared(0, meta_ptr_); - // step 2: put build index task to scheduler // for (auto &file : to_index_files) { -// std::cout << "get to index file" << std::endl; +// scheduler::BuildIndexJobPtr +// job = std::make_shared(0, meta_ptr_, options_); // // scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); -// job->AddToIndexFiles(file_ptr); // +// job->AddToIndexFiles(file_ptr); +// scheduler::JobMgrInst::GetInstance()->Put(job); +// job->WaitBuildIndexFinish(); // if (!job->GetStatus().ok()) { // Status status = job->GetStatus(); // ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); // } -// // } -// scheduler::JobMgrInst::GetInstance()->Put(job); -// job->WaitBuildIndexFinish(); + for (auto &file : to_index_files) { std::cout << "get to index file" << std::endl; diff --git a/cpp/src/db/engine/ExecutionEngine.h b/cpp/src/db/engine/ExecutionEngine.h index 1572b967d1..848704bd4b 100644 --- a/cpp/src/db/engine/ExecutionEngine.h +++ b/cpp/src/db/engine/ExecutionEngine.h @@ -66,6 +66,9 @@ class ExecutionEngine { virtual Status CopyToGpu(uint64_t device_id) = 0; + virtual Status + CopyToIndexFileToGpu(uint64_t device_id) = 0; + virtual Status CopyToCpu() = 0; diff --git a/cpp/src/db/engine/ExecutionEngineImpl.cpp b/cpp/src/db/engine/ExecutionEngineImpl.cpp index f9366e439a..a6f82d21c1 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.cpp +++ b/cpp/src/db/engine/ExecutionEngineImpl.cpp @@ -133,7 +133,6 @@ ExecutionEngineImpl::Serialize() { Status ExecutionEngineImpl::Load(bool to_cache) { - std::cout << "load" << std::endl; index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_); bool already_in_cache = (index_ != nullptr); if (!already_in_cache) { @@ -162,7 +161,7 @@ ExecutionEngineImpl::Load(bool to_cache) { Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { - std::cout << "copy2gpu" << std::endl; + std::cout << "copytogpu" << std::endl; auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_); bool already_in_cache = (index != nullptr); if (already_in_cache) { @@ -189,6 +188,17 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { return Status::OK(); } +Status +ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) { + auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_); + bool already_in_cache = (index != nullptr); + if (!already_in_cache) { + cache::DataObjPtr obj = std::make_shared(nullptr, PhysicalSize()); + milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_, obj); + } + return Status::OK(); +} + Status ExecutionEngineImpl::CopyToCpu() { auto index = cache::CpuCacheMgr::GetInstance()->GetIndex(location_); diff --git a/cpp/src/db/engine/ExecutionEngineImpl.h b/cpp/src/db/engine/ExecutionEngineImpl.h index 62a7b29a63..5793763fdd 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.h +++ b/cpp/src/db/engine/ExecutionEngineImpl.h @@ -58,6 +58,9 @@ class ExecutionEngineImpl : public ExecutionEngine { Status CopyToGpu(uint64_t device_id) override; + Status + CopyToIndexFileToGpu(uint64_t device_id) override; + Status CopyToCpu() override; diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 127e01232c..828f0c71c6 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -22,13 +22,14 @@ #include "src/cache/GpuCacheMgr.h" #include "src/server/Config.h" + namespace milvus { namespace scheduler { std::vector -get_neighbours(const ResourcePtr& self) { +get_neighbours(const ResourcePtr &self) { std::vector neighbours; - for (auto& neighbour_node : self->GetNeighbours()) { + for (auto &neighbour_node : self->GetNeighbours()) { auto node = neighbour_node.neighbour_node.lock(); if (not node) continue; @@ -42,9 +43,9 @@ get_neighbours(const ResourcePtr& self) { } std::vector> -get_neighbours_with_connetion(const ResourcePtr& self) { +get_neighbours_with_connetion(const ResourcePtr &self) { std::vector> neighbours; - for (auto& neighbour_node : self->GetNeighbours()) { + for (auto &neighbour_node : self->GetNeighbours()) { auto node = neighbour_node.neighbour_node.lock(); if (not node) continue; @@ -58,12 +59,12 @@ get_neighbours_with_connetion(const ResourcePtr& self) { } void -Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self) { +Action::PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self) { auto neighbours = get_neighbours_with_connetion(self); if (not neighbours.empty()) { std::vector speeds; uint64_t total_speed = 0; - for (auto& neighbour : neighbours) { + for (auto &neighbour : neighbours) { uint64_t speed = neighbour.second.speed(); speeds.emplace_back(speed); total_speed += speed; @@ -88,15 +89,15 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self } void -Action::PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self) { +Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) { auto neighbours = get_neighbours(self); - for (auto& neighbour : neighbours) { + for (auto &neighbour : neighbours) { neighbour->task_table().Put(task); } } void -Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) { +Action::PushTaskToResource(const TaskPtr &task, const ResourcePtr &dest) { dest->task_table().Put(task); } @@ -138,7 +139,7 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr auto compute_resources = res_mgr.lock()->GetComputeResources(); std::vector> paths; std::vector transport_costs; - for (auto& res : compute_resources) { + for (auto &res : compute_resources) { std::vector path; uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path); transport_costs.push_back(transport_cost); @@ -171,14 +172,21 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr int32_t build_index_gpu; Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu); + bool find_gpu_res = false; for (uint64_t i = 0; i < compute_resources.size(); ++i) { - if (compute_resources[i]->name() - == res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) { - Path task_path(paths[i], paths[i].size() - 1); - task->path() = task_path; - break; + if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) { + if (compute_resources[i]->name() + == res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) { + find_gpu_res = true; + Path task_path(paths[i], paths[i].size() - 1); + task->path() = task_path; + break; + } } } + if (not find_gpu_res) { + task->path() = Path(paths[0], paths[0].size() - 1); + } } } diff --git a/cpp/src/scheduler/job/BuildIndexJob.cpp b/cpp/src/scheduler/job/BuildIndexJob.cpp index 9ab3650dba..9dc21f9bec 100644 --- a/cpp/src/scheduler/job/BuildIndexJob.cpp +++ b/cpp/src/scheduler/job/BuildIndexJob.cpp @@ -22,8 +22,8 @@ namespace milvus { namespace scheduler { -BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr) - : Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)) { +BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options) + : Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) { } diff --git a/cpp/src/scheduler/job/BuildIndexJob.h b/cpp/src/scheduler/job/BuildIndexJob.h index e536012985..e4fa7b1fd8 100644 --- a/cpp/src/scheduler/job/BuildIndexJob.h +++ b/cpp/src/scheduler/job/BuildIndexJob.h @@ -42,7 +42,7 @@ using Id2ToTableFileMap = std::unordered_map; class BuildIndexJob : public Job { public: - explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr); + explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options); public: bool @@ -55,16 +55,6 @@ class BuildIndexJob : public Job { BuildIndexDone(size_t to_index_id); public: -// std::string -// location() const { -// return location_; -// } -// -// EngineType -// engine_type() const { -// return engine_type_; -// } - Status & GetStatus() { return status_; @@ -80,9 +70,15 @@ class BuildIndexJob : public Job { return meta_ptr_; } + engine::DBOptions + options() const { + return options_; + } + private: Id2ToIndexMap to_index_files_; engine::meta::MetaPtr meta_ptr_; + engine::DBOptions options_; Status status_; std::mutex mutex_; diff --git a/cpp/src/scheduler/task/BuildIndexTask.cpp b/cpp/src/scheduler/task/BuildIndexTask.cpp index ea775982ee..667db2cbae 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.cpp +++ b/cpp/src/scheduler/task/BuildIndexTask.cpp @@ -44,53 +44,57 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { std::string error_msg; std::string type_str; - try { - if (type == LoadType::DISK2CPU) { - stat = to_index_engine_->Load(); - type_str = "DISK2CPU"; - } else if (type == LoadType::CPU2GPU) { -// stat = to_index_engine_->CopyToGpu(device_id); - type_str = "CPU2GPU"; - } else if (type == LoadType::GPU2CPU) { - stat = to_index_engine_->CopyToCpu(); - type_str = "GPU2CPU"; - } else { - error_msg = "Wrong load type"; + if (auto job = job_.lock()) { + auto build_index_job = std::static_pointer_cast(job); + auto options = build_index_job->options(); + try { + if (type == LoadType::DISK2CPU) { + stat = to_index_engine_->Load(options.insert_cache_immediately_); + type_str = "DISK2CPU"; + } else if (type == LoadType::CPU2GPU) { + stat = to_index_engine_->CopyToIndexFileToGpu(device_id); + type_str = "CPU2GPU"; + } else if (type == LoadType::GPU2CPU) { + stat = to_index_engine_->CopyToCpu(); + type_str = "GPU2CPU"; + } else { + error_msg = "Wrong load type"; + stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); + } + } catch (std::exception& ex) { + // typical error: out of disk space or permition denied + error_msg = "Failed to load to_index file: " + std::string(ex.what()); stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); } - } catch (std::exception& ex) { - // typical error: out of disk space or permition denied - error_msg = "Failed to load to_index file: " + std::string(ex.what()); - stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); - } - if (!stat.ok()) { - Status s; - if(stat.ToString().find("out of memory") != std::string::npos) { - error_msg = "out of memory: " + type_str; - s = Status(SERVER_UNEXPECTED_ERROR, error_msg); - } else { - error_msg = "Failed to load to_index file: " + type_str; - s = Status(SERVER_UNEXPECTED_ERROR, error_msg); + if (!stat.ok()) { + Status s; + if(stat.ToString().find("out of memory") != std::string::npos) { + error_msg = "out of memory: " + type_str; + s = Status(SERVER_UNEXPECTED_ERROR, error_msg); + } else { + error_msg = "Failed to load to_index file: " + type_str; + s = Status(SERVER_UNEXPECTED_ERROR, error_msg); + } + + if (auto job = job_.lock()) { + auto build_index_job = std::static_pointer_cast(job); + build_index_job->BuildIndexDone(file_->id_); + } + + return; } - if (auto job = job_.lock()) { - auto build_index_job = std::static_pointer_cast(job); - build_index_job->BuildIndexDone(file_->id_); - } + size_t file_size = to_index_engine_->PhysicalSize(); - return; + std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" + + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) + + " bytes from location: " + file_->location_ + " totally cost"; + double span = rc.ElapseFromBegin(info); + + to_index_id_ = file_->id_; + to_index_type_ = file_->file_type_; } - - size_t file_size = to_index_engine_->PhysicalSize(); - - std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" + - std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) + - " bytes from location: " + file_->location_ + " totally cost"; - double span = rc.ElapseFromBegin(info); - - to_index_id_ = file_->id_; - to_index_type_ = file_->file_type_; } void @@ -119,12 +123,13 @@ XBuildIndexTask::Execute() { if (!status.ok()) { ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString(); build_index_job->BuildIndexDone(to_index_id_); - //TODO: return status + build_index_job->GetStatus() = status; + return; } // step 3: build index try { - index = to_index_engine_->BuildIndex(location, engine_type); + index = to_index_engine_->BuildIndex(table_file.location_, (EngineType) table_file.engine_type_); if (index == nullptr) { table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; status = meta_ptr->UpdateTableFile(table_file); @@ -153,7 +158,7 @@ XBuildIndexTask::Execute() { meta_ptr->HasTable(file_->table_id_, has_table); if (!has_table) { meta_ptr->DeleteTableFiles(file_->table_id_); -// return Status::OK(); + return; } // step 5: save index file @@ -171,7 +176,8 @@ XBuildIndexTask::Execute() { std::cout << "ERROR: failed to persist index file: " << table_file.location_ << ", possible out of disk space" << std::endl; -// return Status(DB_ERROR, msg); + build_index_job->GetStatus() = Status(DB_ERROR, msg); + return; } // step 6: update meta