diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index b5c0575b60..592cb79c87 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -6,6 +6,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-336 - Scheduler interface - MS-344 - Add TaskTable Test - MS-345 - Add Node Test +- MS-346 - Add some implementation of scheduler to solve compile error ## Bug diff --git a/cpp/src/scheduler/Cost.cpp b/cpp/src/scheduler/Cost.cpp new file mode 100644 index 0000000000..0f7c30c6a7 --- /dev/null +++ b/cpp/src/scheduler/Cost.cpp @@ -0,0 +1,36 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "Cost.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +std::vector +PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) { + std::vector indexes; + return indexes; +} + + +std::vector +PickToLoad(const TaskTable &task_table, uint64_t limit) { + std::vector indexes; + return indexes; +} + + +std::vector +PickToExecute(const TaskTable &task_table, uint64_t limit) { + std::vector indexes; + return indexes; +} + +} +} +} diff --git a/cpp/src/scheduler/Cost.h b/cpp/src/scheduler/Cost.h index dc52eacaa3..a08833991d 100644 --- a/cpp/src/scheduler/Cost.h +++ b/cpp/src/scheduler/Cost.h @@ -7,8 +7,10 @@ #include #include "Task.h" +#include "TaskTable.h" #include "CacheMgr.h" + namespace zilliz { namespace milvus { namespace engine { @@ -20,8 +22,8 @@ namespace engine { * select tasks to move; * call from scheduler; */ -std::vector -PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) {} +std::vector +PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit); /* @@ -29,16 +31,16 @@ PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) * call from resource; * I DONT SURE NEED THIS; */ -std::vector -PickToLoad(TaskTable task_table, uint64_t limit) {} +std::vector +PickToLoad(const TaskTable &task_table, uint64_t limit); /* * select task to execute; * call from resource; * I DONT SURE NEED THIS; */ -std::vector -PickToExecute(TaskTable task_table, uint64_t limit) {} +std::vector +PickToExecute(const TaskTable &task_table, uint64_t limit); } diff --git a/cpp/src/scheduler/ResourceFactory.cpp b/cpp/src/scheduler/ResourceFactory.cpp new file mode 100644 index 0000000000..65dd488e3c --- /dev/null +++ b/cpp/src/scheduler/ResourceFactory.cpp @@ -0,0 +1,29 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "ResourceFactory.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +std::shared_ptr +ResourceFactory::Create(const std::string &name, const std::string &alias) { + if (name == "disk") { + return std::make_shared(alias); + } else if (name == "cpu") { + return std::make_shared(alias); + } else if (name == "gpu") { + return std::make_shared(alias); + } else { + return nullptr; + } +} + +} +} +} diff --git a/cpp/src/scheduler/ResourceFactory.h b/cpp/src/scheduler/ResourceFactory.h index f327522975..6a2ab2ab34 100644 --- a/cpp/src/scheduler/ResourceFactory.h +++ b/cpp/src/scheduler/ResourceFactory.h @@ -21,17 +21,7 @@ namespace engine { class ResourceFactory { public: static std::shared_ptr - Create(const std::string &name, const std::string &alias = "") { - if (name == "disk") { - return std::make_shared(alias); - } else if (name == "cpu") { - return std::make_shared(alias); - } else if (name == "gpu") { - return std::make_shared(alias); - } else { - return nullptr; - } - } + Create(const std::string &name, const std::string &alias = ""); }; diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp new file mode 100644 index 0000000000..c5ae928166 --- /dev/null +++ b/cpp/src/scheduler/Scheduler.cpp @@ -0,0 +1,59 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "Scheduler.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +void +StartUpEvent::Process() { + +} + +void +FinishTaskEvent::Process() { +// for (nei : res->neighbours) { +// tasks = cost(nei->task_table(), nei->connection, limit = 3) +// res->task_table()->PutTasks(tasks); +// } +// res->WakeUpExec(); +} + +void +CopyCompletedEvent::Process() { + +} + +void +TaskTableUpdatedEvent::Process() { + +} + + +void +Scheduler::Start() { + worker_thread_ = std::thread(&Scheduler::worker_thread_, this); +} + +std::string +Scheduler::Dump() { + return std::string(); +} + +void +Scheduler::worker_function() { + while (running_) { + auto event = event_queue_.front(); + event->Process(); + } +} + +} +} +} diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index 4bb840a68d..152937a1d2 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -10,6 +10,9 @@ #include #include +#include "resource/Resource.h" +#include "ResourceMgr.h" + namespace zilliz { namespace milvus { @@ -18,8 +21,7 @@ namespace engine { class Event { public: explicit - Event(ResourceWPtr &resource) - : resource_(resource) {} + Event(ResourceWPtr &resource) : resource_(resource) {} public: virtual void @@ -34,8 +36,7 @@ using EventPtr = std::shared_ptr; class StartUpEvent : public Event { public: explicit - StartUpEvent(ResourceWPtr &resource) - : Event(resource) {} + StartUpEvent(ResourceWPtr &resource) : Event(resource) {} public: void @@ -45,25 +46,17 @@ public: class FinishTaskEvent : public Event { public: explicit - FinishTaskEvent(ResourceWPtr &resource) - : Event(resource) {} + FinishTaskEvent(ResourceWPtr &resource) : Event(resource) {} public: void - Process() override { -// for (nei : res->neighbours) { -// tasks = cost(nei->task_table(), nei->connection, limit = 3) -// res->task_table()->PutTasks(tasks); -// } -// res->WakeUpExec(); - } + Process() override; }; class CopyCompletedEvent : public Event { public: explicit - CopyCompletedEvent(ResourceWPtr &resource) - : Event(resource) {} + CopyCompletedEvent(ResourceWPtr &resource) : Event(resource) {} public: void @@ -73,8 +66,7 @@ public: class TaskTableUpdatedEvent : public Event { public: explicit - TaskTableUpdatedEvent(ResourceWPtr &resource) - : Event(resource) {} + TaskTableUpdatedEvent(ResourceWPtr &resource) : Event(resource) {} public: void @@ -94,16 +86,16 @@ public: } void - Start() {} + Start(); +public: /******** Events ********/ /* * Process start up events; */ - void + inline void OnStartUp(ResourceWPtr &resource) { - // call from res_mgr, non-blocking, if queue size over limit, exception! auto event = std::make_shared(resource); event_queue_.push(event); } @@ -111,20 +103,29 @@ public: /* * Process finish task events; */ - void - OnFinishTask(ResourceWPtr); + inline void + OnFinishTask(ResourceWPtr &resource) { + auto event = std::make_shared(resource); + event_queue_.push(event); + } /* * Process copy completed events; */ - void - OnCopyCompleted(ResourceWPtr); + inline void + OnCopyCompleted(ResourceWPtr &resource) { + auto event = std::make_shared(resource); + event_queue_.push(event); + } /* * Process task table updated events; */ - void - OnTaskTableUpdated(ResourceWPtr); + inline void + OnTaskTableUpdated(ResourceWPtr &resource) { + auto event = std::make_shared(resource); + event_queue_.push(event); + } public: @@ -133,13 +134,11 @@ public: private: + /* + * Called by worker_thread_; + */ void - worker_function() { - while (running_) { - auto event = event_queue_.front(); - event->Process(); - } - } + worker_function(); private: bool running_; diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp new file mode 100644 index 0000000000..1e77318e86 --- /dev/null +++ b/cpp/src/scheduler/TaskTable.cpp @@ -0,0 +1,89 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "TaskTable.h" +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +TaskTable::TaskTable(std::vector &&tasks) { + +} + +void +TaskTable::Put(TaskPtr task) { + +} + +void +TaskTable::Put(std::vector &tasks) { + +} + + +TaskTableItem & +TaskTable::Get(uint64_t index) { + return table_[index]; +} + +void +TaskTable::Clear() { +// find first task is NOT (done or moved), erase from begin to it; +// auto iterator = table_.begin(); +// while (iterator->state == TaskTableItemState::EXECUTED or +// iterator->state == TaskTableItemState::MOVED) +// iterator++; +// table_.erase(table_.begin(), iterator); +} + +bool +TaskTable::Move(uint64_t index) { + auto &task = table_[index]; + + std::lock_guard lock(task.mutex); + if (task.state == TaskTableItemState::START) { + task.state = TaskTableItemState::LOADING; + return true; + } + return false; +} + +bool +TaskTable::Moved(uint64_t index) { + return false; +} + +bool +TaskTable::Load(uint64_t index) { + return false; +} + +bool +TaskTable::Loaded(uint64_t index) { + return false; +} + +bool +TaskTable::Execute(uint64_t index) { + return false; +} + +bool +TaskTable::Executed(uint64_t index) { + return false; +} + +std::string +TaskTable::Dump() { + return std::string(); +} + +} +} +} diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index 567aa6214d..caee06a731 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -47,26 +47,26 @@ public: TaskTable() = default; explicit - TaskTable(std::vector &&tasks) {} + TaskTable(std::vector &&tasks); /* * Put one task; */ void - Put(TaskPtr task) {} + Put(TaskPtr task); /* * Put tasks back of task table; * Called by DBImpl; */ void - Put(std::vector &tasks) {} + Put(std::vector &tasks); /* * Return task table item reference; */ TaskTableItem & - Get(uint64_t index) {} + Get(uint64_t index); /* * TODO @@ -74,14 +74,7 @@ public: * Called by ? */ void - Clear() { - // find first task is NOT (done or moved), erase from begin to it; -// auto iterator = table_.begin(); -// while (iterator->state == TaskTableItemState::EXECUTED or -// iterator->state == TaskTableItemState::MOVED) -// iterator++; -// table_.erase(table_.begin(), iterator); - } + Clear(); public: @@ -95,16 +88,7 @@ public: // TODO: bool to Status bool - Move(uint64_t index) { - auto &task = table_[index]; - - std::lock_guard lock(task.mutex); - if (task.state == TaskTableItemState::START) { - task.state = TaskTableItemState::LOADING; - return true; - } - return false; - } + Move(uint64_t index); /* * Move task finished; @@ -112,7 +96,7 @@ public: * Called by scheduler; */ bool - Moved(uint64_t index) {} + Moved(uint64_t index); /* * Load a task; @@ -120,7 +104,7 @@ public: * Called by loader; */ bool - Load(uint64_t index) {} + Load(uint64_t index); /* * Load task finished; @@ -128,7 +112,7 @@ public: * Called by loader; */ bool - Loaded(uint64_t index) {} + Loaded(uint64_t index); /* * Execute a task; @@ -136,7 +120,7 @@ public: * Called by executor; */ bool - Execute(uint64_t index) {} + Execute(uint64_t index); /* * Execute task finished; @@ -144,7 +128,7 @@ public: * Called by executor; */ bool - Executed(uint64_t index) {} + Executed(uint64_t index); public: /* diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index 4fe2a006ac..afd3a401a6 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -143,11 +143,11 @@ private: */ TaskPtr pick_task_load() { - auto tasks = PickToLoad(task_table_, 3); - for (uint64_t i = 0; i < tasks.size(); ++i) { + auto indexes = PickToLoad(task_table_, 3); + for (auto index : indexes) { // try to set one task loading, then return - if (task_table_.Load(i)) - return task_table_.Get(i).task; + if (task_table_.Load(index)) + return task_table_.Get(index).task; // else try next } return nullptr; @@ -159,11 +159,11 @@ private: */ TaskPtr pick_task_execute() { - auto tasks = PickToExecute(task_table_, 3); - for (uint64_t i = 0; i < tasks.size(); ++i) { + auto indexes = PickToExecute(task_table_, 3); + for (auto index : indexes) { // try to set one task executing, then return - if (task_table_.Execute(i)) - return task_table_.Get(i).task; + if (task_table_.Execute(index)) + return task_table_.Get(index).task; // else try next } return nullptr; diff --git a/cpp/unittest/scheduler/normal_test.cpp b/cpp/unittest/scheduler/normal_test.cpp index 1f13549b77..93b662cf73 100644 --- a/cpp/unittest/scheduler/normal_test.cpp +++ b/cpp/unittest/scheduler/normal_test.cpp @@ -21,7 +21,7 @@ TEST(normal_test, DISABLED_test1) { res_mgr->Connect(cpu, gpu1, PCIE); res_mgr->Connect(cpu, gpu2, PCIE); - res_mgr->StartAll(); + res_mgr->Start(); auto task1 = std::make_shared("123456789"); auto task2 = std::make_shared("222222222");