From d89aa2020c947fd7ea336f871532d1b3fb06f00f Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Fri, 27 Sep 2019 14:53:27 +0800 Subject: [PATCH 1/8] add showtables unittest Former-commit-id: a32787a331c94245e1cf87ad43e5e365120cb38e --- cpp/unittest/server/rpc_test.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/unittest/server/rpc_test.cpp b/cpp/unittest/server/rpc_test.cpp index 7895d081a1..6b27db2d48 100644 --- a/cpp/unittest/server/rpc_test.cpp +++ b/cpp/unittest/server/rpc_test.cpp @@ -349,11 +349,11 @@ TEST_F(RpcHandlerTest, TABLES_TEST) { handler->Insert(&context, &request, &vector_ids); -//Show table -// ::milvus::grpc::Command cmd; -// ::grpc::ServerWriter<::milvus::grpc::TableName> *writer; -// status = handler->ShowTables(&context, &cmd, writer); -// ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code()); + //show tables + ::milvus::grpc::Command cmd; + ::milvus::grpc::TableNameList table_name_list; + status = handler->ShowTables(&context, &cmd, &table_name_list); + ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code()); //Count Table ::milvus::grpc::TableRowCount count; From b815cc36fe5547337db7fd9de822530cff280eac Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Sun, 29 Sep 2019 16:43:03 +0800 Subject: [PATCH 2/8] complete BuildIndexJob and BuildIndexTask Former-commit-id: 3b5d96f4800860b7b3fb09fa3d64b9b3f41441b8 --- cpp/src/db/DBImpl.cpp | 155 ++++++++++-------- cpp/src/scheduler/TaskCreator.cpp | 29 +++- cpp/src/scheduler/TaskCreator.h | 4 + .../scheduler/action/PushTaskToNeighbour.cpp | 50 ++++-- cpp/src/scheduler/job/BuildIndexJob.cpp | 63 +++++++ cpp/src/scheduler/job/BuildIndexJob.h | 86 ++++++++++ cpp/src/scheduler/task/BuildIndexTask.cpp | 131 +++++++++++++++ cpp/src/scheduler/task/BuildIndexTask.h | 49 ++++++ cpp/src/scheduler/task/Task.h | 1 + cpp/src/scheduler/tasklabel/SpecResLabel.h | 7 +- 10 files changed, 486 insertions(+), 89 deletions(-) create mode 100644 cpp/src/scheduler/job/BuildIndexJob.cpp create mode 100644 cpp/src/scheduler/job/BuildIndexJob.h create mode 100644 cpp/src/scheduler/task/BuildIndexTask.cpp create mode 100644 cpp/src/scheduler/task/BuildIndexTask.h diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 2780d0f763..0acef32adc 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -28,6 +28,7 @@ #include "scheduler/SchedInst.h" #include "scheduler/job/DeleteJob.h" #include "scheduler/job/SearchJob.h" +#include "scheduler/job/BuildIndexJob.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" @@ -39,6 +40,7 @@ #include #include + namespace zilliz { namespace milvus { namespace engine { @@ -51,7 +53,7 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1; } // namespace -DBImpl::DBImpl(const DBOptions& options) +DBImpl::DBImpl(const DBOptions &options) : options_(options), shutting_down_(true), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) { meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_); mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_); @@ -111,7 +113,7 @@ DBImpl::DropAll() { } Status -DBImpl::CreateTable(meta::TableSchema& table_schema) { +DBImpl::CreateTable(meta::TableSchema &table_schema) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -122,7 +124,7 @@ DBImpl::CreateTable(meta::TableSchema& table_schema) { } Status -DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) { +DBImpl::DeleteTable(const std::string &table_id, const meta::DatesT &dates) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -147,7 +149,7 @@ DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) { } Status -DBImpl::DescribeTable(meta::TableSchema& table_schema) { +DBImpl::DescribeTable(meta::TableSchema &table_schema) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -158,7 +160,7 @@ DBImpl::DescribeTable(meta::TableSchema& table_schema) { } Status -DBImpl::HasTable(const std::string& table_id, bool& has_or_not) { +DBImpl::HasTable(const std::string &table_id, bool &has_or_not) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -167,7 +169,7 @@ DBImpl::HasTable(const std::string& table_id, bool& has_or_not) { } Status -DBImpl::AllTables(std::vector& table_schema_array) { +DBImpl::AllTables(std::vector &table_schema_array) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -176,7 +178,7 @@ DBImpl::AllTables(std::vector& table_schema_array) { } Status -DBImpl::PreloadTable(const std::string& table_id) { +DBImpl::PreloadTable(const std::string &table_id) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -195,11 +197,11 @@ DBImpl::PreloadTable(const std::string& table_id) { int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage(); int64_t available_size = cache_total - cache_usage; - for (auto& day_files : files) { - for (auto& file : day_files.second) { + for (auto &day_files : files) { + for (auto &file : day_files.second) { ExecutionEnginePtr engine = - EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_, - (MetricType)file.metric_type_, file.nlist_); + EngineFactory::Build(file.dimension_, file.location_, (EngineType) file.engine_type_, + (MetricType) file.metric_type_, file.nlist_); if (engine == nullptr) { ENGINE_LOG_ERROR << "Invalid engine type"; return Status(DB_ERROR, "Invalid engine type"); @@ -212,7 +214,7 @@ DBImpl::PreloadTable(const std::string& table_id) { try { // step 1: load index engine->Load(true); - } catch (std::exception& ex) { + } catch (std::exception &ex) { std::string msg = "Pre-load table encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; return Status(DB_ERROR, msg); @@ -224,7 +226,7 @@ DBImpl::PreloadTable(const std::string& table_id) { } Status -DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) { +DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -233,7 +235,7 @@ DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) { } Status -DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) { +DBImpl::GetTableRowCount(const std::string &table_id, uint64_t &row_count) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -242,7 +244,7 @@ DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) { } Status -DBImpl::InsertVectors(const std::string& table_id_, uint64_t n, const float* vectors, IDNumbers& vector_ids_) { +DBImpl::InsertVectors(const std::string &table_id_, uint64_t n, const float *vectors, IDNumbers &vector_ids_) { // ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache"; if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); @@ -261,7 +263,7 @@ DBImpl::InsertVectors(const std::string& table_id_, uint64_t n, const float* vec } Status -DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { +DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) { { std::unique_lock lock(build_index_mutex_); @@ -295,15 +297,15 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { // for IDMAP type, only wait all NEW file converted to RAW file // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files std::vector file_types; - if (index.engine_type_ == (int)EngineType::FAISS_IDMAP) { + if (index.engine_type_ == (int) EngineType::FAISS_IDMAP) { file_types = { - (int)meta::TableFileSchema::NEW, (int)meta::TableFileSchema::NEW_MERGE, + (int) meta::TableFileSchema::NEW, (int) meta::TableFileSchema::NEW_MERGE, }; } else { file_types = { - (int)meta::TableFileSchema::RAW, (int)meta::TableFileSchema::NEW, - (int)meta::TableFileSchema::NEW_MERGE, (int)meta::TableFileSchema::NEW_INDEX, - (int)meta::TableFileSchema::TO_INDEX, + (int) meta::TableFileSchema::RAW, (int) meta::TableFileSchema::NEW, + (int) meta::TableFileSchema::NEW_MERGE, (int) meta::TableFileSchema::NEW_INDEX, + (int) meta::TableFileSchema::TO_INDEX, }; } @@ -313,7 +315,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { while (!file_ids.empty()) { ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times; - if (index.engine_type_ != (int)EngineType::FAISS_IDMAP) { + if (index.engine_type_ != (int) EngineType::FAISS_IDMAP) { status = meta_ptr_->UpdateTableFilesToIndex(table_id); } @@ -326,19 +328,19 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { } Status -DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) { +DBImpl::DescribeIndex(const std::string &table_id, TableIndex &index) { return meta_ptr_->DescribeTableIndex(table_id, index); } Status -DBImpl::DropIndex(const std::string& table_id) { +DBImpl::DropIndex(const std::string &table_id) { ENGINE_LOG_DEBUG << "Drop index for table: " << table_id; return meta_ptr_->DropTableIndex(table_id); } Status -DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors, - QueryResults& results) { +DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors, + QueryResults &results) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -350,8 +352,8 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr } Status -DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors, - const meta::DatesT& dates, QueryResults& results) { +DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors, + const meta::DatesT &dates, QueryResults &results) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -367,8 +369,8 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr } meta::TableFilesSchema file_id_array; - for (auto& day_files : files) { - for (auto& file : day_files.second) { + for (auto &day_files : files) { + for (auto &file : day_files.second) { file_id_array.push_back(file); } } @@ -380,8 +382,8 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr } Status -DBImpl::Query(const std::string& table_id, const std::vector& file_ids, uint64_t k, uint64_t nq, - uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) { +DBImpl::Query(const std::string &table_id, const std::vector &file_ids, uint64_t k, uint64_t nq, + uint64_t nprobe, const float *vectors, const meta::DatesT &dates, QueryResults &results) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -390,7 +392,7 @@ DBImpl::Query(const std::string& table_id, const std::vector& file_ // get specified files std::vector ids; - for (auto& id : file_ids) { + for (auto &id : file_ids) { meta::TableFileSchema table_file; table_file.table_id_ = table_id; std::string::size_type sz; @@ -404,8 +406,8 @@ DBImpl::Query(const std::string& table_id, const std::vector& file_ } meta::TableFilesSchema file_id_array; - for (auto& day_files : files_array) { - for (auto& file : day_files.second) { + for (auto &day_files : files_array) { + for (auto &file : day_files.second) { file_id_array.push_back(file); } } @@ -421,7 +423,7 @@ DBImpl::Query(const std::string& table_id, const std::vector& file_ } Status -DBImpl::Size(uint64_t& result) { +DBImpl::Size(uint64_t &result) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -433,8 +435,8 @@ DBImpl::Size(uint64_t& result) { // internal methods /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// Status -DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq, - uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) { +DBImpl::QueryAsync(const std::string &table_id, const meta::TableFilesSchema &files, uint64_t k, uint64_t nq, + uint64_t nprobe, const float *vectors, const meta::DatesT &dates, QueryResults &results) { server::CollectQueryMetrics metrics(nq); TimeRecorder rc(""); @@ -443,7 +445,7 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() << " date range count: " << dates.size(); scheduler::SearchJobPtr job = std::make_shared(0, k, nq, nprobe, vectors); - for (auto& file : files) { + for (auto &file : files) { scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); job->AddIndexFile(file_ptr); } @@ -511,7 +513,7 @@ DBImpl::BackgroundTimerTask() { void DBImpl::WaitMergeFileFinish() { std::lock_guard lck(compact_result_mutex_); - for (auto& iter : compact_thread_results_) { + for (auto &iter : compact_thread_results_) { iter.wait(); } } @@ -519,7 +521,7 @@ DBImpl::WaitMergeFileFinish() { void DBImpl::WaitBuildIndexFinish() { std::lock_guard lck(index_result_mutex_); - for (auto& iter : index_thread_results_) { + for (auto &iter : index_thread_results_) { iter.wait(); } } @@ -560,7 +562,7 @@ DBImpl::MemSerialize() { std::lock_guard lck(mem_serialize_mutex_); std::set temp_table_ids; mem_mgr_->Serialize(temp_table_ids); - for (auto& id : temp_table_ids) { + for (auto &id : temp_table_ids) { compact_table_ids_.insert(id); } @@ -605,7 +607,7 @@ DBImpl::StartCompactionTask() { } Status -DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) { +DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const meta::TableFilesSchema &files) { ENGINE_LOG_DEBUG << "Merge files for table: " << table_id; // step 1: create table file @@ -622,13 +624,13 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m // step 2: merge files ExecutionEnginePtr index = - EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_, - (MetricType)table_file.metric_type_, table_file.nlist_); + EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType) table_file.engine_type_, + (MetricType) table_file.metric_type_, table_file.nlist_); meta::TableFilesSchema updated; int64_t index_size = 0; - for (auto& file : files) { + for (auto &file : files) { server::CollectMergeFilesMetrics metrics; index->Merge(file.location_); @@ -644,7 +646,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m // step 3: serialize to disk try { index->Serialize(); - } catch (std::exception& ex) { + } catch (std::exception &ex) { // typical error: out of disk space or permition denied std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; @@ -662,7 +664,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m // step 4: update table files state // if index type isn't IDMAP, set file type to TO_INDEX if file size execeed index_file_size // else set file type to RAW, no need to build index - if (table_file.engine_type_ != (int)EngineType::FAISS_IDMAP) { + if (table_file.engine_type_ != (int) EngineType::FAISS_IDMAP) { table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ? meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; } else { @@ -682,7 +684,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m } Status -DBImpl::BackgroundMergeFiles(const std::string& table_id) { +DBImpl::BackgroundMergeFiles(const std::string &table_id) { meta::DatePartionedTableFilesSchema raw_files; auto status = meta_ptr_->FilesToMerge(table_id, raw_files); if (!status.ok()) { @@ -691,7 +693,7 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) { } bool has_merge = false; - for (auto& kv : raw_files) { + for (auto &kv : raw_files) { auto files = kv.second; if (files.size() < options_.merge_trigger_number_) { ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action"; @@ -714,7 +716,7 @@ DBImpl::BackgroundCompaction(std::set table_ids) { ENGINE_LOG_TRACE << " Background compaction thread start"; Status status; - for (auto& table_id : table_ids) { + for (auto &table_id : table_ids) { status = BackgroundMergeFiles(table_id); if (!status.ok()) { ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString(); @@ -766,9 +768,9 @@ DBImpl::StartBuildIndexTask(bool force) { } Status -DBImpl::BuildIndex(const meta::TableFileSchema& file) { - ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_, - (MetricType)file.metric_type_, file.nlist_); +DBImpl::BuildIndex(const meta::TableFileSchema &file) { + ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType) file.engine_type_, + (MetricType) file.metric_type_, file.nlist_); if (to_index == nullptr) { ENGINE_LOG_ERROR << "Invalid engine type"; return Status(DB_ERROR, "Invalid engine type"); @@ -799,7 +801,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) { try { server::CollectBuildIndexMetrics metrics; - index = to_index->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_); + index = to_index->BuildIndex(table_file.location_, (EngineType) table_file.engine_type_); if (index == nullptr) { table_file.file_type_ = meta::TableFileSchema::TO_DELETE; status = meta_ptr_->UpdateTableFile(table_file); @@ -808,7 +810,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) { return status; } - } catch (std::exception& ex) { + } catch (std::exception &ex) { // typical error: out of gpu memory std::string msg = "BuildIndex encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; @@ -834,7 +836,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) { // step 5: save index file try { index->Serialize(); - } catch (std::exception& ex) { + } catch (std::exception &ex) { // typical error: out of disk space or permition denied std::string msg = "Serialize index encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; @@ -877,7 +879,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) { status = meta_ptr_->UpdateTableFile(table_file); ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; } - } catch (std::exception& ex) { + } catch (std::exception &ex) { std::string msg = "Build index encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; return Status(DB_ERROR, msg); @@ -894,17 +896,40 @@ DBImpl::BackgroundBuildIndex() { meta::TableFilesSchema to_index_files; meta_ptr_->FilesToIndex(to_index_files); Status status; - for (auto& file : to_index_files) { - status = BuildIndex(file); + + scheduler::BuildIndexJobPtr + job = std::make_shared(0); + + // step 2: put build index task to scheduler + scheduler::JobMgrInst::GetInstance()->Put(job); + for (auto &file : to_index_files) { + std::cout << "get to index file" << std::endl; + meta::TableFileSchema table_file; + table_file.table_id_ = file.table_id_; + table_file.date_ = file.date_; + table_file.file_type_ = + meta::TableFileSchema::NEW_INDEX; // for multi-db-path, distribute index file averagely to each path + status = meta_ptr_->CreateTableFile(table_file); if (!status.ok()) { - ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); + ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString(); } - if (shutting_down_.load(std::memory_order_acquire)) { - ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; - break; - } + scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); + job->AddToIndexFiles(file_ptr, table_file); } + job->WaitBuildIndexFinish(); + +// for (auto &file : to_index_files) { +// status = BuildIndex(file); +// if (!status.ok()) { +// ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); +// } +// +// if (shutting_down_.load(std::memory_order_acquire)) { +// ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; +// break; +// } +// } ENGINE_LOG_TRACE << "Background build index thread exit"; } diff --git a/cpp/src/scheduler/TaskCreator.cpp b/cpp/src/scheduler/TaskCreator.cpp index 4f8979703f..5d28cf0e90 100644 --- a/cpp/src/scheduler/TaskCreator.cpp +++ b/cpp/src/scheduler/TaskCreator.cpp @@ -15,16 +15,19 @@ // specific language governing permissions and limitations // under the License. +#include #include "scheduler/TaskCreator.h" #include "scheduler/tasklabel/BroadcastLabel.h" #include "tasklabel/DefaultLabel.h" +#include "SchedInst.h" + namespace zilliz { namespace milvus { namespace scheduler { std::vector -TaskCreator::Create(const JobPtr& job) { +TaskCreator::Create(const JobPtr &job) { switch (job->type()) { case JobType::SEARCH: { return Create(std::static_pointer_cast(job)); @@ -32,6 +35,9 @@ TaskCreator::Create(const JobPtr& job) { case JobType::DELETE: { return Create(std::static_pointer_cast(job)); } + case JobType::BUILD: { + return Create(std::static_pointer_cast(job)); + } default: { // TODO: error return std::vector(); @@ -40,9 +46,9 @@ TaskCreator::Create(const JobPtr& job) { } std::vector -TaskCreator::Create(const SearchJobPtr& job) { +TaskCreator::Create(const SearchJobPtr &job) { std::vector tasks; - for (auto& index_file : job->index_files()) { + for (auto &index_file : job->index_files()) { auto task = std::make_shared(index_file.second); task->label() = std::make_shared(); task->job_ = job; @@ -53,7 +59,7 @@ TaskCreator::Create(const SearchJobPtr& job) { } std::vector -TaskCreator::Create(const DeleteJobPtr& job) { +TaskCreator::Create(const DeleteJobPtr &job) { std::vector tasks; auto task = std::make_shared(job); task->label() = std::make_shared(); @@ -63,6 +69,21 @@ TaskCreator::Create(const DeleteJobPtr& job) { return tasks; } +std::vector +TaskCreator::Create(const zilliz::milvus::scheduler::BuildIndexJobPtr &job) { + std::vector tasks; + //TODO(yukun): remove "disk" hardcode here + ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk"); + + for (auto &to_index_file : job->to_index_files()) { + auto task = std::make_shared(to_index_file.second); + task->label() = std::make_shared(std::weak_ptr(res_ptr)); + task->job_ = job; + tasks.emplace_back(task); + } + return tasks; +} + } // namespace scheduler } // namespace milvus } // namespace zilliz diff --git a/cpp/src/scheduler/TaskCreator.h b/cpp/src/scheduler/TaskCreator.h index 5ae6a0763a..74c2fbaba3 100644 --- a/cpp/src/scheduler/TaskCreator.h +++ b/cpp/src/scheduler/TaskCreator.h @@ -32,6 +32,7 @@ #include "job/SearchJob.h" #include "task/DeleteTask.h" #include "task/SearchTask.h" +#include "task/BuildIndexTask.h" #include "task/Task.h" namespace zilliz { @@ -49,6 +50,9 @@ class TaskCreator { static std::vector Create(const DeleteJobPtr& job); + + static std::vector + Create(const BuildIndexJobPtr& job); }; } // namespace scheduler diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index c7514b1e2d..15fdcbb736 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -20,6 +20,7 @@ #include "../Algorithm.h" #include "Action.h" #include "src/cache/GpuCacheMgr.h" +#include "src/server/Config.h" namespace zilliz { namespace milvus { @@ -142,26 +143,41 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr transport_costs.push_back(transport_cost); paths.emplace_back(path); } - - // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost - uint64_t min_cost = std::numeric_limits::max(); - uint64_t min_cost_idx = 0; - for (uint64_t i = 0; i < compute_resources.size(); ++i) { - if (compute_resources[i]->TotalTasks() == 0) { - min_cost_idx = i; - break; + if (task->job_.lock()->type() == JobType::SEARCH) { + // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost + uint64_t min_cost = std::numeric_limits::max(); + uint64_t min_cost_idx = 0; + for (uint64_t i = 0; i < compute_resources.size(); ++i) { + if (compute_resources[i]->TotalTasks() == 0) { + min_cost_idx = i; + break; + } + uint64_t cost = + compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i]; + if (min_cost > cost) { + min_cost = cost; + min_cost_idx = i; + } } - uint64_t cost = - compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i]; - if (min_cost > cost) { - min_cost = cost; - min_cost_idx = i; + + // step 3: set path in task + Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); + task->path() = task_path; + } else if (task->job_.lock()->type() == JobType::BUILD) { + //step2: Read device id in config + //get build index gpu resource + server::Config &config = server::Config::GetInstance(); + int32_t build_index_gpu; + Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu); + + for (uint64_t i = 0; i < compute_resources.size(); ++i) { + if (compute_resources[i]->device_id() == build_index_gpu) { + Path task_path(paths[i], paths[i].size() - 1); + task->path() = task_path; + break; + } } } - - // step 3: set path in task - Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); - task->path() = task_path; } if (resource->name() == task->path().Last()) { diff --git a/cpp/src/scheduler/job/BuildIndexJob.cpp b/cpp/src/scheduler/job/BuildIndexJob.cpp new file mode 100644 index 0000000000..4c91afcca0 --- /dev/null +++ b/cpp/src/scheduler/job/BuildIndexJob.cpp @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "BuildIndexJob.h" +#include "utils/Log.h" + + +namespace zilliz { +namespace milvus { +namespace scheduler { + +BuildIndexJob::BuildIndexJob(zilliz::milvus::scheduler::JobId id) + : Job(id, JobType::BUILD){ + +} + +bool +BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr &to_index_file, + const TableFileSchema table_file) { + std::unique_lock lock(mutex_); + if (to_index_file == nullptr) { + return false; + } + + SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " add to_index file: " << to_index_file->id_; + + to_index_files_[to_index_file->id_] = to_index_file; + table_files_[table_file.id_] = table_file; +} + +Status& +BuildIndexJob::WaitBuildIndexFinish() { + std::unique_lock lock(mutex_); + cv_.wait(lock, [this] { return to_index_files_.empty(); }); + SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " all done"; +} + +void +BuildIndexJob::BuildIndexDone(size_t to_index_id) { + std::unique_lock lock(mutex_); + to_index_files_.erase(to_index_id); + cv_.notify_all(); + SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " finish index file: " << to_index_id; +} + + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/job/BuildIndexJob.h b/cpp/src/scheduler/job/BuildIndexJob.h new file mode 100644 index 0000000000..4d52461348 --- /dev/null +++ b/cpp/src/scheduler/job/BuildIndexJob.h @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Job.h" +#include "db/meta/Meta.h" +#include "scheduler/Definition.h" + + +namespace zilliz { +namespace milvus { +namespace scheduler { + +using engine::meta::TableFileSchemaPtr; + +using Id2ToIndexMap = std::unordered_map; +using Id2ToTableFileMap = std::unordered_map; + +class BuildIndexJob : public Job { + public: + explicit BuildIndexJob(JobId id); + + public: + bool + AddToIndexFiles(const TableFileSchemaPtr &to_index_file, const TableFileSchema table_file); + + Status & + WaitBuildIndexFinish(); + + void + BuildIndexDone(size_t to_index_id); + + public: +// std::string +// location() const { +// return location_; +// } +// +// EngineType +// engine_type() const { +// return engine_type_; +// } + + Id2ToIndexMap & + to_index_files() { + return to_index_files_; + } + + private: + Id2ToIndexMap to_index_files_; + Id2ToTableFileMap table_files_; + + std::mutex mutex_; + std::condition_variable cv_; +}; + +using BuildIndexJobPtr = std::shared_ptr; + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/task/BuildIndexTask.cpp b/cpp/src/scheduler/task/BuildIndexTask.cpp new file mode 100644 index 0000000000..d6b063d713 --- /dev/null +++ b/cpp/src/scheduler/task/BuildIndexTask.cpp @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "BuildIndexTask.h" +#include "db/engine/EngineFactory.h" +#include "metrics/Metrics.h" +#include "scheduler/job/BuildIndexJob.h" +#include "utils/Log.h" +#include "utils/TimeRecorder.h" + +#include +#include +#include + +namespace zilliz { +namespace milvus { +namespace scheduler { + +XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file) + : Task(TaskType::BuildIndexTask), file_(file) { + if (file_) { + to_index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, (EngineType) file_->engine_type_, + (MetricType) file_->metric_type_, file_->nlist_); + } +} + +void +XBuildIndexTask::Load(zilliz::milvus::scheduler::LoadType type, uint8_t device_id) { + TimeRecorder rc(""); + Status stat = Status::OK(); + std::string error_msg; + std::string type_str; + + try { + if (type == LoadType::DISK2CPU) { + stat = to_index_engine_->Load(); + type_str = "DISK2CPU"; + } else if (type == LoadType::CPU2GPU) { + stat = to_index_engine_->CopyToGpu(device_id); + type_str = "CPU2GPU"; + } else if (type == LoadType::GPU2CPU) { + stat = to_index_engine_->CopyToCpu(); + type_str = "GPU2CPU"; + } else { + error_msg = "Wrong load type"; + stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); + } + } catch (std::exception& ex) { + // typical error: out of disk space or permition denied + error_msg = "Failed to load to_index file: " + std::string(ex.what()); + stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); + } + + if (!stat.ok()) { + Status s; + if(stat.ToString().find("out of memory") != std::string::npos) { + error_msg = "out of memory: " + type_str; + s = Status(SERVER_UNEXPECTED_ERROR, error_msg); + } else { + error_msg = "Failed to load to_index file: " + type_str; + s = Status(SERVER_UNEXPECTED_ERROR, error_msg); + } + + if (auto job = job_.lock()) { + auto build_index_job = std::static_pointer_cast(job); + build_index_job->BuildIndexDone(file_->id_); + } + + return; + } + + size_t file_size = to_index_engine_->PhysicalSize(); + + std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" + + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) + + " bytes from location: " + file_->location_ + " totally cost"; + double span = rc.ElapseFromBegin(info); + +// to_index_id_ = file_->id_; +// to_index_type_ = file_->file_type_; +} + +void +XBuildIndexTask::Execute() { + if (to_index_engine_ == nullptr) { + return; + } + + TimeRecorder rc("DoBuildIndex file id:" + std::to_string(to_index_id_)); + + if (auto job = job_.lock()) { + auto build_job = std::static_pointer_cast(job); + std::string location = file_->location_; + EngineType engine_type = (EngineType)file_->engine_type_; + std::shared_ptr index; + + try { + index = to_index_engine_->BuildIndex(location, engine_type); + if (index == nullptr) { + table_file_.file_type_ = engine::meta::TableFileSchema::TO_DELETE; + //TODO: updatetablefile + } + } catch (std::exception &ex) { + ENGINE_LOG_ERROR << "SearchTask encounter exception: " << ex.what(); + } + + build_job->BuildIndexDone(to_index_id_); + } + + rc.ElapseFromBegin("totally cost"); + + to_index_engine_ = nullptr; +} + +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/task/BuildIndexTask.h b/cpp/src/scheduler/task/BuildIndexTask.h new file mode 100644 index 0000000000..42606e15a7 --- /dev/null +++ b/cpp/src/scheduler/task/BuildIndexTask.h @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "Task.h" +#include "scheduler/Definition.h" +#include "scheduler/job/BuildIndexJob.h" + + +namespace zilliz { +namespace milvus { +namespace scheduler { + +class XBuildIndexTask : public Task { + public: + explicit XBuildIndexTask(TableFileSchemaPtr file); + + void + Load(LoadType type, uint8_t device_id) override; + + void + Execute() override; + + public: + TableFileSchemaPtr file_; + TableFileSchema table_file_; + size_t to_index_id_ = 0; + int to_index_type_ = 0; + ExecutionEnginePtr to_index_engine_ = nullptr; +}; + +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/task/Task.h b/cpp/src/scheduler/task/Task.h index da5fb3b3af..d537a6335d 100644 --- a/cpp/src/scheduler/task/Task.h +++ b/cpp/src/scheduler/task/Task.h @@ -39,6 +39,7 @@ enum class LoadType { enum class TaskType { SearchTask, DeleteTask, + BuildIndexTask, TestTask, }; diff --git a/cpp/src/scheduler/tasklabel/SpecResLabel.h b/cpp/src/scheduler/tasklabel/SpecResLabel.h index b5bea81b39..1510e1a79e 100644 --- a/cpp/src/scheduler/tasklabel/SpecResLabel.h +++ b/cpp/src/scheduler/tasklabel/SpecResLabel.h @@ -18,13 +18,14 @@ #pragma once #include "TaskLabel.h" +#include "scheduler/ResourceMgr.h" #include #include -class Resource; - -using ResourceWPtr = std::weak_ptr; +//class Resource; +// +//using ResourceWPtr = std::weak_ptr; namespace zilliz { namespace milvus { From bc10937bcf953eede23d63e5887112903804e5b8 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Mon, 30 Sep 2019 19:16:56 +0800 Subject: [PATCH 3/8] buildindex to scheduler run ok Former-commit-id: 5d596c2cdc543d1093992d8a09594a1ca7d5d17a --- cpp/src/db/DBImpl.cpp | 64 ++++------- cpp/src/db/engine/ExecutionEngineImpl.cpp | 2 + cpp/src/scheduler/TaskCreator.cpp | 2 +- .../scheduler/action/PushTaskToNeighbour.cpp | 3 +- cpp/src/scheduler/job/BuildIndexJob.cpp | 12 +- cpp/src/scheduler/job/BuildIndexJob.h | 19 +++- cpp/src/scheduler/task/BuildIndexTask.cpp | 104 ++++++++++++++++-- cpp/src/scheduler/task/BuildIndexTask.h | 5 +- 8 files changed, 141 insertions(+), 70 deletions(-) diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index ba43f945e8..22904bed2d 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -242,11 +242,7 @@ DBImpl::GetTableRowCount(const std::string &table_id, uint64_t &row_count) { } Status -<<<<<<< HEAD -DBImpl::InsertVectors(const std::string &table_id_, uint64_t n, const float *vectors, IDNumbers &vector_ids_) { -======= DBImpl::InsertVectors(const std::string& table_id, uint64_t n, const float* vectors, IDNumbers& vector_ids) { ->>>>>>> upstream/branch-0.5.0 // ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache"; if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); @@ -299,17 +295,6 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) { // for IDMAP type, only wait all NEW file converted to RAW file // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files std::vector file_types; -<<<<<<< HEAD - if (index.engine_type_ == (int) EngineType::FAISS_IDMAP) { - file_types = { - (int) meta::TableFileSchema::NEW, (int) meta::TableFileSchema::NEW_MERGE, - }; - } else { - file_types = { - (int) meta::TableFileSchema::RAW, (int) meta::TableFileSchema::NEW, - (int) meta::TableFileSchema::NEW_MERGE, (int) meta::TableFileSchema::NEW_INDEX, - (int) meta::TableFileSchema::TO_INDEX, -======= if (index.engine_type_ == static_cast(EngineType::FAISS_IDMAP)) { file_types = { static_cast(meta::TableFileSchema::NEW), static_cast(meta::TableFileSchema::NEW_MERGE), @@ -321,7 +306,6 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) { static_cast(meta::TableFileSchema::NEW_MERGE), static_cast(meta::TableFileSchema::NEW_INDEX), static_cast(meta::TableFileSchema::TO_INDEX), ->>>>>>> upstream/branch-0.5.0 }; } @@ -915,38 +899,36 @@ DBImpl::BackgroundBuildIndex() { Status status; scheduler::BuildIndexJobPtr - job = std::make_shared(0); + job = std::make_shared(0, meta_ptr_); // step 2: put build index task to scheduler - scheduler::JobMgrInst::GetInstance()->Put(job); - for (auto &file : to_index_files) { - std::cout << "get to index file" << std::endl; - meta::TableFileSchema table_file; - table_file.table_id_ = file.table_id_; - table_file.date_ = file.date_; - table_file.file_type_ = - meta::TableFileSchema::NEW_INDEX; // for multi-db-path, distribute index file averagely to each path - status = meta_ptr_->CreateTableFile(table_file); - if (!status.ok()) { - ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString(); - } - - scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); - job->AddToIndexFiles(file_ptr, table_file); - } - job->WaitBuildIndexFinish(); - // for (auto &file : to_index_files) { -// status = BuildIndex(file); -// if (!status.ok()) { +// std::cout << "get to index file" << std::endl; +// +// scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); +// job->AddToIndexFiles(file_ptr); +// +// if (!job->GetStatus().ok()) { +// Status status = job->GetStatus(); // ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); // } // -// if (shutting_down_.load(std::memory_order_acquire)) { -// ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; -// break; -// } // } +// scheduler::JobMgrInst::GetInstance()->Put(job); +// job->WaitBuildIndexFinish(); + + for (auto &file : to_index_files) { + std::cout << "get to index file" << std::endl; + status = BuildIndex(file); + if (!status.ok()) { + ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); + } + + if (shutting_down_.load(std::memory_order_acquire)) { + ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; + break; + } + } ENGINE_LOG_TRACE << "Background build index thread exit"; } diff --git a/cpp/src/db/engine/ExecutionEngineImpl.cpp b/cpp/src/db/engine/ExecutionEngineImpl.cpp index 52365d0915..f9366e439a 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.cpp +++ b/cpp/src/db/engine/ExecutionEngineImpl.cpp @@ -133,6 +133,7 @@ ExecutionEngineImpl::Serialize() { Status ExecutionEngineImpl::Load(bool to_cache) { + std::cout << "load" << std::endl; index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_); bool already_in_cache = (index_ != nullptr); if (!already_in_cache) { @@ -161,6 +162,7 @@ ExecutionEngineImpl::Load(bool to_cache) { Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { + std::cout << "copy2gpu" << std::endl; auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_); bool already_in_cache = (index != nullptr); if (already_in_cache) { diff --git a/cpp/src/scheduler/TaskCreator.cpp b/cpp/src/scheduler/TaskCreator.cpp index 3acb28591b..0a7b3f9cbb 100644 --- a/cpp/src/scheduler/TaskCreator.cpp +++ b/cpp/src/scheduler/TaskCreator.cpp @@ -69,7 +69,7 @@ TaskCreator::Create(const DeleteJobPtr &job) { } std::vector -TaskCreator::Create(const zilliz::milvus::scheduler::BuildIndexJobPtr &job) { +TaskCreator::Create(const BuildIndexJobPtr &job) { std::vector tasks; //TODO(yukun): remove "disk" hardcode here ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk"); diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 4ee6570012..127e01232c 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -172,7 +172,8 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu); for (uint64_t i = 0; i < compute_resources.size(); ++i) { - if (compute_resources[i]->device_id() == build_index_gpu) { + if (compute_resources[i]->name() + == res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) { Path task_path(paths[i], paths[i].size() - 1); task->path() = task_path; break; diff --git a/cpp/src/scheduler/job/BuildIndexJob.cpp b/cpp/src/scheduler/job/BuildIndexJob.cpp index 4c91afcca0..9ab3650dba 100644 --- a/cpp/src/scheduler/job/BuildIndexJob.cpp +++ b/cpp/src/scheduler/job/BuildIndexJob.cpp @@ -19,27 +19,24 @@ #include "utils/Log.h" -namespace zilliz { namespace milvus { namespace scheduler { -BuildIndexJob::BuildIndexJob(zilliz::milvus::scheduler::JobId id) - : Job(id, JobType::BUILD){ +BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr) + : Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)) { } bool -BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr &to_index_file, - const TableFileSchema table_file) { +BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr &to_index_file) { std::unique_lock lock(mutex_); - if (to_index_file == nullptr) { + if (to_index_file == nullptr || to_index_files_.find(to_index_file->id_) != to_index_files_.end()) { return false; } SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " add to_index file: " << to_index_file->id_; to_index_files_[to_index_file->id_] = to_index_file; - table_files_[table_file.id_] = table_file; } Status& @@ -58,6 +55,5 @@ BuildIndexJob::BuildIndexDone(size_t to_index_id) { } -} } } \ No newline at end of file diff --git a/cpp/src/scheduler/job/BuildIndexJob.h b/cpp/src/scheduler/job/BuildIndexJob.h index 4d52461348..e536012985 100644 --- a/cpp/src/scheduler/job/BuildIndexJob.h +++ b/cpp/src/scheduler/job/BuildIndexJob.h @@ -32,7 +32,6 @@ #include "scheduler/Definition.h" -namespace zilliz { namespace milvus { namespace scheduler { @@ -43,11 +42,11 @@ using Id2ToTableFileMap = std::unordered_map; class BuildIndexJob : public Job { public: - explicit BuildIndexJob(JobId id); + explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr); public: bool - AddToIndexFiles(const TableFileSchemaPtr &to_index_file, const TableFileSchema table_file); + AddToIndexFiles(const TableFileSchemaPtr &to_index_file); Status & WaitBuildIndexFinish(); @@ -66,15 +65,26 @@ class BuildIndexJob : public Job { // return engine_type_; // } + Status & + GetStatus() { + return status_; + } + Id2ToIndexMap & to_index_files() { return to_index_files_; } + engine::meta::MetaPtr + meta() const { + return meta_ptr_; + } + private: Id2ToIndexMap to_index_files_; - Id2ToTableFileMap table_files_; + engine::meta::MetaPtr meta_ptr_; + Status status_; std::mutex mutex_; std::condition_variable cv_; }; @@ -83,4 +93,3 @@ using BuildIndexJobPtr = std::shared_ptr; } } -} \ No newline at end of file diff --git a/cpp/src/scheduler/task/BuildIndexTask.cpp b/cpp/src/scheduler/task/BuildIndexTask.cpp index d6b063d713..ea775982ee 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.cpp +++ b/cpp/src/scheduler/task/BuildIndexTask.cpp @@ -26,7 +26,6 @@ #include #include -namespace zilliz { namespace milvus { namespace scheduler { @@ -39,7 +38,7 @@ XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file) } void -XBuildIndexTask::Load(zilliz::milvus::scheduler::LoadType type, uint8_t device_id) { +XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { TimeRecorder rc(""); Status stat = Status::OK(); std::string error_msg; @@ -50,7 +49,7 @@ XBuildIndexTask::Load(zilliz::milvus::scheduler::LoadType type, uint8_t device_i stat = to_index_engine_->Load(); type_str = "DISK2CPU"; } else if (type == LoadType::CPU2GPU) { - stat = to_index_engine_->CopyToGpu(device_id); +// stat = to_index_engine_->CopyToGpu(device_id); type_str = "CPU2GPU"; } else if (type == LoadType::GPU2CPU) { stat = to_index_engine_->CopyToCpu(); @@ -90,8 +89,8 @@ XBuildIndexTask::Load(zilliz::milvus::scheduler::LoadType type, uint8_t device_i " bytes from location: " + file_->location_ + " totally cost"; double span = rc.ElapseFromBegin(info); -// to_index_id_ = file_->id_; -// to_index_type_ = file_->file_type_; + to_index_id_ = file_->id_; + to_index_type_ = file_->file_type_; } void @@ -103,22 +102,106 @@ XBuildIndexTask::Execute() { TimeRecorder rc("DoBuildIndex file id:" + std::to_string(to_index_id_)); if (auto job = job_.lock()) { - auto build_job = std::static_pointer_cast(job); + auto build_index_job = std::static_pointer_cast(job); std::string location = file_->location_; EngineType engine_type = (EngineType)file_->engine_type_; std::shared_ptr index; + // step 2: create table file + engine::meta::TableFileSchema table_file; + table_file.table_id_ = file_->table_id_; + table_file.date_ = file_->date_; + table_file.file_type_ = + engine::meta::TableFileSchema::NEW_INDEX; // for multi-db-path, distribute index file averagely to each path + + engine::meta::MetaPtr meta_ptr = build_index_job->meta(); + Status status = build_index_job->meta()->CreateTableFile(table_file); + if (!status.ok()) { + ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString(); + build_index_job->BuildIndexDone(to_index_id_); + //TODO: return status + } + + // step 3: build index try { index = to_index_engine_->BuildIndex(location, engine_type); if (index == nullptr) { - table_file_.file_type_ = engine::meta::TableFileSchema::TO_DELETE; - //TODO: updatetablefile + table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; + status = meta_ptr->UpdateTableFile(table_file); + ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ + << " to to_delete"; + + return; } } catch (std::exception &ex) { - ENGINE_LOG_ERROR << "SearchTask encounter exception: " << ex.what(); + std::string msg = "BuildIndex encounter exception: " + std::string(ex.what()); + ENGINE_LOG_ERROR << msg; + + table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; + status = meta_ptr->UpdateTableFile(table_file); + ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; + + std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" + << std::endl; + + build_index_job->GetStatus() = Status(DB_ERROR, msg); + return; } - build_job->BuildIndexDone(to_index_id_); + // step 4: if table has been deleted, dont save index file + bool has_table = false; + meta_ptr->HasTable(file_->table_id_, has_table); + if (!has_table) { + meta_ptr->DeleteTableFiles(file_->table_id_); +// return Status::OK(); + } + + // step 5: save index file + try { + index->Serialize(); + } catch (std::exception &ex) { + // typical error: out of disk space or permition denied + std::string msg = "Serialize index encounter exception: " + std::string(ex.what()); + ENGINE_LOG_ERROR << msg; + + table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; + status = meta_ptr->UpdateTableFile(table_file); + ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; + + std::cout << "ERROR: failed to persist index file: " << table_file.location_ + << ", possible out of disk space" << std::endl; + +// return Status(DB_ERROR, msg); + } + + // step 6: update meta + table_file.file_type_ = engine::meta::TableFileSchema::INDEX; + table_file.file_size_ = index->PhysicalSize(); + table_file.row_count_ = index->Count(); + + auto origin_file = *file_; + origin_file.file_type_ = engine::meta::TableFileSchema::BACKUP; + + engine::meta::TableFilesSchema update_files = {table_file, origin_file}; + status = meta_ptr->UpdateTableFiles(update_files); + if (status.ok()) { + ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize() + << " bytes" + << " from file " << origin_file.file_id_; + +// index->Cache(); + } else { + // failed to update meta, mark the new file as to_delete, don't delete old file + origin_file.file_type_ = engine::meta::TableFileSchema::TO_INDEX; + status = meta_ptr->UpdateTableFile(origin_file); + ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index"; + + table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; + status = meta_ptr->UpdateTableFile(table_file); + ENGINE_LOG_DEBUG << "Failed to up date file to index, mark file: " << table_file.file_id_ << " to to_delete"; + } + + build_index_job->BuildIndexDone(to_index_id_); } rc.ElapseFromBegin("totally cost"); @@ -128,4 +211,3 @@ XBuildIndexTask::Execute() { } // namespace scheduler } // namespace milvus -} // namespace zilliz diff --git a/cpp/src/scheduler/task/BuildIndexTask.h b/cpp/src/scheduler/task/BuildIndexTask.h index 42606e15a7..cd751270a0 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.h +++ b/cpp/src/scheduler/task/BuildIndexTask.h @@ -22,7 +22,6 @@ #include "scheduler/job/BuildIndexJob.h" -namespace zilliz { namespace milvus { namespace scheduler { @@ -42,8 +41,8 @@ class XBuildIndexTask : public Task { size_t to_index_id_ = 0; int to_index_type_ = 0; ExecutionEnginePtr to_index_engine_ = nullptr; + }; } // namespace scheduler -} // namespace milvus -} // namespace zilliz +} // namespace milvus \ No newline at end of file From bdbc1375d799e6154a206d27161ec273231941c9 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Tue, 8 Oct 2019 16:05:57 +0800 Subject: [PATCH 4/8] MS-603 Add BuildIndex to scheduler Former-commit-id: f962874aac6dd92b4cdf121ca8a645185bc5882f --- cpp/src/db/DBImpl.cpp | 14 ++- cpp/src/db/engine/ExecutionEngine.h | 3 + cpp/src/db/engine/ExecutionEngineImpl.cpp | 14 ++- cpp/src/db/engine/ExecutionEngineImpl.h | 3 + .../scheduler/action/PushTaskToNeighbour.cpp | 38 +++++--- cpp/src/scheduler/job/BuildIndexJob.cpp | 4 +- cpp/src/scheduler/job/BuildIndexJob.h | 18 ++-- cpp/src/scheduler/task/BuildIndexTask.cpp | 94 ++++++++++--------- 8 files changed, 106 insertions(+), 82 deletions(-) diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 22904bed2d..1847e30017 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -898,24 +898,22 @@ DBImpl::BackgroundBuildIndex() { meta_ptr_->FilesToIndex(to_index_files); Status status; - scheduler::BuildIndexJobPtr - job = std::make_shared(0, meta_ptr_); - // step 2: put build index task to scheduler // for (auto &file : to_index_files) { -// std::cout << "get to index file" << std::endl; +// scheduler::BuildIndexJobPtr +// job = std::make_shared(0, meta_ptr_, options_); // // scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); -// job->AddToIndexFiles(file_ptr); // +// job->AddToIndexFiles(file_ptr); +// scheduler::JobMgrInst::GetInstance()->Put(job); +// job->WaitBuildIndexFinish(); // if (!job->GetStatus().ok()) { // Status status = job->GetStatus(); // ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); // } -// // } -// scheduler::JobMgrInst::GetInstance()->Put(job); -// job->WaitBuildIndexFinish(); + for (auto &file : to_index_files) { std::cout << "get to index file" << std::endl; diff --git a/cpp/src/db/engine/ExecutionEngine.h b/cpp/src/db/engine/ExecutionEngine.h index 1572b967d1..848704bd4b 100644 --- a/cpp/src/db/engine/ExecutionEngine.h +++ b/cpp/src/db/engine/ExecutionEngine.h @@ -66,6 +66,9 @@ class ExecutionEngine { virtual Status CopyToGpu(uint64_t device_id) = 0; + virtual Status + CopyToIndexFileToGpu(uint64_t device_id) = 0; + virtual Status CopyToCpu() = 0; diff --git a/cpp/src/db/engine/ExecutionEngineImpl.cpp b/cpp/src/db/engine/ExecutionEngineImpl.cpp index f9366e439a..a6f82d21c1 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.cpp +++ b/cpp/src/db/engine/ExecutionEngineImpl.cpp @@ -133,7 +133,6 @@ ExecutionEngineImpl::Serialize() { Status ExecutionEngineImpl::Load(bool to_cache) { - std::cout << "load" << std::endl; index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_); bool already_in_cache = (index_ != nullptr); if (!already_in_cache) { @@ -162,7 +161,7 @@ ExecutionEngineImpl::Load(bool to_cache) { Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { - std::cout << "copy2gpu" << std::endl; + std::cout << "copytogpu" << std::endl; auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_); bool already_in_cache = (index != nullptr); if (already_in_cache) { @@ -189,6 +188,17 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { return Status::OK(); } +Status +ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) { + auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_); + bool already_in_cache = (index != nullptr); + if (!already_in_cache) { + cache::DataObjPtr obj = std::make_shared(nullptr, PhysicalSize()); + milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_, obj); + } + return Status::OK(); +} + Status ExecutionEngineImpl::CopyToCpu() { auto index = cache::CpuCacheMgr::GetInstance()->GetIndex(location_); diff --git a/cpp/src/db/engine/ExecutionEngineImpl.h b/cpp/src/db/engine/ExecutionEngineImpl.h index 62a7b29a63..5793763fdd 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.h +++ b/cpp/src/db/engine/ExecutionEngineImpl.h @@ -58,6 +58,9 @@ class ExecutionEngineImpl : public ExecutionEngine { Status CopyToGpu(uint64_t device_id) override; + Status + CopyToIndexFileToGpu(uint64_t device_id) override; + Status CopyToCpu() override; diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 127e01232c..828f0c71c6 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -22,13 +22,14 @@ #include "src/cache/GpuCacheMgr.h" #include "src/server/Config.h" + namespace milvus { namespace scheduler { std::vector -get_neighbours(const ResourcePtr& self) { +get_neighbours(const ResourcePtr &self) { std::vector neighbours; - for (auto& neighbour_node : self->GetNeighbours()) { + for (auto &neighbour_node : self->GetNeighbours()) { auto node = neighbour_node.neighbour_node.lock(); if (not node) continue; @@ -42,9 +43,9 @@ get_neighbours(const ResourcePtr& self) { } std::vector> -get_neighbours_with_connetion(const ResourcePtr& self) { +get_neighbours_with_connetion(const ResourcePtr &self) { std::vector> neighbours; - for (auto& neighbour_node : self->GetNeighbours()) { + for (auto &neighbour_node : self->GetNeighbours()) { auto node = neighbour_node.neighbour_node.lock(); if (not node) continue; @@ -58,12 +59,12 @@ get_neighbours_with_connetion(const ResourcePtr& self) { } void -Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self) { +Action::PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self) { auto neighbours = get_neighbours_with_connetion(self); if (not neighbours.empty()) { std::vector speeds; uint64_t total_speed = 0; - for (auto& neighbour : neighbours) { + for (auto &neighbour : neighbours) { uint64_t speed = neighbour.second.speed(); speeds.emplace_back(speed); total_speed += speed; @@ -88,15 +89,15 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self } void -Action::PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self) { +Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) { auto neighbours = get_neighbours(self); - for (auto& neighbour : neighbours) { + for (auto &neighbour : neighbours) { neighbour->task_table().Put(task); } } void -Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) { +Action::PushTaskToResource(const TaskPtr &task, const ResourcePtr &dest) { dest->task_table().Put(task); } @@ -138,7 +139,7 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr auto compute_resources = res_mgr.lock()->GetComputeResources(); std::vector> paths; std::vector transport_costs; - for (auto& res : compute_resources) { + for (auto &res : compute_resources) { std::vector path; uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path); transport_costs.push_back(transport_cost); @@ -171,14 +172,21 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr int32_t build_index_gpu; Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu); + bool find_gpu_res = false; for (uint64_t i = 0; i < compute_resources.size(); ++i) { - if (compute_resources[i]->name() - == res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) { - Path task_path(paths[i], paths[i].size() - 1); - task->path() = task_path; - break; + if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) { + if (compute_resources[i]->name() + == res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) { + find_gpu_res = true; + Path task_path(paths[i], paths[i].size() - 1); + task->path() = task_path; + break; + } } } + if (not find_gpu_res) { + task->path() = Path(paths[0], paths[0].size() - 1); + } } } diff --git a/cpp/src/scheduler/job/BuildIndexJob.cpp b/cpp/src/scheduler/job/BuildIndexJob.cpp index 9ab3650dba..9dc21f9bec 100644 --- a/cpp/src/scheduler/job/BuildIndexJob.cpp +++ b/cpp/src/scheduler/job/BuildIndexJob.cpp @@ -22,8 +22,8 @@ namespace milvus { namespace scheduler { -BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr) - : Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)) { +BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options) + : Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) { } diff --git a/cpp/src/scheduler/job/BuildIndexJob.h b/cpp/src/scheduler/job/BuildIndexJob.h index e536012985..e4fa7b1fd8 100644 --- a/cpp/src/scheduler/job/BuildIndexJob.h +++ b/cpp/src/scheduler/job/BuildIndexJob.h @@ -42,7 +42,7 @@ using Id2ToTableFileMap = std::unordered_map; class BuildIndexJob : public Job { public: - explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr); + explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options); public: bool @@ -55,16 +55,6 @@ class BuildIndexJob : public Job { BuildIndexDone(size_t to_index_id); public: -// std::string -// location() const { -// return location_; -// } -// -// EngineType -// engine_type() const { -// return engine_type_; -// } - Status & GetStatus() { return status_; @@ -80,9 +70,15 @@ class BuildIndexJob : public Job { return meta_ptr_; } + engine::DBOptions + options() const { + return options_; + } + private: Id2ToIndexMap to_index_files_; engine::meta::MetaPtr meta_ptr_; + engine::DBOptions options_; Status status_; std::mutex mutex_; diff --git a/cpp/src/scheduler/task/BuildIndexTask.cpp b/cpp/src/scheduler/task/BuildIndexTask.cpp index ea775982ee..667db2cbae 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.cpp +++ b/cpp/src/scheduler/task/BuildIndexTask.cpp @@ -44,53 +44,57 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { std::string error_msg; std::string type_str; - try { - if (type == LoadType::DISK2CPU) { - stat = to_index_engine_->Load(); - type_str = "DISK2CPU"; - } else if (type == LoadType::CPU2GPU) { -// stat = to_index_engine_->CopyToGpu(device_id); - type_str = "CPU2GPU"; - } else if (type == LoadType::GPU2CPU) { - stat = to_index_engine_->CopyToCpu(); - type_str = "GPU2CPU"; - } else { - error_msg = "Wrong load type"; + if (auto job = job_.lock()) { + auto build_index_job = std::static_pointer_cast(job); + auto options = build_index_job->options(); + try { + if (type == LoadType::DISK2CPU) { + stat = to_index_engine_->Load(options.insert_cache_immediately_); + type_str = "DISK2CPU"; + } else if (type == LoadType::CPU2GPU) { + stat = to_index_engine_->CopyToIndexFileToGpu(device_id); + type_str = "CPU2GPU"; + } else if (type == LoadType::GPU2CPU) { + stat = to_index_engine_->CopyToCpu(); + type_str = "GPU2CPU"; + } else { + error_msg = "Wrong load type"; + stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); + } + } catch (std::exception& ex) { + // typical error: out of disk space or permition denied + error_msg = "Failed to load to_index file: " + std::string(ex.what()); stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); } - } catch (std::exception& ex) { - // typical error: out of disk space or permition denied - error_msg = "Failed to load to_index file: " + std::string(ex.what()); - stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); - } - if (!stat.ok()) { - Status s; - if(stat.ToString().find("out of memory") != std::string::npos) { - error_msg = "out of memory: " + type_str; - s = Status(SERVER_UNEXPECTED_ERROR, error_msg); - } else { - error_msg = "Failed to load to_index file: " + type_str; - s = Status(SERVER_UNEXPECTED_ERROR, error_msg); + if (!stat.ok()) { + Status s; + if(stat.ToString().find("out of memory") != std::string::npos) { + error_msg = "out of memory: " + type_str; + s = Status(SERVER_UNEXPECTED_ERROR, error_msg); + } else { + error_msg = "Failed to load to_index file: " + type_str; + s = Status(SERVER_UNEXPECTED_ERROR, error_msg); + } + + if (auto job = job_.lock()) { + auto build_index_job = std::static_pointer_cast(job); + build_index_job->BuildIndexDone(file_->id_); + } + + return; } - if (auto job = job_.lock()) { - auto build_index_job = std::static_pointer_cast(job); - build_index_job->BuildIndexDone(file_->id_); - } + size_t file_size = to_index_engine_->PhysicalSize(); - return; + std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" + + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) + + " bytes from location: " + file_->location_ + " totally cost"; + double span = rc.ElapseFromBegin(info); + + to_index_id_ = file_->id_; + to_index_type_ = file_->file_type_; } - - size_t file_size = to_index_engine_->PhysicalSize(); - - std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" + - std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) + - " bytes from location: " + file_->location_ + " totally cost"; - double span = rc.ElapseFromBegin(info); - - to_index_id_ = file_->id_; - to_index_type_ = file_->file_type_; } void @@ -119,12 +123,13 @@ XBuildIndexTask::Execute() { if (!status.ok()) { ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString(); build_index_job->BuildIndexDone(to_index_id_); - //TODO: return status + build_index_job->GetStatus() = status; + return; } // step 3: build index try { - index = to_index_engine_->BuildIndex(location, engine_type); + index = to_index_engine_->BuildIndex(table_file.location_, (EngineType) table_file.engine_type_); if (index == nullptr) { table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; status = meta_ptr->UpdateTableFile(table_file); @@ -153,7 +158,7 @@ XBuildIndexTask::Execute() { meta_ptr->HasTable(file_->table_id_, has_table); if (!has_table) { meta_ptr->DeleteTableFiles(file_->table_id_); -// return Status::OK(); + return; } // step 5: save index file @@ -171,7 +176,8 @@ XBuildIndexTask::Execute() { std::cout << "ERROR: failed to persist index file: " << table_file.location_ << ", possible out of disk space" << std::endl; -// return Status(DB_ERROR, msg); + build_index_job->GetStatus() = Status(DB_ERROR, msg); + return; } // step 6: update meta From 0af6d85ea965a9a1c03bbb1f36a30cb9bb530811 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Tue, 8 Oct 2019 17:26:25 +0800 Subject: [PATCH 5/8] fix conflict Former-commit-id: afdfa1f34302637762b06fa6ccb4058dbb391922 --- cpp/src/db/DBImpl.cpp | 48 +++++++++++------------ cpp/src/scheduler/TaskCreator.cpp | 4 +- cpp/src/scheduler/task/BuildIndexTask.cpp | 4 +- cpp/src/scheduler/task/BuildIndexTask.h | 2 +- 4 files changed, 29 insertions(+), 29 deletions(-) diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 1847e30017..e2bad26deb 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -899,35 +899,35 @@ DBImpl::BackgroundBuildIndex() { Status status; // step 2: put build index task to scheduler -// for (auto &file : to_index_files) { -// scheduler::BuildIndexJobPtr -// job = std::make_shared(0, meta_ptr_, options_); -// -// scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); -// -// job->AddToIndexFiles(file_ptr); -// scheduler::JobMgrInst::GetInstance()->Put(job); -// job->WaitBuildIndexFinish(); -// if (!job->GetStatus().ok()) { -// Status status = job->GetStatus(); -// ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); -// } -// } - - for (auto &file : to_index_files) { - std::cout << "get to index file" << std::endl; - status = BuildIndex(file); - if (!status.ok()) { + scheduler::BuildIndexJobPtr + job = std::make_shared(0, meta_ptr_, options_); + + scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); + + job->AddToIndexFiles(file_ptr); + scheduler::JobMgrInst::GetInstance()->Put(job); + job->WaitBuildIndexFinish(); + if (!job->GetStatus().ok()) { + Status status = job->GetStatus(); ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); } - - if (shutting_down_.load(std::memory_order_acquire)) { - ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; - break; - } } + +// for (auto &file : to_index_files) { +// std::cout << "get to index file" << std::endl; +// status = BuildIndex(file); +// if (!status.ok()) { +// ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); +// } +// +// if (shutting_down_.load(std::memory_order_acquire)) { +// ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; +// break; +// } +// } + ENGINE_LOG_TRACE << "Background build index thread exit"; } diff --git a/cpp/src/scheduler/TaskCreator.cpp b/cpp/src/scheduler/TaskCreator.cpp index 83d112918c..2645a46df6 100644 --- a/cpp/src/scheduler/TaskCreator.cpp +++ b/cpp/src/scheduler/TaskCreator.cpp @@ -75,8 +75,8 @@ TaskCreator::Create(const BuildIndexJobPtr &job) { ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk"); for (auto &to_index_file : job->to_index_files()) { - auto task = std::make_shared(to_index_file.second); - task->label() = std::make_shared(std::weak_ptr(res_ptr)); + auto label = std::make_shared(std::weak_ptr(res_ptr)); + auto task = std::make_shared(to_index_file.second, label); task->job_ = job; tasks.emplace_back(task); } diff --git a/cpp/src/scheduler/task/BuildIndexTask.cpp b/cpp/src/scheduler/task/BuildIndexTask.cpp index 667db2cbae..2794c1b45d 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.cpp +++ b/cpp/src/scheduler/task/BuildIndexTask.cpp @@ -29,8 +29,8 @@ namespace milvus { namespace scheduler { -XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file) - : Task(TaskType::BuildIndexTask), file_(file) { +XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file, TaskLabelPtr label) + : Task(TaskType::BuildIndexTask, std::move(label)), file_(file) { if (file_) { to_index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, (EngineType) file_->engine_type_, (MetricType) file_->metric_type_, file_->nlist_); diff --git a/cpp/src/scheduler/task/BuildIndexTask.h b/cpp/src/scheduler/task/BuildIndexTask.h index cd751270a0..770295a042 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.h +++ b/cpp/src/scheduler/task/BuildIndexTask.h @@ -27,7 +27,7 @@ namespace scheduler { class XBuildIndexTask : public Task { public: - explicit XBuildIndexTask(TableFileSchemaPtr file); + explicit XBuildIndexTask(TableFileSchemaPtr file, TaskLabelPtr label); void Load(LoadType type, uint8_t device_id) override; From 1bfe39e07437bb58b16cc1bc4d489f3ea86a326f Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Wed, 9 Oct 2019 11:18:58 +0800 Subject: [PATCH 6/8] MS-603 Add BuildIndex to scheduler Former-commit-id: f4507e6570ff203b7f85878eac6bba514a0f5818 --- cpp/src/db/DBImpl.cpp | 46 +++++++++++++-------------- cpp/src/metrics/PrometheusMetrics.cpp | 2 +- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index e2bad26deb..747a61ddbe 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -898,36 +898,34 @@ DBImpl::BackgroundBuildIndex() { meta_ptr_->FilesToIndex(to_index_files); Status status; +// scheduler::BuildIndexJobPtr +// job = std::make_shared(0, meta_ptr_, options_); + // step 2: put build index task to scheduler +// for (auto &file : to_index_files) { +// scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); +// job->AddToIndexFiles(file_ptr); +// } +// scheduler::JobMgrInst::GetInstance()->Put(job); +// job->WaitBuildIndexFinish(); +// if (!job->GetStatus().ok()) { +// Status status = job->GetStatus(); +// ENGINE_LOG_ERROR << "Building index failed: " << status.ToString(); +// } + for (auto &file : to_index_files) { - scheduler::BuildIndexJobPtr - job = std::make_shared(0, meta_ptr_, options_); - - scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); - - job->AddToIndexFiles(file_ptr); - scheduler::JobMgrInst::GetInstance()->Put(job); - job->WaitBuildIndexFinish(); - if (!job->GetStatus().ok()) { - Status status = job->GetStatus(); + std::cout << "get to index file" << std::endl; + status = BuildIndex(file); + if (!status.ok()) { ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); } + + if (shutting_down_.load(std::memory_order_acquire)) { + ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; + break; + } } - -// for (auto &file : to_index_files) { -// std::cout << "get to index file" << std::endl; -// status = BuildIndex(file); -// if (!status.ok()) { -// ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); -// } -// -// if (shutting_down_.load(std::memory_order_acquire)) { -// ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; -// break; -// } -// } - ENGINE_LOG_TRACE << "Background build index thread exit"; } diff --git a/cpp/src/metrics/PrometheusMetrics.cpp b/cpp/src/metrics/PrometheusMetrics.cpp index bc1860389f..182f14d46c 100644 --- a/cpp/src/metrics/PrometheusMetrics.cpp +++ b/cpp/src/metrics/PrometheusMetrics.cpp @@ -46,7 +46,7 @@ PrometheusMetrics::Init() { return s.code(); } - const std::string uri = std::string("/tmp/metrics"); + const std::string uri = std::string("/metrics"); const std::size_t num_threads = 2; // Init Exposer From 5df210f924d9a17f676395f9a4aa5eec23bdf756 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Wed, 9 Oct 2019 17:17:25 +0800 Subject: [PATCH 7/8] code format Former-commit-id: c97b162cf28301a5b1838377bdb71d4ad14daca2 --- cpp/src/db/DBImpl.cpp | 45 +++++++++++------------ cpp/src/db/engine/ExecutionEngineImpl.h | 2 +- cpp/src/scheduler/job/BuildIndexJob.cpp | 9 ++--- cpp/src/scheduler/job/BuildIndexJob.h | 4 +- cpp/src/scheduler/task/BuildIndexTask.cpp | 12 +++--- cpp/src/scheduler/task/BuildIndexTask.h | 1 - cpp/unittest/metrics/utils.cpp | 1 + 7 files changed, 36 insertions(+), 38 deletions(-) diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 747a61ddbe..f5560a5476 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -898,33 +898,32 @@ DBImpl::BackgroundBuildIndex() { meta_ptr_->FilesToIndex(to_index_files); Status status; -// scheduler::BuildIndexJobPtr -// job = std::make_shared(0, meta_ptr_, options_); + scheduler::BuildIndexJobPtr + job = std::make_shared(0, meta_ptr_, options_); // step 2: put build index task to scheduler -// for (auto &file : to_index_files) { -// scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); -// job->AddToIndexFiles(file_ptr); -// } -// scheduler::JobMgrInst::GetInstance()->Put(job); -// job->WaitBuildIndexFinish(); -// if (!job->GetStatus().ok()) { -// Status status = job->GetStatus(); -// ENGINE_LOG_ERROR << "Building index failed: " << status.ToString(); -// } - for (auto &file : to_index_files) { - std::cout << "get to index file" << std::endl; - status = BuildIndex(file); - if (!status.ok()) { - ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); - } - - if (shutting_down_.load(std::memory_order_acquire)) { - ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; - break; - } + scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); + job->AddToIndexFiles(file_ptr); } + scheduler::JobMgrInst::GetInstance()->Put(job); + job->WaitBuildIndexFinish(); + if (!job->GetStatus().ok()) { + Status status = job->GetStatus(); + ENGINE_LOG_ERROR << "Building index failed: " << status.ToString(); + } + +// for (auto &file : to_index_files) { +// status = BuildIndex(file); +// if (!status.ok()) { +// ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); +// } +// +// if (shutting_down_.load(std::memory_order_acquire)) { +// ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; +// break; +// } +// } ENGINE_LOG_TRACE << "Background build index thread exit"; } diff --git a/cpp/src/db/engine/ExecutionEngineImpl.h b/cpp/src/db/engine/ExecutionEngineImpl.h index 5793763fdd..56a5849994 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.h +++ b/cpp/src/db/engine/ExecutionEngineImpl.h @@ -59,7 +59,7 @@ class ExecutionEngineImpl : public ExecutionEngine { CopyToGpu(uint64_t device_id) override; Status - CopyToIndexFileToGpu(uint64_t device_id) override; + CopyToIndexFileToGpu(uint64_t device_id) override; Status CopyToCpu() override; diff --git a/cpp/src/scheduler/job/BuildIndexJob.cpp b/cpp/src/scheduler/job/BuildIndexJob.cpp index 9dc21f9bec..f3efe294b2 100644 --- a/cpp/src/scheduler/job/BuildIndexJob.cpp +++ b/cpp/src/scheduler/job/BuildIndexJob.cpp @@ -15,16 +15,14 @@ // specific language governing permissions and limitations // under the License. -#include "BuildIndexJob.h" #include "utils/Log.h" - +#include "BuildIndexJob.h" namespace milvus { namespace scheduler { BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options) : Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) { - } bool @@ -54,6 +52,5 @@ BuildIndexJob::BuildIndexDone(size_t to_index_id) { SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " finish index file: " << to_index_id; } - -} -} \ No newline at end of file +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/job/BuildIndexJob.h b/cpp/src/scheduler/job/BuildIndexJob.h index e4fa7b1fd8..cc50d70fc9 100644 --- a/cpp/src/scheduler/job/BuildIndexJob.h +++ b/cpp/src/scheduler/job/BuildIndexJob.h @@ -87,5 +87,5 @@ class BuildIndexJob : public Job { using BuildIndexJobPtr = std::shared_ptr; -} -} +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/task/BuildIndexTask.cpp b/cpp/src/scheduler/task/BuildIndexTask.cpp index 2794c1b45d..2332e652b7 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.cpp +++ b/cpp/src/scheduler/task/BuildIndexTask.cpp @@ -26,6 +26,7 @@ #include #include + namespace milvus { namespace scheduler { @@ -61,7 +62,7 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { error_msg = "Wrong load type"; stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); } - } catch (std::exception& ex) { + } catch (std::exception &ex) { // typical error: out of disk space or permition denied error_msg = "Failed to load to_index file: " + std::string(ex.what()); stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); @@ -69,7 +70,7 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { if (!stat.ok()) { Status s; - if(stat.ToString().find("out of memory") != std::string::npos) { + if (stat.ToString().find("out of memory") != std::string::npos) { error_msg = "out of memory: " + type_str; s = Status(SERVER_UNEXPECTED_ERROR, error_msg); } else { @@ -108,7 +109,7 @@ XBuildIndexTask::Execute() { if (auto job = job_.lock()) { auto build_index_job = std::static_pointer_cast(job); std::string location = file_->location_; - EngineType engine_type = (EngineType)file_->engine_type_; + EngineType engine_type = (EngineType) file_->engine_type_; std::shared_ptr index; // step 2: create table file @@ -116,7 +117,7 @@ XBuildIndexTask::Execute() { table_file.table_id_ = file_->table_id_; table_file.date_ = file_->date_; table_file.file_type_ = - engine::meta::TableFileSchema::NEW_INDEX; // for multi-db-path, distribute index file averagely to each path + engine::meta::TableFileSchema::NEW_INDEX; engine::meta::MetaPtr meta_ptr = build_index_job->meta(); Status status = build_index_job->meta()->CreateTableFile(table_file); @@ -204,7 +205,8 @@ XBuildIndexTask::Execute() { table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; status = meta_ptr->UpdateTableFile(table_file); - ENGINE_LOG_DEBUG << "Failed to up date file to index, mark file: " << table_file.file_id_ << " to to_delete"; + ENGINE_LOG_DEBUG << "Failed to up date file to index, mark file: " << table_file.file_id_ + << " to to_delete"; } build_index_job->BuildIndexDone(to_index_id_); diff --git a/cpp/src/scheduler/task/BuildIndexTask.h b/cpp/src/scheduler/task/BuildIndexTask.h index 770295a042..a601f760ae 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.h +++ b/cpp/src/scheduler/task/BuildIndexTask.h @@ -41,7 +41,6 @@ class XBuildIndexTask : public Task { size_t to_index_id_ = 0; int to_index_type_ = 0; ExecutionEnginePtr to_index_engine_ = nullptr; - }; } // namespace scheduler diff --git a/cpp/unittest/metrics/utils.cpp b/cpp/unittest/metrics/utils.cpp index c2a53babc3..e345923b7b 100644 --- a/cpp/unittest/metrics/utils.cpp +++ b/cpp/unittest/metrics/utils.cpp @@ -66,6 +66,7 @@ ms::engine::DBOptions MetricTest::GetOptions() { } void MetricTest::SetUp() { + boost::filesystem::remove_all("/tmp/milvus_test"); InitLog(); auto options = GetOptions(); db_ = ms::engine::DBFactory::Build(options); From f8b9091f41a8bad68d667290ec57681189169f51 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Thu, 10 Oct 2019 10:15:00 +0800 Subject: [PATCH 8/8] clang tidy Former-commit-id: 1d226a67672476b82957e27e60839283259d7159 --- cpp/src/db/engine/ExecutionEngineImpl.cpp | 1 - cpp/src/scheduler/job/BuildIndexJob.cpp | 2 +- cpp/src/scheduler/task/BuildIndexTask.cpp | 1 + cpp/src/scheduler/task/BuildIndexTask.h | 2 +- 4 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/db/engine/ExecutionEngineImpl.cpp b/cpp/src/db/engine/ExecutionEngineImpl.cpp index a6f82d21c1..51a31d1143 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.cpp +++ b/cpp/src/db/engine/ExecutionEngineImpl.cpp @@ -161,7 +161,6 @@ ExecutionEngineImpl::Load(bool to_cache) { Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { - std::cout << "copytogpu" << std::endl; auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_); bool already_in_cache = (index != nullptr); if (already_in_cache) { diff --git a/cpp/src/scheduler/job/BuildIndexJob.cpp b/cpp/src/scheduler/job/BuildIndexJob.cpp index f3efe294b2..bccc7a9327 100644 --- a/cpp/src/scheduler/job/BuildIndexJob.cpp +++ b/cpp/src/scheduler/job/BuildIndexJob.cpp @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -#include "utils/Log.h" #include "BuildIndexJob.h" +#include "utils/Log.h" namespace milvus { namespace scheduler { diff --git a/cpp/src/scheduler/task/BuildIndexTask.cpp b/cpp/src/scheduler/task/BuildIndexTask.cpp index 2332e652b7..b37251092e 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.cpp +++ b/cpp/src/scheduler/task/BuildIndexTask.cpp @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. + #include "BuildIndexTask.h" #include "db/engine/EngineFactory.h" #include "metrics/Metrics.h" diff --git a/cpp/src/scheduler/task/BuildIndexTask.h b/cpp/src/scheduler/task/BuildIndexTask.h index a601f760ae..84825921f9 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.h +++ b/cpp/src/scheduler/task/BuildIndexTask.h @@ -44,4 +44,4 @@ class XBuildIndexTask : public Task { }; } // namespace scheduler -} // namespace milvus \ No newline at end of file +} // namespace milvus