From 59f45763d33bae07cccc9ac946caac9dfa647cb6 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Fri, 11 Oct 2019 17:38:48 +0800 Subject: [PATCH] MS-636 Add optimizer in scheduler for FAISS_IVFSQ8H Former-commit-id: 4dff6236c98a86965e686b4a94e5492e0c1d5722 --- cpp/CHANGELOG.md | 1 + cpp/src/db/DBImpl.cpp | 25 +++++------ cpp/src/scheduler/Algorithm.cpp | 2 - cpp/src/scheduler/JobMgr.cpp | 7 ++- cpp/src/scheduler/SchedInst.cpp | 3 ++ cpp/src/scheduler/SchedInst.h | 24 +++++++++++ cpp/src/scheduler/TaskCreator.cpp | 4 +- .../scheduler/action/PushTaskToNeighbour.cpp | 43 ++++++++++++------- cpp/src/scheduler/optimizer/HybridPass.cpp | 7 ++- cpp/src/scheduler/optimizer/Optimizer.h | 6 ++- cpp/src/scheduler/task/BuildIndexTask.cpp | 5 +++ cpp/unittest/CMakeLists.txt | 2 + cpp/unittest/db/CMakeLists.txt | 9 ++++ cpp/unittest/db/appendix/log_config.conf | 27 ++++++++++++ cpp/unittest/db/appendix/server_config.yaml | 37 ++++++++++++++++ cpp/unittest/db/test_db.cpp | 26 +++++++++++ cpp/unittest/db/utils.cpp | 2 +- 17 files changed, 193 insertions(+), 37 deletions(-) create mode 100644 cpp/unittest/db/appendix/log_config.conf create mode 100644 cpp/unittest/db/appendix/server_config.yaml diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 1a62a791b0..d0763d5c06 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -37,6 +37,7 @@ Please mark all change in change log and use the ticket from JIRA. ## New Feature - MS-627 - Integrate new index: IVFSQHybrid - MS-631 - IVFSQ8H Index support +- MS-636 - Add optimizer in scheduler for FAISS_IVFSQ8H ## Task - MS-554 - Change license to Apache 2.0 diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index f81fb32e7f..cf8f1824d2 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -900,20 +900,21 @@ DBImpl::BackgroundBuildIndex() { meta_ptr_->FilesToIndex(to_index_files); Status status; - scheduler::BuildIndexJobPtr job = std::make_shared(0, meta_ptr_, options_); + if (!to_index_files.empty()) { + scheduler::BuildIndexJobPtr job = std::make_shared(0, meta_ptr_, options_); - // step 2: put build index task to scheduler - for (auto& file : to_index_files) { - scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); - job->AddToIndexFiles(file_ptr); + // step 2: put build index task to scheduler + for (auto& file : to_index_files) { + scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); + job->AddToIndexFiles(file_ptr); + } + scheduler::JobMgrInst::GetInstance()->Put(job); + job->WaitBuildIndexFinish(); + if (!job->GetStatus().ok()) { + Status status = job->GetStatus(); + ENGINE_LOG_ERROR << "Building index failed: " << status.ToString(); + } } - scheduler::JobMgrInst::GetInstance()->Put(job); - job->WaitBuildIndexFinish(); - if (!job->GetStatus().ok()) { - Status status = job->GetStatus(); - ENGINE_LOG_ERROR << "Building index failed: " << status.ToString(); - } - // for (auto &file : to_index_files) { // status = BuildIndex(file); // if (!status.ok()) { diff --git a/cpp/src/scheduler/Algorithm.cpp b/cpp/src/scheduler/Algorithm.cpp index 44f83742c2..b2156b3f97 100644 --- a/cpp/src/scheduler/Algorithm.cpp +++ b/cpp/src/scheduler/Algorithm.cpp @@ -29,8 +29,6 @@ constexpr uint64_t MAXINT = std::numeric_limits::max(); uint64_t ShortestPath(const ResourcePtr& src, const ResourcePtr& dest, const ResourceMgrPtr& res_mgr, std::vector& path) { - std::vector> paths; - uint64_t num_of_resources = res_mgr->GetAllResources().size(); std::unordered_map id_name_map; std::unordered_map name_id_map; diff --git a/cpp/src/scheduler/JobMgr.cpp b/cpp/src/scheduler/JobMgr.cpp index a4ef83ad75..170dee4b80 100644 --- a/cpp/src/scheduler/JobMgr.cpp +++ b/cpp/src/scheduler/JobMgr.cpp @@ -16,7 +16,9 @@ // under the License. #include "scheduler/JobMgr.h" +#include "SchedInst.h" #include "TaskCreator.h" +#include "optimizer/Optimizer.h" #include "task/Task.h" #include @@ -67,8 +69,9 @@ JobMgr::worker_function() { } auto tasks = build_task(job); - - // TODO: optimizer all task + for (auto& task : tasks) { + OptimizerInst::GetInstance()->Run(task); + } // disk resources NEVER be empty. if (auto disk = res_mgr_->GetDiskResources()[0].lock()) { diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp index b9edbca001..cc2b4e280a 100644 --- a/cpp/src/scheduler/SchedInst.cpp +++ b/cpp/src/scheduler/SchedInst.cpp @@ -38,6 +38,9 @@ std::mutex SchedInst::mutex_; scheduler::JobMgrPtr JobMgrInst::instance = nullptr; std::mutex JobMgrInst::mutex_; +OptimizerPtr OptimizerInst::instance = nullptr; +std::mutex OptimizerInst::mutex_; + void load_simple_config() { server::Config& config = server::Config::GetInstance(); diff --git a/cpp/src/scheduler/SchedInst.h b/cpp/src/scheduler/SchedInst.h index dc8e4ed478..0d2a04b02c 100644 --- a/cpp/src/scheduler/SchedInst.h +++ b/cpp/src/scheduler/SchedInst.h @@ -20,9 +20,12 @@ #include "JobMgr.h" #include "ResourceMgr.h" #include "Scheduler.h" +#include "optimizer/HybridPass.h" +#include "optimizer/Optimizer.h" #include #include +#include namespace milvus { namespace scheduler { @@ -81,6 +84,27 @@ class JobMgrInst { static std::mutex mutex_; }; +class OptimizerInst { + public: + static OptimizerPtr + GetInstance() { + if (instance == nullptr) { + std::lock_guard lock(mutex_); + if (instance == nullptr) { + HybridPassPtr pass_ptr = std::make_shared(); + std::vector pass_list; + pass_list.push_back(pass_ptr); + instance = std::make_shared(pass_list); + } + } + return instance; + } + + private: + static scheduler::OptimizerPtr instance; + static std::mutex mutex_; +}; + void StartSchedulerService(); diff --git a/cpp/src/scheduler/TaskCreator.cpp b/cpp/src/scheduler/TaskCreator.cpp index ee63c2c6b7..40cfa9aac6 100644 --- a/cpp/src/scheduler/TaskCreator.cpp +++ b/cpp/src/scheduler/TaskCreator.cpp @@ -16,10 +16,10 @@ // under the License. #include "scheduler/TaskCreator.h" -#include #include "SchedInst.h" -#include "scheduler/tasklabel/BroadcastLabel.h" +#include "tasklabel/BroadcastLabel.h" #include "tasklabel/DefaultLabel.h" +#include "tasklabel/SpecResLabel.h" namespace milvus { namespace scheduler { diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 53dd45faca..95f8212297 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -19,6 +19,7 @@ #include #include "../Algorithm.h" #include "Action.h" +#include "scheduler/tasklabel/SpecResLabel.h" #include "src/cache/GpuCacheMgr.h" #include "src/server/Config.h" @@ -145,25 +146,35 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr paths.emplace_back(path); } if (task->job_.lock()->type() == JobType::SEARCH) { - // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost - uint64_t min_cost = std::numeric_limits::max(); - uint64_t min_cost_idx = 0; - for (uint64_t i = 0; i < compute_resources.size(); ++i) { - if (compute_resources[i]->TotalTasks() == 0) { - min_cost_idx = i; - break; - } - uint64_t cost = - compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i]; - if (min_cost > cost) { - min_cost = cost; - min_cost_idx = i; + auto label = task->label(); + auto spec_label = std::static_pointer_cast(label); + if (spec_label->resource().lock()->type() == ResourceType::CPU) { + std::vector spec_path; + spec_path.push_back(spec_label->resource().lock()->name()); + spec_path.push_back(resource->name()); + task->path() = Path(spec_path, spec_path.size() - 1); + } else { + // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost + uint64_t min_cost = std::numeric_limits::max(); + uint64_t min_cost_idx = 0; + for (uint64_t i = 0; i < compute_resources.size(); ++i) { + if (compute_resources[i]->TotalTasks() == 0) { + min_cost_idx = i; + break; + } + uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + + transport_costs[i]; + if (min_cost > cost) { + min_cost = cost; + min_cost_idx = i; + } } + + // step 3: set path in task + Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); + task->path() = task_path; } - // step 3: set path in task - Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); - task->path() = task_path; } else if (task->job_.lock()->type() == JobType::BUILD) { // step2: Read device id in config // get build index gpu resource diff --git a/cpp/src/scheduler/optimizer/HybridPass.cpp b/cpp/src/scheduler/optimizer/HybridPass.cpp index 343d6ec81e..d63fc2e819 100644 --- a/cpp/src/scheduler/optimizer/HybridPass.cpp +++ b/cpp/src/scheduler/optimizer/HybridPass.cpp @@ -16,7 +16,9 @@ // under the License. #include "scheduler/optimizer/HybridPass.h" +#include "scheduler/SchedInst.h" #include "scheduler/task/SearchTask.h" +#include "scheduler/tasklabel/SpecResLabel.h" namespace milvus { namespace scheduler { @@ -28,7 +30,10 @@ HybridPass::Run(const TaskPtr& task) { return false; auto search_task = std::static_pointer_cast(task); if (search_task->file_->engine_type_ == (int)engine::EngineType::FAISS_IVFSQ8H) { - // TODO: make specified label + // TODO: remove "cpu" hardcode + ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("cpu"); + auto label = std::make_shared(std::weak_ptr(res_ptr)); + task->label() = label; return true; } return false; diff --git a/cpp/src/scheduler/optimizer/Optimizer.h b/cpp/src/scheduler/optimizer/Optimizer.h index 99282e66a6..68b519e115 100644 --- a/cpp/src/scheduler/optimizer/Optimizer.h +++ b/cpp/src/scheduler/optimizer/Optimizer.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include "Pass.h" @@ -34,7 +35,8 @@ namespace scheduler { class Optimizer { public: - Optimizer() = default; + explicit Optimizer(std::vector pass_list) : pass_list_(std::move(pass_list)) { + } void Init(); @@ -46,5 +48,7 @@ class Optimizer { std::vector pass_list_; }; +using OptimizerPtr = std::shared_ptr; + } // namespace scheduler } // namespace milvus diff --git a/cpp/src/scheduler/task/BuildIndexTask.cpp b/cpp/src/scheduler/task/BuildIndexTask.cpp index f2cebcac9e..25d3d73a7b 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.cpp +++ b/cpp/src/scheduler/task/BuildIndexTask.cpp @@ -124,6 +124,7 @@ XBuildIndexTask::Execute() { ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString(); build_index_job->BuildIndexDone(to_index_id_); build_index_job->GetStatus() = status; + to_index_engine_ = nullptr; return; } @@ -136,6 +137,7 @@ XBuildIndexTask::Execute() { ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; + to_index_engine_ = nullptr; return; } } catch (std::exception& ex) { @@ -150,6 +152,7 @@ XBuildIndexTask::Execute() { << std::endl; build_index_job->GetStatus() = Status(DB_ERROR, msg); + to_index_engine_ = nullptr; return; } @@ -158,6 +161,7 @@ XBuildIndexTask::Execute() { meta_ptr->HasTable(file_->table_id_, has_table); if (!has_table) { meta_ptr->DeleteTableFiles(file_->table_id_); + to_index_engine_ = nullptr; return; } @@ -177,6 +181,7 @@ XBuildIndexTask::Execute() { << ", possible out of disk space" << std::endl; build_index_job->GetStatus() = Status(DB_ERROR, msg); + to_index_engine_ = nullptr; return; } diff --git a/cpp/unittest/CMakeLists.txt b/cpp/unittest/CMakeLists.txt index 6c9aeadcd1..ac4fae85bb 100644 --- a/cpp/unittest/CMakeLists.txt +++ b/cpp/unittest/CMakeLists.txt @@ -50,6 +50,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_files) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/job scheduler_job_files) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_files) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/optimizer scheduler_optimizer_files) set(scheduler_files ${scheduler_main_files} ${scheduler_action_files} @@ -57,6 +58,7 @@ set(scheduler_files ${scheduler_job_files} ${scheduler_resource_files} ${scheduler_task_files} + ${scheduler_optimizer_files} ) aux_source_directory(${MILVUS_ENGINE_SRC}/server server_files) diff --git a/cpp/unittest/db/CMakeLists.txt b/cpp/unittest/db/CMakeLists.txt index 2cbf55a208..4bce9f35b3 100644 --- a/cpp/unittest/db/CMakeLists.txt +++ b/cpp/unittest/db/CMakeLists.txt @@ -31,3 +31,12 @@ target_link_libraries(test_db install(TARGETS test_db DESTINATION unittest) +configure_file(appendix/server_config.yaml + "${CMAKE_CURRENT_BINARY_DIR}/milvus/conf/server_config.yaml" + COPYONLY) + +configure_file(appendix/log_config.conf + "${CMAKE_CURRENT_BINARY_DIR}/milvus/conf/log_config.conf" + COPYONLY) + + diff --git a/cpp/unittest/db/appendix/log_config.conf b/cpp/unittest/db/appendix/log_config.conf new file mode 100644 index 0000000000..0a3e0d21af --- /dev/null +++ b/cpp/unittest/db/appendix/log_config.conf @@ -0,0 +1,27 @@ +* GLOBAL: + FORMAT = "%datetime | %level | %logger | %msg" + FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-global.log" + ENABLED = true + TO_FILE = true + TO_STANDARD_OUTPUT = false + SUBSECOND_PRECISION = 3 + PERFORMANCE_TRACKING = false + MAX_LOG_FILE_SIZE = 209715200 ## Throw log files away after 200MB +* DEBUG: + FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-debug.log" + ENABLED = true +* WARNING: + FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-warning.log" +* TRACE: + FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-trace.log" +* VERBOSE: + FORMAT = "%datetime{%d/%M/%y} | %level-%vlevel | %msg" + TO_FILE = false + TO_STANDARD_OUTPUT = false +## Error logs +* ERROR: + ENABLED = true + FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-error.log" +* FATAL: + ENABLED = true + FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-fatal.log" diff --git a/cpp/unittest/db/appendix/server_config.yaml b/cpp/unittest/db/appendix/server_config.yaml new file mode 100644 index 0000000000..f92b2f1a18 --- /dev/null +++ b/cpp/unittest/db/appendix/server_config.yaml @@ -0,0 +1,37 @@ +# All the following configurations are default values. + +server_config: + address: 0.0.0.0 # milvus server ip address (IPv4) + port: 19530 # port range: 1025 ~ 65534 + deploy_mode: single # deployment type: single, cluster_readonly, cluster_writable + time_zone: UTC+8 + +db_config: + primary_path: /tmp/milvus # path used to store data and meta + secondary_path: # path used to store data only, split by semicolon + + backend_url: sqlite://:@:/ # URI format: dialect://username:password@host:port/database + # Keep 'dialect://:@:/', and replace other texts with real values. + # Replace 'dialect' with 'mysql' or 'sqlite' + + insert_buffer_size: 4 # GB, maximum insert buffer size allowed + build_index_gpu: 0 # gpu id used for building index + +metric_config: + enable_monitor: false # enable monitoring or not + collector: prometheus # prometheus + prometheus_config: + port: 8080 # port prometheus used to fetch metrics + +cache_config: + cpu_mem_capacity: 16 # GB, CPU memory used for cache + cpu_mem_threshold: 0.85 # percentage of data kept when cache cleanup triggered + cache_insert_data: false # whether load inserted data into cache + +engine_config: + blas_threshold: 20 + +resource_config: + resource_pool: + - cpu + - gpu0 diff --git a/cpp/unittest/db/test_db.cpp b/cpp/unittest/db/test_db.cpp index e502d0f742..9e80afbc09 100644 --- a/cpp/unittest/db/test_db.cpp +++ b/cpp/unittest/db/test_db.cpp @@ -23,14 +23,18 @@ #include "db/DBFactory.h" #include "cache/CpuCacheMgr.h" #include "utils/CommonUtil.h" +#include "server/Config.h" #include #include #include #include + namespace { +static const char *CONFIG_FILE_PATH = "./milvus/conf/server_config.yaml"; + static const char *TABLE_NAME = "test_group"; static constexpr int64_t TABLE_DIM = 256; static constexpr int64_t VECTOR_COUNT = 25000; @@ -228,6 +232,9 @@ TEST_F(DBTest, DB_TEST) { } TEST_F(DBTest, SEARCH_TEST) { + milvus::server::Config &config = milvus::server::Config::GetInstance(); + milvus::Status s = config.LoadConfigFile(CONFIG_FILE_PATH); + milvus::engine::meta::TableSchema table_info = BuildTableSchema(); auto stat = db_->CreateTable(table_info); @@ -290,6 +297,25 @@ TEST_F(DBTest, SEARCH_TEST) { ASSERT_TRUE(stat.ok()); } + //test FAISS_IVFSQ8H optimizer + index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8H; + db_->CreateIndex(TABLE_NAME, index); // wait until build index finish + + { + milvus::engine::QueryResults results; + stat = db_->Query(TABLE_NAME, k, nq, 10, xq.data(), results); + ASSERT_TRUE(stat.ok()); + } + + {//search by specify index file + milvus::engine::meta::DatesT dates; + std::vector file_ids = {"1", "2", "3", "4", "5", "6"}; + milvus::engine::QueryResults results; + stat = db_->Query(TABLE_NAME, file_ids, k, nq, 10, xq.data(), dates, results); + ASSERT_TRUE(stat.ok()); + } + + // TODO(lxj): add groundTruth assert } diff --git a/cpp/unittest/db/utils.cpp b/cpp/unittest/db/utils.cpp index dfbffc66bc..67beeba36f 100644 --- a/cpp/unittest/db/utils.cpp +++ b/cpp/unittest/db/utils.cpp @@ -97,7 +97,7 @@ DBTest::SetUp() { auto res_mgr = milvus::scheduler::ResMgrInst::GetInstance(); res_mgr->Clear(); res_mgr->Add(milvus::scheduler::ResourceFactory::Create("disk", "DISK", 0, true, false)); - res_mgr->Add(milvus::scheduler::ResourceFactory::Create("cpu", "CPU", 0, true, false)); + res_mgr->Add(milvus::scheduler::ResourceFactory::Create("cpu", "CPU", 0, true, true)); res_mgr->Add(milvus::scheduler::ResourceFactory::Create("gtx1660", "GPU", 0, true, true)); auto default_conn = milvus::scheduler::Connection("IO", 500.0);