From 9400101582d7ef4c182d9dd0d100f20e37586754 Mon Sep 17 00:00:00 2001 From: wxyu Date: Thu, 15 Aug 2019 15:47:46 +0800 Subject: [PATCH 1/2] MS-357 Add minimum schedule function Former-commit-id: da78df540b934e3f1d881b16f023d749e4b37101 --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/Cost.cpp | 2 +- cpp/src/scheduler/Cost.h | 2 +- cpp/src/scheduler/Scheduler.cpp | 59 +++++++++++++++++++++------------ cpp/src/scheduler/Scheduler.h | 13 ++++++-- cpp/src/scheduler/TaskTable.h | 17 +++++++++- 6 files changed, 66 insertions(+), 28 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 6f1c49e9f4..47c5575eff 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -16,6 +16,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-350 - Remove knowhere submodule - MS-354 - Add task class and interface in scheduler - MS-355 - Add copy interface in ExcutionEngine +- MS-357 - Add minimum schedule function ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/Cost.cpp b/cpp/src/scheduler/Cost.cpp index 0f7c30c6a7..14b56d98a5 100644 --- a/cpp/src/scheduler/Cost.cpp +++ b/cpp/src/scheduler/Cost.cpp @@ -12,7 +12,7 @@ namespace milvus { namespace engine { std::vector -PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) { +PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit) { std::vector indexes; return indexes; } diff --git a/cpp/src/scheduler/Cost.h b/cpp/src/scheduler/Cost.h index 98de9d8fc1..85414337bc 100644 --- a/cpp/src/scheduler/Cost.h +++ b/cpp/src/scheduler/Cost.h @@ -23,7 +23,7 @@ namespace engine { * call from scheduler; */ std::vector -PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit); +PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit); /* diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index c5ae928166..d4ba1ac4a6 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -5,6 +5,7 @@ ******************************************************************************/ #include "Scheduler.h" +#include "Cost.h" namespace zilliz { @@ -12,33 +13,55 @@ namespace milvus { namespace engine { void -StartUpEvent::Process() { +push_task(ResourcePtr &self, ResourcePtr &other) { + auto self_task_table = self->task_table(); + auto other_task_table = other->task_table(); + if (!other_task_table.Empty()) { + CacheMgr cache; + auto indexes = PickToMove(self_task_table, cache, 1); + for (auto index : indexes) { + if (self_task_table.Move(index)) { + auto task = self_task_table.Get(index).task; + other_task_table.Put(task); + // TODO: mark moved future + other->WakeupLoader(); + other->WakeupExecutor(); + } + } + } +} +void +schedule(const ResourceWPtr &res) { + if (auto self = res.lock()) { + for (auto &nei : self->GetNeighbours()) { + if (auto n = nei.neighbour_node.lock()) { + auto neighbour = std::static_pointer_cast(n); + push_task(self, neighbour); + } + } + + } +} + +void +StartUpEvent::Process() { + schedule(resource_); } void FinishTaskEvent::Process() { -// for (nei : res->neighbours) { -// tasks = cost(nei->task_table(), nei->connection, limit = 3) -// res->task_table()->PutTasks(tasks); -// } -// res->WakeUpExec(); + schedule(resource_); } void CopyCompletedEvent::Process() { - + schedule(resource_); } void TaskTableUpdatedEvent::Process() { - -} - - -void -Scheduler::Start() { - worker_thread_ = std::thread(&Scheduler::worker_thread_, this); + schedule(resource_); } std::string @@ -46,14 +69,6 @@ Scheduler::Dump() { return std::string(); } -void -Scheduler::worker_function() { - while (running_) { - auto event = event_queue_.front(); - event->Process(); - } -} - } } } diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index 152937a1d2..4f7c714c86 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -27,7 +27,7 @@ public: virtual void Process() = 0; -private: +protected: ResourceWPtr resource_; }; @@ -86,7 +86,9 @@ public: } void - Start(); + Start() { + worker_thread_ = std::thread(&Scheduler::worker_thread_, this); + } public: /******** Events ********/ @@ -138,7 +140,12 @@ private: * Called by worker_thread_; */ void - worker_function(); + worker_function() { + while (running_) { + auto event = event_queue_.front(); + event->Process(); + } + } private: bool running_; diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index dc286d8f74..c99d484919 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -75,7 +75,22 @@ public: */ void Clear(); - + + /* + * Return true if task table empty, otherwise false; + */ + inline bool + Empty() { + return table_.empty(); + } + + /* + * Return size of task table; + */ + inline size_t + Size() { + return table_.size(); + } public: From cbc734d83b08e273f0e720b3397799f39c541ece Mon Sep 17 00:00:00 2001 From: "xj.lin" Date: Thu, 15 Aug 2019 15:01:37 +0800 Subject: [PATCH 2/2] MS-337 dev basic Resource Former-commit-id: 7486415fa96cd3db77320aeb4b01dd78a2b4c878 --- cpp/src/scheduler/resource/CpuResource.cpp | 38 +++++ cpp/src/scheduler/resource/CpuResource.h | 21 +-- cpp/src/scheduler/resource/DiskResource.cpp | 28 ++++ cpp/src/scheduler/resource/DiskResource.h | 7 +- cpp/src/scheduler/resource/GpuResource.cpp | 28 ++++ cpp/src/scheduler/resource/GpuResource.h | 7 +- cpp/src/scheduler/resource/Node.cpp | 66 +++++++++ cpp/src/scheduler/resource/Node.h | 23 +-- cpp/src/scheduler/resource/RegisterHandler.h | 24 +++ cpp/src/scheduler/resource/Resource.cpp | 98 +++++++++++++ cpp/src/scheduler/resource/Resource.h | 146 ++++--------------- 11 files changed, 335 insertions(+), 151 deletions(-) create mode 100644 cpp/src/scheduler/resource/CpuResource.cpp create mode 100644 cpp/src/scheduler/resource/DiskResource.cpp create mode 100644 cpp/src/scheduler/resource/GpuResource.cpp create mode 100644 cpp/src/scheduler/resource/Node.cpp create mode 100644 cpp/src/scheduler/resource/RegisterHandler.h create mode 100644 cpp/src/scheduler/resource/Resource.cpp diff --git a/cpp/src/scheduler/resource/CpuResource.cpp b/cpp/src/scheduler/resource/CpuResource.cpp new file mode 100644 index 0000000000..32eb627046 --- /dev/null +++ b/cpp/src/scheduler/resource/CpuResource.cpp @@ -0,0 +1,38 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "CpuResource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + + +CpuResource::CpuResource(std::string name) + : Resource(std::move(name), ResourceType::CPU) {} + +void CpuResource::LoadFile(TaskPtr task) { + //if (src.type == DISK) { + // fd = open(filename); + // content = fd.read(); + // close(fd); + //} else if (src.type == CPU) { + // memcpy(src, dest, len); + //} else if (src.type == GPU) { + // cudaMemcpyD2H(src, dest); + //} else { + // // unknown type, exception + //} +} + +void CpuResource::Process(TaskPtr task) { + +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/resource/CpuResource.h b/cpp/src/scheduler/resource/CpuResource.h index 995615d1ab..be1340e954 100644 --- a/cpp/src/scheduler/resource/CpuResource.h +++ b/cpp/src/scheduler/resource/CpuResource.h @@ -17,29 +17,14 @@ namespace engine { class CpuResource : public Resource { public: explicit - CpuResource(std::string name) - : Resource(std::move(name), ResourceType::CPU) {} + CpuResource(std::string name); protected: void - LoadFile(TaskPtr task) override { -// if (src.type == DISK) { -// fd = open(filename); -// content = fd.read(); -// close(fd); -// } else if (src.type == CPU) { -// memcpy(src, dest, len); -// } else if (src.type == GPU) { -// cudaMemcpyD2H(src, dest); -// } else { -// // unknown type, exception -// } - } + LoadFile(TaskPtr task) override; void - Process(TaskPtr task) override { - task->Execute(); - } + Process(TaskPtr task) override; }; } diff --git a/cpp/src/scheduler/resource/DiskResource.cpp b/cpp/src/scheduler/resource/DiskResource.cpp new file mode 100644 index 0000000000..dcc0687ac4 --- /dev/null +++ b/cpp/src/scheduler/resource/DiskResource.cpp @@ -0,0 +1,28 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#include "DiskResource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + + +DiskResource::DiskResource(std::string name) + : Resource(std::move(name), ResourceType::DISK) {} + +void DiskResource::LoadFile(TaskPtr task) { + +} + +void DiskResource::Process(TaskPtr task) { + +} + +} +} +} + diff --git a/cpp/src/scheduler/resource/DiskResource.h b/cpp/src/scheduler/resource/DiskResource.h index 77d2e97879..39211dbb66 100644 --- a/cpp/src/scheduler/resource/DiskResource.h +++ b/cpp/src/scheduler/resource/DiskResource.h @@ -16,15 +16,14 @@ namespace engine { class DiskResource : public Resource { public: explicit - DiskResource(std::string name) - : Resource(std::move(name), ResourceType::DISK) {} + DiskResource(std::string name); protected: void - LoadFile(TaskPtr task) override {} + LoadFile(TaskPtr task) override; void - Process(TaskPtr task) override {} + Process(TaskPtr task) override; }; } diff --git a/cpp/src/scheduler/resource/GpuResource.cpp b/cpp/src/scheduler/resource/GpuResource.cpp new file mode 100644 index 0000000000..00d5df05b4 --- /dev/null +++ b/cpp/src/scheduler/resource/GpuResource.cpp @@ -0,0 +1,28 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "GpuResource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + + +GpuResource::GpuResource(std::string name) + : Resource(std::move(name), ResourceType::GPU) {} + +void GpuResource::LoadFile(TaskPtr task) { + +} + +void GpuResource::Process(TaskPtr task) { + +} + +} +} +} diff --git a/cpp/src/scheduler/resource/GpuResource.h b/cpp/src/scheduler/resource/GpuResource.h index 74fae13b75..84bf163284 100644 --- a/cpp/src/scheduler/resource/GpuResource.h +++ b/cpp/src/scheduler/resource/GpuResource.h @@ -16,15 +16,14 @@ namespace engine { class GpuResource : public Resource { public: explicit - GpuResource(std::string name) - : Resource(std::move(name), ResourceType::GPU) {} + GpuResource(std::string name); protected: void - LoadFile(TaskPtr task) override {} + LoadFile(TaskPtr task) override; void - Process(TaskPtr task) override {} + Process(TaskPtr task) override; }; } diff --git a/cpp/src/scheduler/resource/Node.cpp b/cpp/src/scheduler/resource/Node.cpp new file mode 100644 index 0000000000..76d61fe858 --- /dev/null +++ b/cpp/src/scheduler/resource/Node.cpp @@ -0,0 +1,66 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include +#include "Node.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +Node::Node() { + static std::atomic_uint_fast8_t counter(0); + id_ = counter++; +} + +void Node::DelNeighbour(const NeighbourNodePtr &neighbour_ptr) { + std::lock_guard lk(mutex_); + if (auto s = neighbour_ptr.lock()) { + auto search = neighbours_.find(s->id_); + if (search != neighbours_.end()) { + neighbours_.erase(search); + } + } +} + +bool Node::IsNeighbour(const NeighbourNodePtr &neighbour_ptr) { + std::lock_guard lk(mutex_); + if (auto s = neighbour_ptr.lock()) { + auto search = neighbours_.find(s->id_); + if (search != neighbours_.end()) { + return true; + } + } + return false; +} + +std::vector Node::GetNeighbours() { + std::lock_guard lk(mutex_); + std::vector ret; + for (auto &e : neighbours_) { + ret.push_back(e.second); + } + return ret; +} + +std::string Node::Dump() { + // TODO(linxj): what's that? + return std::__cxx11::string(); +} + +void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) { + std::lock_guard lk(mutex_); + if (auto s = neighbour_node.lock()) { + Neighbour neighbour(neighbour_node, connection); + neighbours_[s->id_] = neighbour; + } + // else do nothing, consider it.. +} + +} +} +} diff --git a/cpp/src/scheduler/resource/Node.h b/cpp/src/scheduler/resource/Node.h index 61ba4a343b..a57987ca9c 100644 --- a/cpp/src/scheduler/resource/Node.h +++ b/cpp/src/scheduler/resource/Node.h @@ -7,6 +7,7 @@ #include #include +#include #include "../TaskTable.h" #include "Connection.h" @@ -28,29 +29,31 @@ struct Neighbour { Connection connection; }; +// TODO(linxj): return type void -> Status class Node { public: - void - AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) { - Neighbour neighbour(neighbour_node, connection); - neighbours_.emplace_back(neighbour); - } + Node(); void - DelNeighbour(NeighbourNodePtr neighbour_ptr) {} + AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection); + + void + DelNeighbour(const NeighbourNodePtr &neighbour_ptr); bool - IsNeighbour(NeighbourNodePtr neighbour_ptr) {} + IsNeighbour(const NeighbourNodePtr& neighbour_ptr); - const std::vector & - GetNeighbours() {} + std::vector + GetNeighbours(); public: std::string Dump(); private: - std::vector neighbours_; + std::mutex mutex_; + uint8_t id_; + std::map neighbours_; }; using NodePtr = std::shared_ptr; diff --git a/cpp/src/scheduler/resource/RegisterHandler.h b/cpp/src/scheduler/resource/RegisterHandler.h new file mode 100644 index 0000000000..02c55da1e7 --- /dev/null +++ b/cpp/src/scheduler/resource/RegisterHandler.h @@ -0,0 +1,24 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#pragma once + +#include + +namespace zilliz { +namespace milvus { +namespace engine { + +class RegisterHandler { + public: + virtual void Exec() = 0; +}; + +using RegisterHandlerPtr = std::shared_ptr; + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp new file mode 100644 index 0000000000..7f59256474 --- /dev/null +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -0,0 +1,98 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#include "Resource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +Resource::Resource(std::string name, ResourceType type) + : name_(std::move(name)), + type_(type), + running_(false), + load_flag_(false), + exec_flag_(false) { +} + +void Resource::Start() { + loader_thread_ = std::thread(&Resource::loader_function, this); + executor_thread_ = std::thread(&Resource::executor_function, this); +} + +void Resource::Stop() { + running_ = false; + WakeupLoader(); + WakeupExecutor(); +} + +TaskTable &Resource::task_table() { + return task_table_; +} + +void Resource::WakeupExecutor() { + exec_cv_.notify_one(); +} + +void Resource::WakeupLoader() { + load_cv_.notify_one(); +} + +TaskPtr Resource::pick_task_load() { + auto indexes = PickToLoad(task_table_, 3); + for (auto index : indexes) { + // try to set one task loading, then return + if (task_table_.Load(index)) + return task_table_.Get(index).task; + // else try next + } + return nullptr; +} + +TaskPtr Resource::pick_task_execute() { + auto indexes = PickToExecute(task_table_, 3); + for (auto index : indexes) { + // try to set one task executing, then return + if (task_table_.Execute(index)) + return task_table_.Get(index).task; + // else try next + } + return nullptr; +} + +void Resource::loader_function() { + while (running_) { + std::unique_lock lock(load_mutex_); + load_cv_.wait(lock, [&] { return load_flag_; }); + auto task = pick_task_load(); + if (task) { + LoadFile(task); + GetRegisterFunc(RegisterType::ON_COPY_COMPLETED)->Exec(); + } + } +} + +void Resource::executor_function() { + GetRegisterFunc(RegisterType::START_UP)->Exec(); + while (running_) { + std::unique_lock lock(exec_mutex_); + exec_cv_.wait(lock, [&] { return exec_flag_; }); + auto task = pick_task_execute(); + if (task) { + Process(task); + GetRegisterFunc(RegisterType::ON_FINISH_TASK)->Exec(); + } + } +} + +RegisterHandlerPtr Resource::GetRegisterFunc(const RegisterType &type) { + // construct object each time. + return register_table_[type](); +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index bd1d11aa3c..2961e281fa 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -15,8 +15,9 @@ #include "../TaskTable.h" #include "../task/Task.h" #include "../Cost.h" -#include "Node.h" #include "Connection.h" +#include "Node.h" +#include "RegisterHandler.h" namespace zilliz { @@ -29,92 +30,50 @@ enum class ResourceType { GPU = 2 }; +enum class RegisterType { + START_UP, + ON_FINISH_TASK, + ON_COPY_COMPLETED, + ON_TASK_TABLE_UPDATED, +}; + class Resource : public Node { public: - void - Start() { - loader_thread_ = std::thread(&Resource::loader_function, this); - executor_thread_ = std::thread(&Resource::executor_function, this); + /* + * Event function MUST be a short function, never blocking; + */ + template + void Register_T(const RegisterType& type) { + register_table_.emplace(type, [] { return std::make_shared(); }); } + RegisterHandlerPtr + GetRegisterFunc(const RegisterType& type); + void - Stop() { - running_ = false; - WakeupLoader(); - WakeupExecutor(); - } + Start(); + + void + Stop(); TaskTable & - task_table() { - return task_table_; - } + task_table(); public: /* * wake up executor; */ void - WakeupExecutor() { - exec_cv_.notify_one(); - } + WakeupExecutor(); /* * wake up loader; */ void - WakeupLoader() { - load_cv_.notify_one(); - } - -public: - /* - * Event function MUST be a short function, never blocking; - */ - - /* - * Register on start up event; - */ - void - RegisterOnStartUp(std::function func) { - on_start_up_ = func; - } - - /* - * Register on finish one task event; - */ - void - RegisterOnFinishTask(std::function func) { - on_finish_task_ = func; - } - - /* - * Register on copy task data completed event; - */ - void - RegisterOnCopyCompleted(std::function func) { - on_copy_completed_ = func; - } - - /* - * Register on task table updated event; - */ - void - RegisterOnTaskTableUpdated(std::function func) { - on_task_table_updated_ = func; - } + WakeupLoader(); protected: - Resource(std::string name, ResourceType type) - : name_(std::move(name)), - type_(type), - on_start_up_(nullptr), - on_finish_task_(nullptr), - on_copy_completed_(nullptr), - on_task_table_updated_(nullptr), - running_(false), - load_flag_(false), - exec_flag_(false) { - } + Resource(std::string name, ResourceType type); // TODO: SearchContextPtr to TaskPtr /* @@ -142,67 +101,27 @@ private: * Order by start time; */ TaskPtr - pick_task_load() { - auto indexes = PickToLoad(task_table_, 3); - for (auto index : indexes) { - // try to set one task loading, then return - if (task_table_.Load(index)) - return task_table_.Get(index).task; - // else try next - } - return nullptr; - } + pick_task_load(); /* * Pick one task to execute; * Pick by start time and priority; */ TaskPtr - pick_task_execute() { - auto indexes = PickToExecute(task_table_, 3); - for (auto index : indexes) { - // try to set one task executing, then return - if (task_table_.Execute(index)) - return task_table_.Get(index).task; - // else try next - } - return nullptr; - } + pick_task_execute(); private: /* * Only called by load thread; */ void - loader_function() { - while (running_) { - std::unique_lock lock(load_mutex_); - load_cv_.wait(lock, [&] { return load_flag_; }); - auto task = pick_task_load(); - if (task) { - LoadFile(task); - on_copy_completed_(); - } - } - - } + loader_function(); /* * Only called by worker thread; */ void - executor_function() { - on_start_up_(); - while (running_) { - std::unique_lock lock(exec_mutex_); - exec_cv_.wait(lock, [&] { return exec_flag_; }); - auto task = pick_task_execute(); - if (task) { - Process(task); - on_finish_task_(); - } - } - } + executor_function(); private: @@ -211,10 +130,7 @@ private: TaskTable task_table_; - std::function on_start_up_; - std::function on_finish_task_; - std::function on_copy_completed_; - std::function on_task_table_updated_; + std::map> register_table_; bool running_; std::thread loader_thread_;