From 9400101582d7ef4c182d9dd0d100f20e37586754 Mon Sep 17 00:00:00 2001 From: wxyu Date: Thu, 15 Aug 2019 15:47:46 +0800 Subject: [PATCH 01/22] MS-357 Add minimum schedule function Former-commit-id: da78df540b934e3f1d881b16f023d749e4b37101 --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/Cost.cpp | 2 +- cpp/src/scheduler/Cost.h | 2 +- cpp/src/scheduler/Scheduler.cpp | 59 +++++++++++++++++++++------------ cpp/src/scheduler/Scheduler.h | 13 ++++++-- cpp/src/scheduler/TaskTable.h | 17 +++++++++- 6 files changed, 66 insertions(+), 28 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 6f1c49e9f4..47c5575eff 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -16,6 +16,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-350 - Remove knowhere submodule - MS-354 - Add task class and interface in scheduler - MS-355 - Add copy interface in ExcutionEngine +- MS-357 - Add minimum schedule function ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/Cost.cpp b/cpp/src/scheduler/Cost.cpp index 0f7c30c6a7..14b56d98a5 100644 --- a/cpp/src/scheduler/Cost.cpp +++ b/cpp/src/scheduler/Cost.cpp @@ -12,7 +12,7 @@ namespace milvus { namespace engine { std::vector -PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) { +PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit) { std::vector indexes; return indexes; } diff --git a/cpp/src/scheduler/Cost.h b/cpp/src/scheduler/Cost.h index 98de9d8fc1..85414337bc 100644 --- a/cpp/src/scheduler/Cost.h +++ b/cpp/src/scheduler/Cost.h @@ -23,7 +23,7 @@ namespace engine { * call from scheduler; */ std::vector -PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit); +PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit); /* diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index c5ae928166..d4ba1ac4a6 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -5,6 +5,7 @@ ******************************************************************************/ #include "Scheduler.h" +#include "Cost.h" namespace zilliz { @@ -12,33 +13,55 @@ namespace milvus { namespace engine { void -StartUpEvent::Process() { +push_task(ResourcePtr &self, ResourcePtr &other) { + auto self_task_table = self->task_table(); + auto other_task_table = other->task_table(); + if (!other_task_table.Empty()) { + CacheMgr cache; + auto indexes = PickToMove(self_task_table, cache, 1); + for (auto index : indexes) { + if (self_task_table.Move(index)) { + auto task = self_task_table.Get(index).task; + other_task_table.Put(task); + // TODO: mark moved future + other->WakeupLoader(); + other->WakeupExecutor(); + } + } + } +} +void +schedule(const ResourceWPtr &res) { + if (auto self = res.lock()) { + for (auto &nei : self->GetNeighbours()) { + if (auto n = nei.neighbour_node.lock()) { + auto neighbour = std::static_pointer_cast(n); + push_task(self, neighbour); + } + } + + } +} + +void +StartUpEvent::Process() { + schedule(resource_); } void FinishTaskEvent::Process() { -// for (nei : res->neighbours) { -// tasks = cost(nei->task_table(), nei->connection, limit = 3) -// res->task_table()->PutTasks(tasks); -// } -// res->WakeUpExec(); + schedule(resource_); } void CopyCompletedEvent::Process() { - + schedule(resource_); } void TaskTableUpdatedEvent::Process() { - -} - - -void -Scheduler::Start() { - worker_thread_ = std::thread(&Scheduler::worker_thread_, this); + schedule(resource_); } std::string @@ -46,14 +69,6 @@ Scheduler::Dump() { return std::string(); } -void -Scheduler::worker_function() { - while (running_) { - auto event = event_queue_.front(); - event->Process(); - } -} - } } } diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index 152937a1d2..4f7c714c86 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -27,7 +27,7 @@ public: virtual void Process() = 0; -private: +protected: ResourceWPtr resource_; }; @@ -86,7 +86,9 @@ public: } void - Start(); + Start() { + worker_thread_ = std::thread(&Scheduler::worker_thread_, this); + } public: /******** Events ********/ @@ -138,7 +140,12 @@ private: * Called by worker_thread_; */ void - worker_function(); + worker_function() { + while (running_) { + auto event = event_queue_.front(); + event->Process(); + } + } private: bool running_; diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index dc286d8f74..c99d484919 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -75,7 +75,22 @@ public: */ void Clear(); - + + /* + * Return true if task table empty, otherwise false; + */ + inline bool + Empty() { + return table_.empty(); + } + + /* + * Return size of task table; + */ + inline size_t + Size() { + return table_.size(); + } public: From cbc734d83b08e273f0e720b3397799f39c541ece Mon Sep 17 00:00:00 2001 From: "xj.lin" Date: Thu, 15 Aug 2019 15:01:37 +0800 Subject: [PATCH 02/22] MS-337 dev basic Resource Former-commit-id: 7486415fa96cd3db77320aeb4b01dd78a2b4c878 --- cpp/src/scheduler/resource/CpuResource.cpp | 38 +++++ cpp/src/scheduler/resource/CpuResource.h | 21 +-- cpp/src/scheduler/resource/DiskResource.cpp | 28 ++++ cpp/src/scheduler/resource/DiskResource.h | 7 +- cpp/src/scheduler/resource/GpuResource.cpp | 28 ++++ cpp/src/scheduler/resource/GpuResource.h | 7 +- cpp/src/scheduler/resource/Node.cpp | 66 +++++++++ cpp/src/scheduler/resource/Node.h | 23 +-- cpp/src/scheduler/resource/RegisterHandler.h | 24 +++ cpp/src/scheduler/resource/Resource.cpp | 98 +++++++++++++ cpp/src/scheduler/resource/Resource.h | 146 ++++--------------- 11 files changed, 335 insertions(+), 151 deletions(-) create mode 100644 cpp/src/scheduler/resource/CpuResource.cpp create mode 100644 cpp/src/scheduler/resource/DiskResource.cpp create mode 100644 cpp/src/scheduler/resource/GpuResource.cpp create mode 100644 cpp/src/scheduler/resource/Node.cpp create mode 100644 cpp/src/scheduler/resource/RegisterHandler.h create mode 100644 cpp/src/scheduler/resource/Resource.cpp diff --git a/cpp/src/scheduler/resource/CpuResource.cpp b/cpp/src/scheduler/resource/CpuResource.cpp new file mode 100644 index 0000000000..32eb627046 --- /dev/null +++ b/cpp/src/scheduler/resource/CpuResource.cpp @@ -0,0 +1,38 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "CpuResource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + + +CpuResource::CpuResource(std::string name) + : Resource(std::move(name), ResourceType::CPU) {} + +void CpuResource::LoadFile(TaskPtr task) { + //if (src.type == DISK) { + // fd = open(filename); + // content = fd.read(); + // close(fd); + //} else if (src.type == CPU) { + // memcpy(src, dest, len); + //} else if (src.type == GPU) { + // cudaMemcpyD2H(src, dest); + //} else { + // // unknown type, exception + //} +} + +void CpuResource::Process(TaskPtr task) { + +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/resource/CpuResource.h b/cpp/src/scheduler/resource/CpuResource.h index 995615d1ab..be1340e954 100644 --- a/cpp/src/scheduler/resource/CpuResource.h +++ b/cpp/src/scheduler/resource/CpuResource.h @@ -17,29 +17,14 @@ namespace engine { class CpuResource : public Resource { public: explicit - CpuResource(std::string name) - : Resource(std::move(name), ResourceType::CPU) {} + CpuResource(std::string name); protected: void - LoadFile(TaskPtr task) override { -// if (src.type == DISK) { -// fd = open(filename); -// content = fd.read(); -// close(fd); -// } else if (src.type == CPU) { -// memcpy(src, dest, len); -// } else if (src.type == GPU) { -// cudaMemcpyD2H(src, dest); -// } else { -// // unknown type, exception -// } - } + LoadFile(TaskPtr task) override; void - Process(TaskPtr task) override { - task->Execute(); - } + Process(TaskPtr task) override; }; } diff --git a/cpp/src/scheduler/resource/DiskResource.cpp b/cpp/src/scheduler/resource/DiskResource.cpp new file mode 100644 index 0000000000..dcc0687ac4 --- /dev/null +++ b/cpp/src/scheduler/resource/DiskResource.cpp @@ -0,0 +1,28 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#include "DiskResource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + + +DiskResource::DiskResource(std::string name) + : Resource(std::move(name), ResourceType::DISK) {} + +void DiskResource::LoadFile(TaskPtr task) { + +} + +void DiskResource::Process(TaskPtr task) { + +} + +} +} +} + diff --git a/cpp/src/scheduler/resource/DiskResource.h b/cpp/src/scheduler/resource/DiskResource.h index 77d2e97879..39211dbb66 100644 --- a/cpp/src/scheduler/resource/DiskResource.h +++ b/cpp/src/scheduler/resource/DiskResource.h @@ -16,15 +16,14 @@ namespace engine { class DiskResource : public Resource { public: explicit - DiskResource(std::string name) - : Resource(std::move(name), ResourceType::DISK) {} + DiskResource(std::string name); protected: void - LoadFile(TaskPtr task) override {} + LoadFile(TaskPtr task) override; void - Process(TaskPtr task) override {} + Process(TaskPtr task) override; }; } diff --git a/cpp/src/scheduler/resource/GpuResource.cpp b/cpp/src/scheduler/resource/GpuResource.cpp new file mode 100644 index 0000000000..00d5df05b4 --- /dev/null +++ b/cpp/src/scheduler/resource/GpuResource.cpp @@ -0,0 +1,28 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "GpuResource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + + +GpuResource::GpuResource(std::string name) + : Resource(std::move(name), ResourceType::GPU) {} + +void GpuResource::LoadFile(TaskPtr task) { + +} + +void GpuResource::Process(TaskPtr task) { + +} + +} +} +} diff --git a/cpp/src/scheduler/resource/GpuResource.h b/cpp/src/scheduler/resource/GpuResource.h index 74fae13b75..84bf163284 100644 --- a/cpp/src/scheduler/resource/GpuResource.h +++ b/cpp/src/scheduler/resource/GpuResource.h @@ -16,15 +16,14 @@ namespace engine { class GpuResource : public Resource { public: explicit - GpuResource(std::string name) - : Resource(std::move(name), ResourceType::GPU) {} + GpuResource(std::string name); protected: void - LoadFile(TaskPtr task) override {} + LoadFile(TaskPtr task) override; void - Process(TaskPtr task) override {} + Process(TaskPtr task) override; }; } diff --git a/cpp/src/scheduler/resource/Node.cpp b/cpp/src/scheduler/resource/Node.cpp new file mode 100644 index 0000000000..76d61fe858 --- /dev/null +++ b/cpp/src/scheduler/resource/Node.cpp @@ -0,0 +1,66 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include +#include "Node.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +Node::Node() { + static std::atomic_uint_fast8_t counter(0); + id_ = counter++; +} + +void Node::DelNeighbour(const NeighbourNodePtr &neighbour_ptr) { + std::lock_guard lk(mutex_); + if (auto s = neighbour_ptr.lock()) { + auto search = neighbours_.find(s->id_); + if (search != neighbours_.end()) { + neighbours_.erase(search); + } + } +} + +bool Node::IsNeighbour(const NeighbourNodePtr &neighbour_ptr) { + std::lock_guard lk(mutex_); + if (auto s = neighbour_ptr.lock()) { + auto search = neighbours_.find(s->id_); + if (search != neighbours_.end()) { + return true; + } + } + return false; +} + +std::vector Node::GetNeighbours() { + std::lock_guard lk(mutex_); + std::vector ret; + for (auto &e : neighbours_) { + ret.push_back(e.second); + } + return ret; +} + +std::string Node::Dump() { + // TODO(linxj): what's that? + return std::__cxx11::string(); +} + +void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) { + std::lock_guard lk(mutex_); + if (auto s = neighbour_node.lock()) { + Neighbour neighbour(neighbour_node, connection); + neighbours_[s->id_] = neighbour; + } + // else do nothing, consider it.. +} + +} +} +} diff --git a/cpp/src/scheduler/resource/Node.h b/cpp/src/scheduler/resource/Node.h index 61ba4a343b..a57987ca9c 100644 --- a/cpp/src/scheduler/resource/Node.h +++ b/cpp/src/scheduler/resource/Node.h @@ -7,6 +7,7 @@ #include #include +#include #include "../TaskTable.h" #include "Connection.h" @@ -28,29 +29,31 @@ struct Neighbour { Connection connection; }; +// TODO(linxj): return type void -> Status class Node { public: - void - AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) { - Neighbour neighbour(neighbour_node, connection); - neighbours_.emplace_back(neighbour); - } + Node(); void - DelNeighbour(NeighbourNodePtr neighbour_ptr) {} + AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection); + + void + DelNeighbour(const NeighbourNodePtr &neighbour_ptr); bool - IsNeighbour(NeighbourNodePtr neighbour_ptr) {} + IsNeighbour(const NeighbourNodePtr& neighbour_ptr); - const std::vector & - GetNeighbours() {} + std::vector + GetNeighbours(); public: std::string Dump(); private: - std::vector neighbours_; + std::mutex mutex_; + uint8_t id_; + std::map neighbours_; }; using NodePtr = std::shared_ptr; diff --git a/cpp/src/scheduler/resource/RegisterHandler.h b/cpp/src/scheduler/resource/RegisterHandler.h new file mode 100644 index 0000000000..02c55da1e7 --- /dev/null +++ b/cpp/src/scheduler/resource/RegisterHandler.h @@ -0,0 +1,24 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#pragma once + +#include + +namespace zilliz { +namespace milvus { +namespace engine { + +class RegisterHandler { + public: + virtual void Exec() = 0; +}; + +using RegisterHandlerPtr = std::shared_ptr; + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp new file mode 100644 index 0000000000..7f59256474 --- /dev/null +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -0,0 +1,98 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#include "Resource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +Resource::Resource(std::string name, ResourceType type) + : name_(std::move(name)), + type_(type), + running_(false), + load_flag_(false), + exec_flag_(false) { +} + +void Resource::Start() { + loader_thread_ = std::thread(&Resource::loader_function, this); + executor_thread_ = std::thread(&Resource::executor_function, this); +} + +void Resource::Stop() { + running_ = false; + WakeupLoader(); + WakeupExecutor(); +} + +TaskTable &Resource::task_table() { + return task_table_; +} + +void Resource::WakeupExecutor() { + exec_cv_.notify_one(); +} + +void Resource::WakeupLoader() { + load_cv_.notify_one(); +} + +TaskPtr Resource::pick_task_load() { + auto indexes = PickToLoad(task_table_, 3); + for (auto index : indexes) { + // try to set one task loading, then return + if (task_table_.Load(index)) + return task_table_.Get(index).task; + // else try next + } + return nullptr; +} + +TaskPtr Resource::pick_task_execute() { + auto indexes = PickToExecute(task_table_, 3); + for (auto index : indexes) { + // try to set one task executing, then return + if (task_table_.Execute(index)) + return task_table_.Get(index).task; + // else try next + } + return nullptr; +} + +void Resource::loader_function() { + while (running_) { + std::unique_lock lock(load_mutex_); + load_cv_.wait(lock, [&] { return load_flag_; }); + auto task = pick_task_load(); + if (task) { + LoadFile(task); + GetRegisterFunc(RegisterType::ON_COPY_COMPLETED)->Exec(); + } + } +} + +void Resource::executor_function() { + GetRegisterFunc(RegisterType::START_UP)->Exec(); + while (running_) { + std::unique_lock lock(exec_mutex_); + exec_cv_.wait(lock, [&] { return exec_flag_; }); + auto task = pick_task_execute(); + if (task) { + Process(task); + GetRegisterFunc(RegisterType::ON_FINISH_TASK)->Exec(); + } + } +} + +RegisterHandlerPtr Resource::GetRegisterFunc(const RegisterType &type) { + // construct object each time. + return register_table_[type](); +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index bd1d11aa3c..2961e281fa 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -15,8 +15,9 @@ #include "../TaskTable.h" #include "../task/Task.h" #include "../Cost.h" -#include "Node.h" #include "Connection.h" +#include "Node.h" +#include "RegisterHandler.h" namespace zilliz { @@ -29,92 +30,50 @@ enum class ResourceType { GPU = 2 }; +enum class RegisterType { + START_UP, + ON_FINISH_TASK, + ON_COPY_COMPLETED, + ON_TASK_TABLE_UPDATED, +}; + class Resource : public Node { public: - void - Start() { - loader_thread_ = std::thread(&Resource::loader_function, this); - executor_thread_ = std::thread(&Resource::executor_function, this); + /* + * Event function MUST be a short function, never blocking; + */ + template + void Register_T(const RegisterType& type) { + register_table_.emplace(type, [] { return std::make_shared(); }); } + RegisterHandlerPtr + GetRegisterFunc(const RegisterType& type); + void - Stop() { - running_ = false; - WakeupLoader(); - WakeupExecutor(); - } + Start(); + + void + Stop(); TaskTable & - task_table() { - return task_table_; - } + task_table(); public: /* * wake up executor; */ void - WakeupExecutor() { - exec_cv_.notify_one(); - } + WakeupExecutor(); /* * wake up loader; */ void - WakeupLoader() { - load_cv_.notify_one(); - } - -public: - /* - * Event function MUST be a short function, never blocking; - */ - - /* - * Register on start up event; - */ - void - RegisterOnStartUp(std::function func) { - on_start_up_ = func; - } - - /* - * Register on finish one task event; - */ - void - RegisterOnFinishTask(std::function func) { - on_finish_task_ = func; - } - - /* - * Register on copy task data completed event; - */ - void - RegisterOnCopyCompleted(std::function func) { - on_copy_completed_ = func; - } - - /* - * Register on task table updated event; - */ - void - RegisterOnTaskTableUpdated(std::function func) { - on_task_table_updated_ = func; - } + WakeupLoader(); protected: - Resource(std::string name, ResourceType type) - : name_(std::move(name)), - type_(type), - on_start_up_(nullptr), - on_finish_task_(nullptr), - on_copy_completed_(nullptr), - on_task_table_updated_(nullptr), - running_(false), - load_flag_(false), - exec_flag_(false) { - } + Resource(std::string name, ResourceType type); // TODO: SearchContextPtr to TaskPtr /* @@ -142,67 +101,27 @@ private: * Order by start time; */ TaskPtr - pick_task_load() { - auto indexes = PickToLoad(task_table_, 3); - for (auto index : indexes) { - // try to set one task loading, then return - if (task_table_.Load(index)) - return task_table_.Get(index).task; - // else try next - } - return nullptr; - } + pick_task_load(); /* * Pick one task to execute; * Pick by start time and priority; */ TaskPtr - pick_task_execute() { - auto indexes = PickToExecute(task_table_, 3); - for (auto index : indexes) { - // try to set one task executing, then return - if (task_table_.Execute(index)) - return task_table_.Get(index).task; - // else try next - } - return nullptr; - } + pick_task_execute(); private: /* * Only called by load thread; */ void - loader_function() { - while (running_) { - std::unique_lock lock(load_mutex_); - load_cv_.wait(lock, [&] { return load_flag_; }); - auto task = pick_task_load(); - if (task) { - LoadFile(task); - on_copy_completed_(); - } - } - - } + loader_function(); /* * Only called by worker thread; */ void - executor_function() { - on_start_up_(); - while (running_) { - std::unique_lock lock(exec_mutex_); - exec_cv_.wait(lock, [&] { return exec_flag_; }); - auto task = pick_task_execute(); - if (task) { - Process(task); - on_finish_task_(); - } - } - } + executor_function(); private: @@ -211,10 +130,7 @@ private: TaskTable task_table_; - std::function on_start_up_; - std::function on_finish_task_; - std::function on_copy_completed_; - std::function on_task_table_updated_; + std::map> register_table_; bool running_; std::thread loader_thread_; From c3469fa5613156c6edcbf229f8085810af4dc601 Mon Sep 17 00:00:00 2001 From: wxyu Date: Thu, 15 Aug 2019 19:09:59 +0800 Subject: [PATCH 03/22] MS-359 Add cost test in new scheduler Former-commit-id: 5ec53217a216927e3b0ddf211f1a56bb1a44f2b4 --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/ResourceMgr.cpp | 33 ++++++++++--------- cpp/src/scheduler/resource/Node.cpp | 3 +- cpp/unittest/scheduler/CMakeLists.txt | 1 + cpp/unittest/scheduler/cost_test.cpp | 47 +++++++++++++++++++++++++++ 5 files changed, 67 insertions(+), 18 deletions(-) create mode 100644 cpp/unittest/scheduler/cost_test.cpp diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 47c5575eff..e99c3ddb22 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -17,6 +17,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-354 - Add task class and interface in scheduler - MS-355 - Add copy interface in ExcutionEngine - MS-357 - Add minimum schedule function +- MS-359 - Add cost test in new scheduler ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index b24e08a413..a75c7a3be6 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -29,22 +29,23 @@ ResourceMgr::Add(ResourcePtr &&resource) { resources_.emplace_back(resource); size_t index = resources_.size() - 1; - resource->RegisterOnStartUp([&] { - start_up_event_[index] = true; - event_cv_.notify_one(); - }); - resource->RegisterOnFinishTask([&] { - finish_task_event_[index] = true; - event_cv_.notify_one(); - }); - resource->RegisterOnCopyCompleted([&] { - copy_completed_event_[index] = true; - event_cv_.notify_one(); - }); - resource->RegisterOnTaskTableUpdated([&] { - task_table_updated_event_[index] = true; - event_cv_.notify_one(); - }); + // TODO: update interface +// resource->RegisterOnStartUp([&] { +// start_up_event_[index] = true; +// event_cv_.notify_one(); +// }); +// resource->RegisterOnFinishTask([&] { +// finish_task_event_[index] = true; +// event_cv_.notify_one(); +// }); +// resource->RegisterOnCopyCompleted([&] { +// copy_completed_event_[index] = true; +// event_cv_.notify_one(); +// }); +// resource->RegisterOnTaskTableUpdated([&] { +// task_table_updated_event_[index] = true; +// event_cv_.notify_one(); +// }); return ret; } diff --git a/cpp/src/scheduler/resource/Node.cpp b/cpp/src/scheduler/resource/Node.cpp index 76d61fe858..8e3db29ea2 100644 --- a/cpp/src/scheduler/resource/Node.cpp +++ b/cpp/src/scheduler/resource/Node.cpp @@ -55,8 +55,7 @@ std::string Node::Dump() { void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) { std::lock_guard lk(mutex_); if (auto s = neighbour_node.lock()) { - Neighbour neighbour(neighbour_node, connection); - neighbours_[s->id_] = neighbour; + neighbours_.emplace(std::make_pair(s->id_, Neighbour(neighbour_node, connection))); } // else do nothing, consider it.. } diff --git a/cpp/unittest/scheduler/CMakeLists.txt b/cpp/unittest/scheduler/CMakeLists.txt index c38f4736f7..d47022d317 100644 --- a/cpp/unittest/scheduler/CMakeLists.txt +++ b/cpp/unittest/scheduler/CMakeLists.txt @@ -35,6 +35,7 @@ include_directories(/usr/include/mysql) set(scheduler_test_src ${unittest_srcs} + ${test_srcs} ${scheduler_resource_srcs} ${scheduler_task_srcs} ${scheduler_srcs} diff --git a/cpp/unittest/scheduler/cost_test.cpp b/cpp/unittest/scheduler/cost_test.cpp new file mode 100644 index 0000000000..0b0674391d --- /dev/null +++ b/cpp/unittest/scheduler/cost_test.cpp @@ -0,0 +1,47 @@ +#include "scheduler/TaskTable.h" +#include "scheduler/Cost.h" +#include + + +using namespace zilliz::milvus::engine; + +class CostTest : public ::testing::Test { +protected: + void + SetUp() override { + for (uint64_t i = 0; i < 7; ++i) { + auto task = std::make_shared(); + table_.Put(task); + } + table_.Get(0).state = TaskTableItemState::INVALID; + table_.Get(1).state = TaskTableItemState::START; + table_.Get(2).state = TaskTableItemState::LOADING; + table_.Get(3).state = TaskTableItemState::LOADED; + table_.Get(4).state = TaskTableItemState::EXECUTING; + table_.Get(5).state = TaskTableItemState::EXECUTED; + table_.Get(6).state = TaskTableItemState::MOVING; + table_.Get(7).state = TaskTableItemState::MOVED; + } + + + TaskTable table_; +}; + +TEST_F(CostTest, pick_to_move) { + CacheMgr cache; + auto indexes = PickToMove(table_, cache, 10); + ASSERT_EQ(indexes.size(), 1); + ASSERT_EQ(indexes[0], 3); +} + +TEST_F(CostTest, pick_to_load) { + auto indexes = PickToLoad(table_, 10); + ASSERT_EQ(indexes.size(), 1); + ASSERT_EQ(indexes[0], 1); +} + +TEST_F(CostTest, pick_to_executed) { + auto indexes = PickToExecute(table_, 10); + ASSERT_EQ(indexes.size(), 1); + ASSERT_EQ(indexes[0], 3); +} From df382a16443b21d4a6b7bdda980031997dcafb4d Mon Sep 17 00:00:00 2001 From: wxyu Date: Fri, 16 Aug 2019 15:03:10 +0800 Subject: [PATCH 04/22] MS-361 Add event in resource Former-commit-id: d5f18c13d7111a582ad0bc839aa49879c6bb7d32 --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/ResourceMgr.cpp | 56 +++----- cpp/src/scheduler/ResourceMgr.h | 59 +++------ cpp/src/scheduler/Scheduler.cpp | 16 +-- cpp/src/scheduler/Scheduler.h | 123 ++++++------------ cpp/src/scheduler/event/CopyCompletedEvent.h | 27 ++++ cpp/src/scheduler/event/Event.h | 42 ++++++ cpp/src/scheduler/event/FinishTaskEvent.h | 27 ++++ cpp/src/scheduler/event/StartUpEvent.h | 24 ++++ .../scheduler/event/TaskTableUpdatedEvent.h | 25 ++++ cpp/src/scheduler/resource/Resource.cpp | 14 +- cpp/src/scheduler/resource/Resource.h | 25 +++- 12 files changed, 261 insertions(+), 178 deletions(-) create mode 100644 cpp/src/scheduler/event/CopyCompletedEvent.h create mode 100644 cpp/src/scheduler/event/Event.h create mode 100644 cpp/src/scheduler/event/FinishTaskEvent.h create mode 100644 cpp/src/scheduler/event/StartUpEvent.h create mode 100644 cpp/src/scheduler/event/TaskTableUpdatedEvent.h diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index e99c3ddb22..56d7c8e12b 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -18,6 +18,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-355 - Add copy interface in ExcutionEngine - MS-357 - Add minimum schedule function - MS-359 - Add cost test in new scheduler +- MS-361 - Add event in resource ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index a75c7a3be6..a0625c836f 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -7,6 +7,7 @@ #include "ResourceMgr.h" #include "db/Log.h" + namespace zilliz { namespace milvus { namespace engine { @@ -21,31 +22,22 @@ ResourceMgr::Add(ResourcePtr &&resource) { ResourceWPtr ret(resource); std::lock_guard lck(resources_mutex_); - if(running_) { + if (running_) { ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource"; return ret; } + if (resource->Type() == ResourceType::DISK) { + disk_resources_.emplace_back(ResourceWPtr(resource)); + } resources_.emplace_back(resource); size_t index = resources_.size() - 1; - // TODO: update interface -// resource->RegisterOnStartUp([&] { -// start_up_event_[index] = true; -// event_cv_.notify_one(); -// }); -// resource->RegisterOnFinishTask([&] { -// finish_task_event_[index] = true; -// event_cv_.notify_one(); -// }); -// resource->RegisterOnCopyCompleted([&] { -// copy_completed_event_[index] = true; -// event_cv_.notify_one(); -// }); -// resource->RegisterOnTaskTableUpdated([&] { -// task_table_updated_event_[index] = true; -// event_cv_.notify_one(); -// }); + resource->RegisterSubscriber([&](EventPtr event) { + queue_.emplace(event); + std::unique_lock lock(event_mutex_); + event_cv_.notify_one(); + }); return ret; } @@ -61,31 +53,17 @@ ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connect void ResourceMgr::EventProcess() { while (running_) { - std::unique_lock lock(resources_mutex_); - event_cv_.wait(lock, [this] { return !resources_.empty(); }); + std::unique_lock lock(event_mutex_); + event_cv_.wait(lock, [this] { return !queue_.empty(); }); - if(!running_) { + if (!running_) { break; } - for (uint64_t i = 0; i < resources_.size(); ++i) { - ResourceWPtr res(resources_[i]); - if (start_up_event_[i]) { - on_start_up_(res); - start_up_event_[i] = false; - } - if (finish_task_event_[i]) { - on_finish_task_(res); - finish_task_event_[i] = false; - } - if (copy_completed_event_[i]) { - on_copy_completed_(res); - copy_completed_event_[i] = false; - } - if (task_table_updated_event_[i]) { - on_task_table_updated_(res); - task_table_updated_event_[i] = false; - } + auto event = queue_.front(); + queue_.pop(); + if (subscriber_) { + subscriber_(event); } } } diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index e7a7650695..714924c9b9 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -10,10 +10,12 @@ #include #include #include +#include #include #include "resource/Resource.h" + namespace zilliz { namespace milvus { namespace engine { @@ -23,6 +25,15 @@ public: ResourceMgr(); /******** Management Interface ********/ + inline void + RegisterSubscriber(std::function subscriber) { + subscriber_ = std::move(subscriber); + } + + std::vector & + GetDiskResources() { + return disk_resources_; + } /* * Add resource into Resource Management; @@ -51,41 +62,6 @@ public: // TODO: add stats interface(low) -public: - /******** Event Register Interface ********/ - - /* - * Register on start up event; - */ - void - RegisterOnStartUp(std::function &func) { - on_start_up_ = func; - } - - /* - * Register on finish one task event; - */ - void - RegisterOnFinishTask(std::function &func) { - on_finish_task_ = func; - } - - /* - * Register on copy task data completed event; - */ - void - RegisterOnCopyCompleted(std::function &func) { - on_copy_completed_ = func; - } - - /* - * Register on task table updated event; - */ - void - RegisterOnTaskTableUpdated(std::function &func) { - on_task_table_updated_ = func; - } - public: /******** Utlitity Functions ********/ @@ -97,22 +73,19 @@ private: EventProcess(); private: + std::queue queue_; + std::function subscriber_ = nullptr; + bool running_; + std::vector disk_resources_; std::vector resources_; mutable std::mutex resources_mutex_; std::thread worker_thread_; + std::mutex event_mutex_; std::condition_variable event_cv_; - std::vector start_up_event_; - std::vector finish_task_event_; - std::vector copy_completed_event_; - std::vector task_table_updated_event_; - std::function on_start_up_; - std::function on_finish_task_; - std::function on_copy_completed_; - std::function on_task_table_updated_; }; using ResourceMgrWPtr = std::weak_ptr; diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index d4ba1ac4a6..fbf66628b9 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -45,23 +45,23 @@ schedule(const ResourceWPtr &res) { } void -StartUpEvent::Process() { - schedule(resource_); +Scheduler::OnStartUp(const EventPtr &event) { + schedule(event->resource_); } void -FinishTaskEvent::Process() { - schedule(resource_); +Scheduler::OnFinishTask(const EventPtr &event) { + schedule(event->resource_); } void -CopyCompletedEvent::Process() { - schedule(resource_); +Scheduler::OnCopyCompleted(const EventPtr &event) { + schedule(event->resource_); } void -TaskTableUpdatedEvent::Process() { - schedule(resource_); +Scheduler::OnTaskTableUpdated(const EventPtr &event) { + schedule(event->resource_); } std::string diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index 4f7c714c86..66088dd5b6 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -18,60 +18,6 @@ namespace zilliz { namespace milvus { namespace engine { -class Event { -public: - explicit - Event(ResourceWPtr &resource) : resource_(resource) {} - -public: - virtual void - Process() = 0; - -protected: - ResourceWPtr resource_; -}; - -using EventPtr = std::shared_ptr; - -class StartUpEvent : public Event { -public: - explicit - StartUpEvent(ResourceWPtr &resource) : Event(resource) {} - -public: - void - Process() override; -}; - -class FinishTaskEvent : public Event { -public: - explicit - FinishTaskEvent(ResourceWPtr &resource) : Event(resource) {} - -public: - void - Process() override; -}; - -class CopyCompletedEvent : public Event { -public: - explicit - CopyCompletedEvent(ResourceWPtr &resource) : Event(resource) {} - -public: - void - Process() override; -}; - -class TaskTableUpdatedEvent : public Event { -public: - explicit - TaskTableUpdatedEvent(ResourceWPtr &resource) : Event(resource) {} - -public: - void - Process() override; -}; class Scheduler { public: @@ -90,52 +36,65 @@ public: worker_thread_ = std::thread(&Scheduler::worker_thread_, this); } -public: + std::string + Dump(); + +private: /******** Events ********/ /* * Process start up events; */ - inline void - OnStartUp(ResourceWPtr &resource) { - auto event = std::make_shared(resource); - event_queue_.push(event); - } + void + OnStartUp(const EventPtr &event); /* * Process finish task events; */ - inline void - OnFinishTask(ResourceWPtr &resource) { - auto event = std::make_shared(resource); - event_queue_.push(event); - } + void + OnFinishTask(const EventPtr &event); /* * Process copy completed events; */ - inline void - OnCopyCompleted(ResourceWPtr &resource) { - auto event = std::make_shared(resource); - event_queue_.push(event); - } + void + OnCopyCompleted(const EventPtr &event); /* * Process task table updated events; */ - inline void - OnTaskTableUpdated(ResourceWPtr &resource) { - auto event = std::make_shared(resource); - event_queue_.push(event); - } - - -public: - std::string - Dump(); - + void + OnTaskTableUpdated(const EventPtr &event); private: + /* + * Dispatch event to event handler; + */ + void + Process(const EventPtr &event) { + switch (event->Type()) { + case EventType::START_UP: { + OnStartUp(event); + break; + } + case EventType::COPY_COMPLETED: { + OnCopyCompleted(event); + break; + } + case EventType::FINISH_TASK: { + OnFinishTask(event); + break; + } + case EventType::TASK_TABLE_UPDATED: { + OnTaskTableUpdated(event); + break; + } + default: { + break; + } + } + } + /* * Called by worker_thread_; */ @@ -143,7 +102,7 @@ private: worker_function() { while (running_) { auto event = event_queue_.front(); - event->Process(); + Process(event); } } diff --git a/cpp/src/scheduler/event/CopyCompletedEvent.h b/cpp/src/scheduler/event/CopyCompletedEvent.h new file mode 100644 index 0000000000..8db63490eb --- /dev/null +++ b/cpp/src/scheduler/event/CopyCompletedEvent.h @@ -0,0 +1,27 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "Event.h" +#include "../TaskTable.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class CopyCompletedEvent : public Event { +public: + CopyCompletedEvent(std::weak_ptr resource, TaskTableItem &task_table_item) + : Event(EventType::COPY_COMPLETED, std::move(resource)), + task_table_item_(task_table_item) {} +public: + TaskTableItem &task_table_item_; +}; + +} +} +} diff --git a/cpp/src/scheduler/event/Event.h b/cpp/src/scheduler/event/Event.h new file mode 100644 index 0000000000..4b04d5404b --- /dev/null +++ b/cpp/src/scheduler/event/Event.h @@ -0,0 +1,42 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. + ******************************************************************************/ +#pragma once + +namespace zilliz { +namespace milvus { +namespace engine { + +enum class EventType { + START_UP, + COPY_COMPLETED, + FINISH_TASK, + TASK_TABLE_UPDATED +}; + +class Resource; + +class Event { +public: + explicit + Event(EventType type, std::weak_ptr resource) + : type_(type), + resource_(std::move(resource)) {} + + inline EventType + Type() const { + return type_; + } + +public: + EventType type_; + std::weak_ptr resource_; +}; + +using EventPtr = std::shared_ptr; + +} +} +} diff --git a/cpp/src/scheduler/event/FinishTaskEvent.h b/cpp/src/scheduler/event/FinishTaskEvent.h new file mode 100644 index 0000000000..34658719f9 --- /dev/null +++ b/cpp/src/scheduler/event/FinishTaskEvent.h @@ -0,0 +1,27 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "Event.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class FinishTaskEvent : public Event { +public: + FinishTaskEvent(std::weak_ptr resource, TaskTableItem &task_table_item) + : Event(EventType::FINISH_TASK, std::move(resource)), + task_table_item_(task_table_item) {} + +public: + TaskTableItem &task_table_item_; +}; + +} +} +} diff --git a/cpp/src/scheduler/event/StartUpEvent.h b/cpp/src/scheduler/event/StartUpEvent.h new file mode 100644 index 0000000000..04bc462dcc --- /dev/null +++ b/cpp/src/scheduler/event/StartUpEvent.h @@ -0,0 +1,24 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "Event.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class StartUpEvent : public Event { +public: + explicit + StartUpEvent(std::weak_ptr resource) + : Event(EventType::START_UP, std::move(resource)) {} +}; + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/event/TaskTableUpdatedEvent.h b/cpp/src/scheduler/event/TaskTableUpdatedEvent.h new file mode 100644 index 0000000000..8658316222 --- /dev/null +++ b/cpp/src/scheduler/event/TaskTableUpdatedEvent.h @@ -0,0 +1,25 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "Event.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class TaskTableUpdatedEvent : public Event { +public: + explicit + TaskTableUpdatedEvent(std::weak_ptr resource) + : Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {} +}; + + +} +} +} diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 7f59256474..4177e97588 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -70,20 +70,30 @@ void Resource::loader_function() { auto task = pick_task_load(); if (task) { LoadFile(task); - GetRegisterFunc(RegisterType::ON_COPY_COMPLETED)->Exec(); + if (subscriber_) { +// auto event = std::make_shared(shared_from_this(), task); +// subscriber_(std::static_pointer_cast(event)); + } } } } void Resource::executor_function() { GetRegisterFunc(RegisterType::START_UP)->Exec(); + if (subscriber_) { +// auto event = std::make_shared(shared_from_this()); +// subscriber_(std::static_pointer_cast(event)); + } while (running_) { std::unique_lock lock(exec_mutex_); exec_cv_.wait(lock, [&] { return exec_flag_; }); auto task = pick_task_execute(); if (task) { Process(task); - GetRegisterFunc(RegisterType::ON_FINISH_TASK)->Exec(); + if (subscriber_) { +// auto event = std::make_shared(shared_from_this(), task); +// subscriber_(std::static_pointer_cast(event)); + } } } } diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index 2961e281fa..b33f17c4e0 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -12,6 +12,10 @@ #include #include +#include "../event/Event.h" +#include "../event/StartUpEvent.h" +#include "../event/CopyCompletedEvent.h" +#include "../event/FinishTaskEvent.h" #include "../TaskTable.h" #include "../task/Task.h" #include "../Cost.h" @@ -37,18 +41,28 @@ enum class RegisterType { ON_TASK_TABLE_UPDATED, }; -class Resource : public Node { +class Resource : public Node, public std::enable_shared_from_this { public: /* * Event function MUST be a short function, never blocking; */ - template - void Register_T(const RegisterType& type) { + template + void Register_T(const RegisterType &type) { register_table_.emplace(type, [] { return std::make_shared(); }); } RegisterHandlerPtr - GetRegisterFunc(const RegisterType& type); + GetRegisterFunc(const RegisterType &type); + + inline void + RegisterSubscriber(std::function subscriber) { + subscriber_ = std::move(subscriber); + } + + inline ResourceType + Type() const { + return type_; + } void Start(); @@ -131,8 +145,11 @@ private: TaskTable task_table_; std::map> register_table_; + std::function subscriber_ = nullptr; bool running_; + bool loader_running_ = false; + bool executor_running_ = false; std::thread loader_thread_; std::thread executor_thread_; From 8ffd68ec513a707625fcf8f8716bad3d2c989744 Mon Sep 17 00:00:00 2001 From: wxyu Date: Fri, 16 Aug 2019 15:29:35 +0800 Subject: [PATCH 05/22] MS-364 Modify tasktableitem in tasktable Former-commit-id: 79ac92b18cc3842d6ccaceb6bdd50ff636ca80ab --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/Scheduler.cpp | 2 +- cpp/src/scheduler/TaskTable.cpp | 8 ++--- cpp/src/scheduler/TaskTable.h | 14 +++++---- cpp/src/scheduler/resource/Resource.cpp | 4 +-- cpp/unittest/scheduler/cost_test.cpp | 16 +++++----- cpp/unittest/scheduler/tasktable_test.cpp | 36 +++++++++++------------ 7 files changed, 42 insertions(+), 39 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 56d7c8e12b..e05cb6cd7b 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -19,6 +19,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-357 - Add minimum schedule function - MS-359 - Add cost test in new scheduler - MS-361 - Add event in resource +- MS-364 - Modify tasktableitem in tasktable ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index fbf66628b9..c9d92ccb57 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -21,7 +21,7 @@ push_task(ResourcePtr &self, ResourcePtr &other) { auto indexes = PickToMove(self_task_table, cache, 1); for (auto index : indexes) { if (self_task_table.Move(index)) { - auto task = self_task_table.Get(index).task; + auto task = self_task_table.Get(index)->task; other_task_table.Put(task); // TODO: mark moved future other->WakeupLoader(); diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index 1e77318e86..b42407d710 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -27,7 +27,7 @@ TaskTable::Put(std::vector &tasks) { } -TaskTableItem & +TaskTableItemPtr TaskTable::Get(uint64_t index) { return table_[index]; } @@ -46,9 +46,9 @@ bool TaskTable::Move(uint64_t index) { auto &task = table_[index]; - std::lock_guard lock(task.mutex); - if (task.state == TaskTableItemState::START) { - task.state = TaskTableItemState::LOADING; + std::lock_guard lock(task->mutex); + if (task->state == TaskTableItemState::START) { + task->state = TaskTableItemState::LOADING; return true; } return false; diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index c99d484919..1d02c8334c 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -31,7 +31,7 @@ struct TaskTableItem { TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex(), priority(0) {} TaskTableItem(const TaskTableItem &src) - : id(src.id), state(src.state), mutex(), priority(src.priority) {} + : id(src.id), state(src.state), mutex(), priority(src.priority) {} uint64_t id; // auto increment from 0; // TODO: add tag into task @@ -42,6 +42,8 @@ struct TaskTableItem { uint8_t priority; // just a number, meaningless; }; +using TaskTableItemPtr = std::shared_ptr; + class TaskTable { public: TaskTable() = default; @@ -65,7 +67,7 @@ public: /* * Return task table item reference; */ - TaskTableItem & + TaskTableItemPtr Get(uint64_t index); /* @@ -75,7 +77,7 @@ public: */ void Clear(); - + /* * Return true if task table empty, otherwise false; */ @@ -83,11 +85,11 @@ public: Empty() { return table_.empty(); } - + /* * Return size of task table; */ - inline size_t + inline size_t Size() { return table_.size(); } @@ -154,7 +156,7 @@ public: private: // TODO: map better ? - std::deque table_; + std::deque table_; }; diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 4177e97588..9650f216d6 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -46,7 +46,7 @@ TaskPtr Resource::pick_task_load() { for (auto index : indexes) { // try to set one task loading, then return if (task_table_.Load(index)) - return task_table_.Get(index).task; + return task_table_.Get(index)->task; // else try next } return nullptr; @@ -57,7 +57,7 @@ TaskPtr Resource::pick_task_execute() { for (auto index : indexes) { // try to set one task executing, then return if (task_table_.Execute(index)) - return task_table_.Get(index).task; + return task_table_.Get(index)->task; // else try next } return nullptr; diff --git a/cpp/unittest/scheduler/cost_test.cpp b/cpp/unittest/scheduler/cost_test.cpp index 0b0674391d..27f1c08254 100644 --- a/cpp/unittest/scheduler/cost_test.cpp +++ b/cpp/unittest/scheduler/cost_test.cpp @@ -13,14 +13,14 @@ protected: auto task = std::make_shared(); table_.Put(task); } - table_.Get(0).state = TaskTableItemState::INVALID; - table_.Get(1).state = TaskTableItemState::START; - table_.Get(2).state = TaskTableItemState::LOADING; - table_.Get(3).state = TaskTableItemState::LOADED; - table_.Get(4).state = TaskTableItemState::EXECUTING; - table_.Get(5).state = TaskTableItemState::EXECUTED; - table_.Get(6).state = TaskTableItemState::MOVING; - table_.Get(7).state = TaskTableItemState::MOVED; + table_.Get(0)->state = TaskTableItemState::INVALID; + table_.Get(1)->state = TaskTableItemState::START; + table_.Get(2)->state = TaskTableItemState::LOADING; + table_.Get(3)->state = TaskTableItemState::LOADED; + table_.Get(4)->state = TaskTableItemState::EXECUTING; + table_.Get(5)->state = TaskTableItemState::EXECUTED; + table_.Get(6)->state = TaskTableItemState::MOVING; + table_.Get(7)->state = TaskTableItemState::MOVED; } diff --git a/cpp/unittest/scheduler/tasktable_test.cpp b/cpp/unittest/scheduler/tasktable_test.cpp index 68255b8e93..a9ad9ebca5 100644 --- a/cpp/unittest/scheduler/tasktable_test.cpp +++ b/cpp/unittest/scheduler/tasktable_test.cpp @@ -58,19 +58,19 @@ protected: TEST_F(TaskTableBaseTest, put_task) { empty_table_.Put(task1_); - ASSERT_EQ(empty_table_.Get(0).task, task1_); + ASSERT_EQ(empty_table_.Get(0)->task, task1_); } TEST_F(TaskTableBaseTest, put_invalid_test) { empty_table_.Put(invalid_task_); - ASSERT_EQ(empty_table_.Get(0).task, invalid_task_); + ASSERT_EQ(empty_table_.Get(0)->task, invalid_task_); } TEST_F(TaskTableBaseTest, put_batch) { std::vector tasks{task1_, task2_}; empty_table_.Put(tasks); - ASSERT_EQ(empty_table_.Get(0).task, task1_); - ASSERT_EQ(empty_table_.Get(1).task, task2_); + ASSERT_EQ(empty_table_.Get(0)->task, task1_); + ASSERT_EQ(empty_table_.Get(1)->task, task2_); } TEST_F(TaskTableBaseTest, put_empty_batch) { @@ -89,14 +89,14 @@ protected: table1_.Put(task); } - table1_.Get(0).state = TaskTableItemState::INVALID; - table1_.Get(1).state = TaskTableItemState::START; - table1_.Get(2).state = TaskTableItemState::LOADING; - table1_.Get(3).state = TaskTableItemState::LOADED; - table1_.Get(4).state = TaskTableItemState::EXECUTING; - table1_.Get(5).state = TaskTableItemState::EXECUTED; - table1_.Get(6).state = TaskTableItemState::MOVING; - table1_.Get(7).state = TaskTableItemState::MOVED; + table1_.Get(0)->state = TaskTableItemState::INVALID; + table1_.Get(1)->state = TaskTableItemState::START; + table1_.Get(2)->state = TaskTableItemState::LOADING; + table1_.Get(3)->state = TaskTableItemState::LOADED; + table1_.Get(4)->state = TaskTableItemState::EXECUTING; + table1_.Get(5)->state = TaskTableItemState::EXECUTED; + table1_.Get(6)->state = TaskTableItemState::MOVING; + table1_.Get(7)->state = TaskTableItemState::MOVED; } TaskTable table1_; @@ -106,22 +106,22 @@ TEST_F(TaskTableAdvanceTest, load) { table1_.Load(1); table1_.Loaded(2); - ASSERT_EQ(table1_.Get(1).state, TaskTableItemState::LOADING); - ASSERT_EQ(table1_.Get(2).state, TaskTableItemState::LOADED); + ASSERT_EQ(table1_.Get(1)->state, TaskTableItemState::LOADING); + ASSERT_EQ(table1_.Get(2)->state, TaskTableItemState::LOADED); } TEST_F(TaskTableAdvanceTest, execute) { table1_.Execute(3); table1_.Executed(4); - ASSERT_EQ(table1_.Get(3).state, TaskTableItemState::EXECUTING); - ASSERT_EQ(table1_.Get(4).state, TaskTableItemState::EXECUTED); + ASSERT_EQ(table1_.Get(3)->state, TaskTableItemState::EXECUTING); + ASSERT_EQ(table1_.Get(4)->state, TaskTableItemState::EXECUTED); } TEST_F(TaskTableAdvanceTest, move) { table1_.Move(3); table1_.Moved(6); - ASSERT_EQ(table1_.Get(3).state, TaskTableItemState::MOVING); - ASSERT_EQ(table1_.Get(6).state, TaskTableItemState::MOVED); + ASSERT_EQ(table1_.Get(3)->state, TaskTableItemState::MOVING); + ASSERT_EQ(table1_.Get(6)->state, TaskTableItemState::MOVED); } From 404bc88b3f70846a2794feceb01eb394664fae4a Mon Sep 17 00:00:00 2001 From: wxyu Date: Fri, 16 Aug 2019 15:36:56 +0800 Subject: [PATCH 06/22] MS-365 Use tasktableitemptr instead in event Former-commit-id: a2d8fe7456dc59da280f475ae489a63fa9c3d362 --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/event/CopyCompletedEvent.h | 6 ++-- cpp/src/scheduler/event/FinishTaskEvent.h | 6 ++-- cpp/src/scheduler/resource/Resource.cpp | 32 ++++++++++---------- cpp/src/scheduler/resource/Resource.h | 4 +-- 5 files changed, 25 insertions(+), 24 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index e05cb6cd7b..5e3d747ee4 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -20,6 +20,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-359 - Add cost test in new scheduler - MS-361 - Add event in resource - MS-364 - Modify tasktableitem in tasktable +- MS-365 - Use tasktableitemptr instead in event ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/event/CopyCompletedEvent.h b/cpp/src/scheduler/event/CopyCompletedEvent.h index 8db63490eb..c84c59333e 100644 --- a/cpp/src/scheduler/event/CopyCompletedEvent.h +++ b/cpp/src/scheduler/event/CopyCompletedEvent.h @@ -15,11 +15,11 @@ namespace engine { class CopyCompletedEvent : public Event { public: - CopyCompletedEvent(std::weak_ptr resource, TaskTableItem &task_table_item) + CopyCompletedEvent(std::weak_ptr resource, TaskTableItemPtr task_table_item) : Event(EventType::COPY_COMPLETED, std::move(resource)), - task_table_item_(task_table_item) {} + task_table_item_(std::move(task_table_item)) {} public: - TaskTableItem &task_table_item_; + TaskTableItemPtr task_table_item_; }; } diff --git a/cpp/src/scheduler/event/FinishTaskEvent.h b/cpp/src/scheduler/event/FinishTaskEvent.h index 34658719f9..2739bb2fcc 100644 --- a/cpp/src/scheduler/event/FinishTaskEvent.h +++ b/cpp/src/scheduler/event/FinishTaskEvent.h @@ -14,12 +14,12 @@ namespace engine { class FinishTaskEvent : public Event { public: - FinishTaskEvent(std::weak_ptr resource, TaskTableItem &task_table_item) + FinishTaskEvent(std::weak_ptr resource, TaskTableItemPtr task_table_item) : Event(EventType::FINISH_TASK, std::move(resource)), - task_table_item_(task_table_item) {} + task_table_item_(std::move(task_table_item)) {} public: - TaskTableItem &task_table_item_; + TaskTableItemPtr task_table_item_; }; } diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 9650f216d6..cf9ff9d882 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -41,23 +41,23 @@ void Resource::WakeupLoader() { load_cv_.notify_one(); } -TaskPtr Resource::pick_task_load() { +TaskTableItemPtr Resource::pick_task_load() { auto indexes = PickToLoad(task_table_, 3); for (auto index : indexes) { // try to set one task loading, then return if (task_table_.Load(index)) - return task_table_.Get(index)->task; + return task_table_.Get(index); // else try next } return nullptr; } -TaskPtr Resource::pick_task_execute() { +TaskTableItemPtr Resource::pick_task_execute() { auto indexes = PickToExecute(task_table_, 3); for (auto index : indexes) { // try to set one task executing, then return if (task_table_.Execute(index)) - return task_table_.Get(index)->task; + return task_table_.Get(index); // else try next } return nullptr; @@ -67,12 +67,12 @@ void Resource::loader_function() { while (running_) { std::unique_lock lock(load_mutex_); load_cv_.wait(lock, [&] { return load_flag_; }); - auto task = pick_task_load(); - if (task) { - LoadFile(task); + auto task_item = pick_task_load(); + if (task_item) { + LoadFile(task_item->task); if (subscriber_) { -// auto event = std::make_shared(shared_from_this(), task); -// subscriber_(std::static_pointer_cast(event)); + auto event = std::make_shared(shared_from_this(), task_item); + subscriber_(std::static_pointer_cast(event)); } } } @@ -81,18 +81,18 @@ void Resource::loader_function() { void Resource::executor_function() { GetRegisterFunc(RegisterType::START_UP)->Exec(); if (subscriber_) { -// auto event = std::make_shared(shared_from_this()); -// subscriber_(std::static_pointer_cast(event)); + auto event = std::make_shared(shared_from_this()); + subscriber_(std::static_pointer_cast(event)); } while (running_) { std::unique_lock lock(exec_mutex_); exec_cv_.wait(lock, [&] { return exec_flag_; }); - auto task = pick_task_execute(); - if (task) { - Process(task); + auto task_item = pick_task_execute(); + if (task_item) { + Process(task_item->task); if (subscriber_) { -// auto event = std::make_shared(shared_from_this(), task); -// subscriber_(std::static_pointer_cast(event)); + auto event = std::make_shared(shared_from_this(), task_item); + subscriber_(std::static_pointer_cast(event)); } } } diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index b33f17c4e0..a9a1454ed5 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -114,14 +114,14 @@ private: * Pick one task to load; * Order by start time; */ - TaskPtr + TaskTableItemPtr pick_task_load(); /* * Pick one task to execute; * Pick by start time and priority; */ - TaskPtr + TaskTableItemPtr pick_task_execute(); private: From 484f255f5e939a1884cd156e3a6d24b15a0c58b3 Mon Sep 17 00:00:00 2001 From: wxyu Date: Fri, 16 Aug 2019 16:09:02 +0800 Subject: [PATCH 07/22] MS-366 Implement TaskTable Former-commit-id: e6d189c5c63b0477384353bf6f75482eeb196c7d --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/TaskTable.cpp | 50 +++++++++++++++++++++++++++++---- cpp/src/scheduler/TaskTable.h | 3 -- 3 files changed, 46 insertions(+), 8 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 5e3d747ee4..63f6999353 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -21,6 +21,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-361 - Add event in resource - MS-364 - Modify tasktableitem in tasktable - MS-365 - Use tasktableitemptr instead in event +- MS-366 - Implement TaskTable ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index b42407d710..bac4d245da 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -12,18 +12,23 @@ namespace zilliz { namespace milvus { namespace engine { -TaskTable::TaskTable(std::vector &&tasks) { - -} void TaskTable::Put(TaskPtr task) { - + auto item = std::make_shared(); + item->task = std::move(task); + item->state = TaskTableItemState::LOADED; + table_.push_back(item); } void TaskTable::Put(std::vector &tasks) { - + for (auto &task : tasks) { + auto item = std::make_shared(); + item->task = std::move(task); + item->state = TaskTableItemState::LOADED; + table_.push_back(item); + } } @@ -56,26 +61,61 @@ TaskTable::Move(uint64_t index) { bool TaskTable::Moved(uint64_t index) { + auto &task = table_[index]; + + std::lock_guard lock(task->mutex); + if (task->state == TaskTableItemState::MOVING) { + task->state = TaskTableItemState::MOVED; + return true; + } return false; } bool TaskTable::Load(uint64_t index) { + auto &task = table_[index]; + + std::lock_guard lock(task->mutex); + if (task->state == TaskTableItemState::START) { + task->state = TaskTableItemState::LOADING; + return true; + } return false; } bool TaskTable::Loaded(uint64_t index) { + auto &task = table_[index]; + + std::lock_guard lock(task->mutex); + if (task->state == TaskTableItemState::LOADING) { + task->state = TaskTableItemState::LOADED; + return true; + } return false; } bool TaskTable::Execute(uint64_t index) { + auto &task = table_[index]; + + std::lock_guard lock(task->mutex); + if (task->state == TaskTableItemState::LOADED) { + task->state = TaskTableItemState::EXECUTING; + return true; + } return false; } bool TaskTable::Executed(uint64_t index) { + auto &task = table_[index]; + + std::lock_guard lock(task->mutex); + if (task->state == TaskTableItemState::EXECUTING) { + task->state = TaskTableItemState::EXECUTED; + return true; + } return false; } diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index 1d02c8334c..21c376a627 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -48,9 +48,6 @@ class TaskTable { public: TaskTable() = default; - explicit - TaskTable(std::vector &&tasks); - /* * Put one task; */ From 2ea773b93006e0ac06cfde82f4512e503ae56e39 Mon Sep 17 00:00:00 2001 From: wxyu Date: Fri, 16 Aug 2019 17:51:05 +0800 Subject: [PATCH 08/22] MS-368 Implement cost.cpp Former-commit-id: 554f06b109662bb381bb06aa73ddab956fb8c47c --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/Cost.cpp | 24 +++++++++++++++++++++--- cpp/src/scheduler/Cost.h | 6 +++--- cpp/src/scheduler/TaskTable.h | 8 ++++++++ 4 files changed, 33 insertions(+), 6 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 63f6999353..5bb6053d49 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -22,6 +22,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-364 - Modify tasktableitem in tasktable - MS-365 - Use tasktableitemptr instead in event - MS-366 - Implement TaskTable +- MS-368 - Implement cost.cpp ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/Cost.cpp b/cpp/src/scheduler/Cost.cpp index 14b56d98a5..724a717d2f 100644 --- a/cpp/src/scheduler/Cost.cpp +++ b/cpp/src/scheduler/Cost.cpp @@ -12,22 +12,40 @@ namespace milvus { namespace engine { std::vector -PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit) { +PickToMove(TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit) { std::vector indexes; + for (uint64_t i = 0, count = 0; i < task_table.Size() && count < limit; ++i) { + if (task_table[i]->state == TaskTableItemState::LOADED) { + indexes.push_back(i); + ++count; + } + } return indexes; } std::vector -PickToLoad(const TaskTable &task_table, uint64_t limit) { +PickToLoad(TaskTable &task_table, uint64_t limit) { std::vector indexes; + for (uint64_t i = 0, count = 0; i < task_table.Size() && count < limit; ++i) { + if (task_table[i]->state == TaskTableItemState::START) { + indexes.push_back(i); + ++count; + } + } return indexes; } std::vector -PickToExecute(const TaskTable &task_table, uint64_t limit) { +PickToExecute(TaskTable &task_table, uint64_t limit) { std::vector indexes; + for (uint64_t i = 0, count = 0; i < task_table.Size() && count < limit; ++i) { + if (task_table[i]->state == TaskTableItemState::LOADED) { + indexes.push_back(i); + ++count; + } + } return indexes; } diff --git a/cpp/src/scheduler/Cost.h b/cpp/src/scheduler/Cost.h index 85414337bc..76f16d4d1d 100644 --- a/cpp/src/scheduler/Cost.h +++ b/cpp/src/scheduler/Cost.h @@ -23,7 +23,7 @@ namespace engine { * call from scheduler; */ std::vector -PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit); +PickToMove(TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit); /* @@ -32,7 +32,7 @@ PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limi * I DONT SURE NEED THIS; */ std::vector -PickToLoad(const TaskTable &task_table, uint64_t limit); +PickToLoad(TaskTable &task_table, uint64_t limit); /* * select task to execute; @@ -40,7 +40,7 @@ PickToLoad(const TaskTable &task_table, uint64_t limit); * I DONT SURE NEED THIS; */ std::vector -PickToExecute(const TaskTable &task_table, uint64_t limit); +PickToExecute(TaskTable &task_table, uint64_t limit); } diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index 21c376a627..528c2f3d5c 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -90,6 +90,14 @@ public: Size() { return table_.size(); } +public: + TaskTableItemPtr & + operator[](uint64_t index) { + return table_[index]; + } + + std::deque::iterator begin() { return table_.begin(); } + std::deque::iterator end() { return table_.end(); } public: From 2c9bb7224d794caebb341b5a310ab3694e4d6381 Mon Sep 17 00:00:00 2001 From: wxyu Date: Sat, 17 Aug 2019 17:10:18 +0800 Subject: [PATCH 09/22] MS-371 Add TaskTableUpdatedEvent Former-commit-id: cfcd409967d2e9d8f1747432f3b99c5e75ded768 --- cpp/src/scheduler/ResourceMgr.h | 1 + cpp/src/scheduler/Scheduler.h | 3 + cpp/src/scheduler/TaskTable.cpp | 7 ++ cpp/src/scheduler/TaskTable.h | 7 ++ cpp/src/scheduler/resource/Resource.cpp | 6 ++ cpp/src/scheduler/resource/Resource.h | 1 + cpp/src/scheduler/task/TestTask.cpp | 26 +++++++ cpp/src/scheduler/task/TestTask.h | 34 ++++++++++ cpp/unittest/scheduler/resource_test.cpp | 83 +++++++++++++++++++++++ cpp/unittest/scheduler/scheduler_test.cpp | 17 +++++ 10 files changed, 185 insertions(+) create mode 100644 cpp/src/scheduler/task/TestTask.cpp create mode 100644 cpp/src/scheduler/task/TestTask.h create mode 100644 cpp/unittest/scheduler/resource_test.cpp create mode 100644 cpp/unittest/scheduler/scheduler_test.cpp diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index 714924c9b9..fc7744cd2b 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -88,6 +88,7 @@ private: }; +using ResourceMgrPtr = std::shared_ptr; using ResourceMgrWPtr = std::weak_ptr; } diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index 66088dd5b6..5e50826238 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -5,6 +5,7 @@ ******************************************************************************/ #pragma once +#include #include #include #include @@ -114,6 +115,8 @@ private: std::thread worker_thread_; }; +using SchedulerPtr = std::shared_ptr; + } } } diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index bac4d245da..807d35be4a 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -5,6 +5,7 @@ ******************************************************************************/ #include "TaskTable.h" +#include "event/TaskTableUpdatedEvent.h" #include @@ -19,6 +20,9 @@ TaskTable::Put(TaskPtr task) { item->task = std::move(task); item->state = TaskTableItemState::LOADED; table_.push_back(item); + if (subscriber_) { + subscriber_(); + } } void @@ -29,6 +33,9 @@ TaskTable::Put(std::vector &tasks) { item->state = TaskTableItemState::LOADED; table_.push_back(item); } + if (subscriber_) { + subscriber_(); + } } diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index 528c2f3d5c..70a2899331 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -10,6 +10,7 @@ #include #include "task/SearchTask.h" +#include "event/Event.h" namespace zilliz { @@ -48,6 +49,11 @@ class TaskTable { public: TaskTable() = default; + inline void + RegisterSubscriber(std::function subscriber) { + subscriber_ = std::move(subscriber); + } + /* * Put one task; */ @@ -162,6 +168,7 @@ public: private: // TODO: map better ? std::deque table_; + std::function subscriber_ = nullptr; }; diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index cf9ff9d882..ea6f71a359 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -16,6 +16,12 @@ Resource::Resource(std::string name, ResourceType type) running_(false), load_flag_(false), exec_flag_(false) { + task_table_.RegisterSubscriber([&] { + if (subscriber_) { + auto event = std::make_shared(shared_from_this()); + subscriber_(std::static_pointer_cast(event)); + } + }); } void Resource::Start() { diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index a9a1454ed5..ab7aab7bac 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -16,6 +16,7 @@ #include "../event/StartUpEvent.h" #include "../event/CopyCompletedEvent.h" #include "../event/FinishTaskEvent.h" +#include "../event/TaskTableUpdatedEvent.h" #include "../TaskTable.h" #include "../task/Task.h" #include "../Cost.h" diff --git a/cpp/src/scheduler/task/TestTask.cpp b/cpp/src/scheduler/task/TestTask.cpp new file mode 100644 index 0000000000..a974482e52 --- /dev/null +++ b/cpp/src/scheduler/task/TestTask.cpp @@ -0,0 +1,26 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "TestTask.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +void +TestTask::Load(LoadType type, uint8_t device_id) { + load_count_++; +} + +void +TestTask::Execute() { + exec_count_++; +} + +} +} +} + diff --git a/cpp/src/scheduler/task/TestTask.h b/cpp/src/scheduler/task/TestTask.h new file mode 100644 index 0000000000..5f49d2e31e --- /dev/null +++ b/cpp/src/scheduler/task/TestTask.h @@ -0,0 +1,34 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "Task.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class TestTask : public Task { +public: + TestTask() = default; + +public: + void + Load(LoadType type, uint8_t device_id) override; + + void + Execute() override; + +public: + uint64_t load_count_; + uint64_t exec_count_; +}; + + +} +} +} diff --git a/cpp/unittest/scheduler/resource_test.cpp b/cpp/unittest/scheduler/resource_test.cpp new file mode 100644 index 0000000000..afb484b376 --- /dev/null +++ b/cpp/unittest/scheduler/resource_test.cpp @@ -0,0 +1,83 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "scheduler/resource/Resource.h" +#include "scheduler/resource/DiskResource.h" +#include "scheduler/resource/CpuResource.h" +#include "scheduler/resource/GpuResource.h" +#include "scheduler/task/Task.h" +#include "scheduler/task/TestTask.h" +#include "scheduler/ResourceFactory.h" +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +class ResourceTest : public testing::Test { +protected: + void + SetUp() override { + disk_resource_ = ResourceFactory::Create("disk"); + cpu_resource_ = ResourceFactory::Create("cpu"); + gpu_resource_ = ResourceFactory::Create("gpu"); + flag_ = false; + + auto subscriber = [&](EventPtr) { + std::unique_lock lock(mutex_); + flag_ = true; + cv_.notify_one(); + }; + + disk_resource_->RegisterSubscriber(subscriber); + cpu_resource_->RegisterSubscriber(subscriber); + gpu_resource_->RegisterSubscriber(subscriber); + } + + void + Wait() { + std::unique_lock lock(mutex_); + cv_.wait(lock, [&] { return flag_; }); + } + + ResourcePtr disk_resource_; + ResourcePtr cpu_resource_; + ResourcePtr gpu_resource_; + bool flag_; + std::mutex mutex_; + std::condition_variable cv_; +}; + + +TEST_F(ResourceTest, cpu_resource_test) { + auto task = std::make_shared(); + cpu_resource_->task_table().Put(task); + cpu_resource_->WakeupLoader(); + Wait(); + ASSERT_EQ(task->load_count_, 1); + flag_ = false; + cpu_resource_->WakeupExecutor(); + Wait(); + ASSERT_EQ(task->exec_count_, 1); +} + +TEST_F(ResourceTest, gpu_resource_test) { + auto task = std::make_shared(); + gpu_resource_->task_table().Put(task); + gpu_resource_->WakeupLoader(); + Wait(); + ASSERT_EQ(task->load_count_, 1); + flag_ = false; + gpu_resource_->WakeupExecutor(); + Wait(); + ASSERT_EQ(task->exec_count_, 1); +} + + +} +} +} diff --git a/cpp/unittest/scheduler/scheduler_test.cpp b/cpp/unittest/scheduler/scheduler_test.cpp new file mode 100644 index 0000000000..787ae59329 --- /dev/null +++ b/cpp/unittest/scheduler/scheduler_test.cpp @@ -0,0 +1,17 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + + +} +} +} From b3cc7de2c035701cd9cda6e248c91390348fb092 Mon Sep 17 00:00:00 2001 From: "xj.lin" Date: Sat, 17 Aug 2019 17:24:09 +0800 Subject: [PATCH 10/22] MS372 dev clone index and get index deviceId Former-commit-id: adc85e54cc20c644f9d39df823c8b1b539ec7a19 --- cpp/src/wrapper/knowhere/vec_impl.cpp | 17 +++++++++++++++++ cpp/src/wrapper/knowhere/vec_impl.h | 2 ++ cpp/src/wrapper/knowhere/vec_index.h | 4 ++++ 3 files changed, 23 insertions(+) diff --git a/cpp/src/wrapper/knowhere/vec_impl.cpp b/cpp/src/wrapper/knowhere/vec_impl.cpp index d1cc1ae4ff..f6bdd82618 100644 --- a/cpp/src/wrapper/knowhere/vec_impl.cpp +++ b/cpp/src/wrapper/knowhere/vec_impl.cpp @@ -8,6 +8,7 @@ #include "knowhere/index/vector_index/idmap.h" #include "knowhere/index/vector_index/gpu_ivf.h" #include "knowhere/common/exception.h" +#include "knowhere/index/vector_index/cloner.h" #include "vec_impl.h" #include "data_transfer.h" @@ -152,6 +153,22 @@ VecIndexPtr VecIndexImpl::CopyToCpu(const Config &cfg) { return std::make_shared(cpu_index, type); } +VecIndexPtr VecIndexImpl::Clone() { + auto clone_index = std::make_shared(index_->Clone(), type); + clone_index->dim = dim; + return clone_index; +} + +int64_t VecIndexImpl::GetDeviceId() { + if (auto device_idx = std::dynamic_pointer_cast(index_)){ + return device_idx->GetGpuDevice(); + } + else { + return -1; // -1 == cpu + } + return 0; +} + float *BFIndex::GetRawVectors() { auto raw_index = std::dynamic_pointer_cast(index_); if (raw_index) { return raw_index->GetRawVectors(); } diff --git a/cpp/src/wrapper/knowhere/vec_impl.h b/cpp/src/wrapper/knowhere/vec_impl.h index 5e46c16f70..f03f299f78 100644 --- a/cpp/src/wrapper/knowhere/vec_impl.h +++ b/cpp/src/wrapper/knowhere/vec_impl.h @@ -33,6 +33,8 @@ class VecIndexImpl : public VecIndex { server::KnowhereError Add(const long &nb, const float *xb, const long *ids, const Config &cfg) override; zilliz::knowhere::BinarySet Serialize() override; server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) override; + VecIndexPtr Clone() override; + int64_t GetDeviceId() override; server::KnowhereError Search(const long &nq, const float *xq, float *dist, long *ids, const Config &cfg) override; protected: diff --git a/cpp/src/wrapper/knowhere/vec_index.h b/cpp/src/wrapper/knowhere/vec_index.h index 088228386c..19f0c6d360 100644 --- a/cpp/src/wrapper/knowhere/vec_index.h +++ b/cpp/src/wrapper/knowhere/vec_index.h @@ -63,6 +63,10 @@ class VecIndex { virtual VecIndexPtr CopyToCpu(const Config &cfg = Config()) = 0; + virtual VecIndexPtr Clone() = 0; + + virtual int64_t GetDeviceId() = 0; + virtual IndexType GetType() = 0; virtual int64_t Dimension() = 0; From 4cb3e103fcc776984a7aaf57c70234714787cc73 Mon Sep 17 00:00:00 2001 From: wxyu Date: Sat, 17 Aug 2019 19:05:58 +0800 Subject: [PATCH 11/22] MS-371 Add TaskTableUpdatedEvent Former-commit-id: f252d63664cb09b502c7280d7a82f0cb1fcac169 --- cpp/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 5bb6053d49..8b34cf85e6 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -23,6 +23,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-365 - Use tasktableitemptr instead in event - MS-366 - Implement TaskTable - MS-368 - Implement cost.cpp +- MS-371 - Add TaskTableUpdatedEvent ## New Feature - MS-343 - Implement ResourceMgr From a591e965c9bb6eefe7c95e766c0d4db3c6aa8f12 Mon Sep 17 00:00:00 2001 From: wxyu Date: Sat, 17 Aug 2019 19:08:10 +0800 Subject: [PATCH 12/22] MS-373 Add resource test Former-commit-id: 64c86f4852c4d0cb6d31a679e63ad7555e03d088 --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/TaskTable.cpp | 8 ++++---- cpp/src/scheduler/resource/CpuResource.cpp | 3 ++- cpp/src/scheduler/resource/GpuResource.cpp | 4 ++-- cpp/src/scheduler/resource/Resource.cpp | 20 +++++++++++++++----- cpp/src/scheduler/resource/Resource.h | 13 ++++++------- cpp/unittest/scheduler/cost_test.cpp | 2 +- cpp/unittest/scheduler/node_test.cpp | 2 ++ cpp/unittest/scheduler/resource_test.cpp | 19 ++++++++++++++++--- 9 files changed, 49 insertions(+), 23 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 8b34cf85e6..bc75974f15 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -24,6 +24,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-366 - Implement TaskTable - MS-368 - Implement cost.cpp - MS-371 - Add TaskTableUpdatedEvent +- MS-373 - Add resource test ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index 807d35be4a..9287b47033 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -18,7 +18,7 @@ void TaskTable::Put(TaskPtr task) { auto item = std::make_shared(); item->task = std::move(task); - item->state = TaskTableItemState::LOADED; + item->state = TaskTableItemState::START; table_.push_back(item); if (subscriber_) { subscriber_(); @@ -30,7 +30,7 @@ TaskTable::Put(std::vector &tasks) { for (auto &task : tasks) { auto item = std::make_shared(); item->task = std::move(task); - item->state = TaskTableItemState::LOADED; + item->state = TaskTableItemState::START; table_.push_back(item); } if (subscriber_) { @@ -59,8 +59,8 @@ TaskTable::Move(uint64_t index) { auto &task = table_[index]; std::lock_guard lock(task->mutex); - if (task->state == TaskTableItemState::START) { - task->state = TaskTableItemState::LOADING; + if (task->state == TaskTableItemState::LOADED) { + task->state = TaskTableItemState::MOVING; return true; } return false; diff --git a/cpp/src/scheduler/resource/CpuResource.cpp b/cpp/src/scheduler/resource/CpuResource.cpp index 32eb627046..fa6eda6a58 100644 --- a/cpp/src/scheduler/resource/CpuResource.cpp +++ b/cpp/src/scheduler/resource/CpuResource.cpp @@ -16,6 +16,7 @@ CpuResource::CpuResource(std::string name) : Resource(std::move(name), ResourceType::CPU) {} void CpuResource::LoadFile(TaskPtr task) { + task->Load(LoadType::DISK2CPU, 0); //if (src.type == DISK) { // fd = open(filename); // content = fd.read(); @@ -30,7 +31,7 @@ void CpuResource::LoadFile(TaskPtr task) { } void CpuResource::Process(TaskPtr task) { - + task->Execute(); } } diff --git a/cpp/src/scheduler/resource/GpuResource.cpp b/cpp/src/scheduler/resource/GpuResource.cpp index 00d5df05b4..df6827881c 100644 --- a/cpp/src/scheduler/resource/GpuResource.cpp +++ b/cpp/src/scheduler/resource/GpuResource.cpp @@ -16,11 +16,11 @@ GpuResource::GpuResource(std::string name) : Resource(std::move(name), ResourceType::GPU) {} void GpuResource::LoadFile(TaskPtr task) { - + task->Load(LoadType::CPU2GPU, 0); } void GpuResource::Process(TaskPtr task) { - + task->Execute(); } } diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index ea6f71a359..d059a04ffa 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -25,6 +25,7 @@ Resource::Resource(std::string name, ResourceType type) } void Resource::Start() { + running_ = true; loader_thread_ = std::thread(&Resource::loader_function, this); executor_thread_ = std::thread(&Resource::executor_function, this); } @@ -33,18 +34,24 @@ void Resource::Stop() { running_ = false; WakeupLoader(); WakeupExecutor(); + loader_thread_.join(); + executor_thread_.join(); } TaskTable &Resource::task_table() { return task_table_; } -void Resource::WakeupExecutor() { - exec_cv_.notify_one(); +void Resource::WakeupLoader() { + std::lock_guard lock(load_mutex_); + load_flag_ = true; + load_cv_.notify_one(); } -void Resource::WakeupLoader() { - load_cv_.notify_one(); +void Resource::WakeupExecutor() { + std::lock_guard lock(exec_mutex_); + exec_flag_ = true; + exec_cv_.notify_one(); } TaskTableItemPtr Resource::pick_task_load() { @@ -73,9 +80,12 @@ void Resource::loader_function() { while (running_) { std::unique_lock lock(load_mutex_); load_cv_.wait(lock, [&] { return load_flag_; }); + load_flag_ = false; auto task_item = pick_task_load(); if (task_item) { LoadFile(task_item->task); + // TODO: wrapper loaded + task_item->state = TaskTableItemState::LOADED; if (subscriber_) { auto event = std::make_shared(shared_from_this(), task_item); subscriber_(std::static_pointer_cast(event)); @@ -85,7 +95,6 @@ void Resource::loader_function() { } void Resource::executor_function() { - GetRegisterFunc(RegisterType::START_UP)->Exec(); if (subscriber_) { auto event = std::make_shared(shared_from_this()); subscriber_(std::static_pointer_cast(event)); @@ -93,6 +102,7 @@ void Resource::executor_function() { while (running_) { std::unique_lock lock(exec_mutex_); exec_cv_.wait(lock, [&] { return exec_flag_; }); + exec_flag_ = false; auto task_item = pick_task_execute(); if (task_item) { Process(task_item->task); diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index ab7aab7bac..e8445422a2 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -76,17 +76,17 @@ public: public: /* - * wake up executor; - */ - void - WakeupExecutor(); - - /* * wake up loader; */ void WakeupLoader(); + /* + * wake up executor; + */ + void + WakeupExecutor(); + protected: Resource(std::string name, ResourceType type); @@ -138,7 +138,6 @@ private: void executor_function(); - private: std::string name_; ResourceType type_; diff --git a/cpp/unittest/scheduler/cost_test.cpp b/cpp/unittest/scheduler/cost_test.cpp index 27f1c08254..d4c05257d1 100644 --- a/cpp/unittest/scheduler/cost_test.cpp +++ b/cpp/unittest/scheduler/cost_test.cpp @@ -9,7 +9,7 @@ class CostTest : public ::testing::Test { protected: void SetUp() override { - for (uint64_t i = 0; i < 7; ++i) { + for (uint64_t i = 0; i < 8; ++i) { auto task = std::make_shared(); table_.Put(task); } diff --git a/cpp/unittest/scheduler/node_test.cpp b/cpp/unittest/scheduler/node_test.cpp index a2249e715b..f0621043db 100644 --- a/cpp/unittest/scheduler/node_test.cpp +++ b/cpp/unittest/scheduler/node_test.cpp @@ -10,6 +10,8 @@ protected: SetUp() override { node1_ = std::make_shared(); node2_ = std::make_shared(); + node3_ = std::make_shared(); + node4_ = std::make_shared(); auto pcie = Connection("PCIe", 11.0); diff --git a/cpp/unittest/scheduler/resource_test.cpp b/cpp/unittest/scheduler/resource_test.cpp index afb484b376..0395856fea 100644 --- a/cpp/unittest/scheduler/resource_test.cpp +++ b/cpp/unittest/scheduler/resource_test.cpp @@ -27,15 +27,28 @@ protected: gpu_resource_ = ResourceFactory::Create("gpu"); flag_ = false; - auto subscriber = [&](EventPtr) { + auto subscriber = [&](EventPtr event) { std::unique_lock lock(mutex_); - flag_ = true; - cv_.notify_one(); + if (event->Type() == EventType::COPY_COMPLETED || event->Type() == EventType::FINISH_TASK) { + flag_ = true; + cv_.notify_one(); + } }; disk_resource_->RegisterSubscriber(subscriber); cpu_resource_->RegisterSubscriber(subscriber); gpu_resource_->RegisterSubscriber(subscriber); + + disk_resource_->Start(); + cpu_resource_->Start(); + gpu_resource_->Start(); + } + + void + TearDown() override { + disk_resource_->Stop(); + cpu_resource_->Stop(); + gpu_resource_->Stop(); } void From 511970733d2a77a5c3f09935a94ee0bffacc0a70 Mon Sep 17 00:00:00 2001 From: quicksilver Date: Sat, 17 Aug 2019 22:14:54 +0800 Subject: [PATCH 13/22] add DOCKER_PUBLISH_USER parameter Former-commit-id: fcf5da0a8fae941b2763905aa5d1bbd7975ffa03 --- ci/jenkinsfile/publish_docker.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/jenkinsfile/publish_docker.groovy b/ci/jenkinsfile/publish_docker.groovy index d43b06c9d3..cb8e686310 100644 --- a/ci/jenkinsfile/publish_docker.groovy +++ b/ci/jenkinsfile/publish_docker.groovy @@ -8,7 +8,7 @@ container('publish-docker') { sh "curl -O -u anonymous: ftp://192.168.1.126/data/${PROJECT_NAME}/engine/${JOB_NAME}-${BUILD_ID}/${PROJECT_NAME}-engine-${PACKAGE_VERSION}.tar.gz" sh "tar zxvf ${PROJECT_NAME}-engine-${PACKAGE_VERSION}.tar.gz" try { - docker.withRegistry('https://registry.zilliz.com', 'a54e38ef-c424-4ea9-9224-b25fc20e3924') { + docker.withRegistry('https://registry.zilliz.com', '${params.DOCKER_PUBLISH_USER}') { def customImage = docker.build("${PROJECT_NAME}/engine:${DOCKER_VERSION}") customImage.push() } From ac230a7ec3bf2be6ae343d8da2879f13d3aad357 Mon Sep 17 00:00:00 2001 From: wxyu Date: Sun, 18 Aug 2019 19:59:15 +0800 Subject: [PATCH 14/22] MS-374 Add action definition Former-commit-id: d3ed4be48d8c27f380b9d20d7eb1bff6abe7cee0 --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/action/Action.h | 34 +++++++++++++ .../action/PullTaskFromNeighbour.cpp | 24 +++++++++ .../scheduler/action/PushTaskToNeighbour.cpp | 49 +++++++++++++++++++ 4 files changed, 108 insertions(+) create mode 100644 cpp/src/scheduler/action/Action.h create mode 100644 cpp/src/scheduler/action/PullTaskFromNeighbour.cpp create mode 100644 cpp/src/scheduler/action/PushTaskToNeighbour.cpp diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index bc75974f15..07f8ba345d 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -25,6 +25,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-368 - Implement cost.cpp - MS-371 - Add TaskTableUpdatedEvent - MS-373 - Add resource test +- MS-374 - Add action definition ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/action/Action.h b/cpp/src/scheduler/action/Action.h new file mode 100644 index 0000000000..d72bbefc8d --- /dev/null +++ b/cpp/src/scheduler/action/Action.h @@ -0,0 +1,34 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "../resource/Resource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class Action { +public: + /* + * Push task to neighbour; + */ + static void + PushTaskToNeighbour(const ResourceWPtr &self); + + + /* + * Pull task From neighbour; + */ + static void + PullTaskFromNeighbour(const ResourceWPtr &self); +}; + + +} +} +} diff --git a/cpp/src/scheduler/action/PullTaskFromNeighbour.cpp b/cpp/src/scheduler/action/PullTaskFromNeighbour.cpp new file mode 100644 index 0000000000..b1ac97b6e4 --- /dev/null +++ b/cpp/src/scheduler/action/PullTaskFromNeighbour.cpp @@ -0,0 +1,24 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "Action.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +void +Action::PullTaskFromNeighbour(const ResourceWPtr &self) { + // TODO: implement +} + + +} +} +} + + diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp new file mode 100644 index 0000000000..4347ee842b --- /dev/null +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -0,0 +1,49 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "Action.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +void +push_task(ResourcePtr &self, ResourcePtr &other) { + auto self_task_table = self->task_table(); + auto other_task_table = other->task_table(); + if (!other_task_table.Empty()) { + CacheMgr cache; + auto indexes = PickToMove(self_task_table, cache, 1); + for (auto index : indexes) { + if (self_task_table.Move(index)) { + auto task = self_task_table.Get(index)->task; + other_task_table.Put(task); + // TODO: mark moved future + other->WakeupLoader(); + other->WakeupExecutor(); + } + } + } +} + +void +Action::PushTaskToNeighbour(const ResourceWPtr &res) { + if (auto self = res.lock()) { + for (auto &neighbour : self->GetNeighbours()) { + if (auto n = neighbour.neighbour_node.lock()) { + auto neighbour = std::static_pointer_cast(n); + push_task(self, neighbour); + } + } + } +} + + +} +} +} + From 40d8172671b992951b373678f5c47f08c4c07fde Mon Sep 17 00:00:00 2001 From: wxyu Date: Sun, 18 Aug 2019 19:59:56 +0800 Subject: [PATCH 15/22] MS-374 Add action definition Former-commit-id: 68c221b2845cb3fd640433eea72dba4490ca977f --- .../scheduler/action/PushTaskToNeighbour.cpp | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 4347ee842b..c99f490f11 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -13,19 +13,15 @@ namespace engine { void push_task(ResourcePtr &self, ResourcePtr &other) { - auto self_task_table = self->task_table(); - auto other_task_table = other->task_table(); - if (!other_task_table.Empty()) { - CacheMgr cache; - auto indexes = PickToMove(self_task_table, cache, 1); - for (auto index : indexes) { - if (self_task_table.Move(index)) { - auto task = self_task_table.Get(index)->task; - other_task_table.Put(task); - // TODO: mark moved future - other->WakeupLoader(); - other->WakeupExecutor(); - } + auto &self_task_table = self->task_table(); + auto &other_task_table = other->task_table(); + CacheMgr cache; + auto indexes = PickToMove(self_task_table, cache, 1); + for (auto index : indexes) { + if (self_task_table.Move(index)) { + auto task = self_task_table.Get(index)->task; + other_task_table.Put(task); + // TODO: mark moved future } } } From de6d68238fb3bc9c300b44fc7a78222952a8c9ed Mon Sep 17 00:00:00 2001 From: wxyu Date: Sun, 18 Aug 2019 20:01:44 +0800 Subject: [PATCH 16/22] MS-375 Add Dump implementation for Event Former-commit-id: c8518b48914da67bce45660866970c8c4eee915a --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/event/CopyCompletedEvent.h | 8 ++++ cpp/src/scheduler/event/Event.h | 9 ++++ cpp/src/scheduler/event/EventDump.cpp | 45 +++++++++++++++++++ cpp/src/scheduler/event/FinishTaskEvent.h | 7 +++ cpp/src/scheduler/event/StartUpEvent.h | 7 +++ .../scheduler/event/TaskTableUpdatedEvent.h | 7 +++ 7 files changed, 84 insertions(+) create mode 100644 cpp/src/scheduler/event/EventDump.cpp diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 07f8ba345d..2f565857ea 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -26,6 +26,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-371 - Add TaskTableUpdatedEvent - MS-373 - Add resource test - MS-374 - Add action definition +- MS-375 - Add Dump implementation for Event ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/event/CopyCompletedEvent.h b/cpp/src/scheduler/event/CopyCompletedEvent.h index c84c59333e..d2f5ddb0ff 100644 --- a/cpp/src/scheduler/event/CopyCompletedEvent.h +++ b/cpp/src/scheduler/event/CopyCompletedEvent.h @@ -18,6 +18,14 @@ public: CopyCompletedEvent(std::weak_ptr resource, TaskTableItemPtr task_table_item) : Event(EventType::COPY_COMPLETED, std::move(resource)), task_table_item_(std::move(task_table_item)) {} + + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const CopyCompletedEvent &event); + public: TaskTableItemPtr task_table_item_; }; diff --git a/cpp/src/scheduler/event/Event.h b/cpp/src/scheduler/event/Event.h index 4b04d5404b..788cfd6a73 100644 --- a/cpp/src/scheduler/event/Event.h +++ b/cpp/src/scheduler/event/Event.h @@ -5,6 +5,8 @@ ******************************************************************************/ #pragma once +#include + namespace zilliz { namespace milvus { namespace engine { @@ -30,6 +32,13 @@ public: return type_; } + inline virtual std::string + Dump() const { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const Event &event); + public: EventType type_; std::weak_ptr resource_; diff --git a/cpp/src/scheduler/event/EventDump.cpp b/cpp/src/scheduler/event/EventDump.cpp new file mode 100644 index 0000000000..0d10f6f7b4 --- /dev/null +++ b/cpp/src/scheduler/event/EventDump.cpp @@ -0,0 +1,45 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "Event.h" +#include "StartUpEvent.h" +#include "CopyCompletedEvent.h" +#include "FinishTaskEvent.h" +#include "TaskTableUpdatedEvent.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +std::ostream &operator<<(std::ostream &out, const Event &event) { + out << event.Dump(); + return out; +} + +std::ostream &operator<<(std::ostream &out, const StartUpEvent &event) { + out << event.Dump(); + return out; +} + +std::ostream &operator<<(std::ostream &out, const CopyCompletedEvent &event) { + out << event.Dump(); + return out; +} + +std::ostream &operator<<(std::ostream &out, const FinishTaskEvent &event) { + out << event.Dump(); + return out; +} + +std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event) { + out << event.Dump(); + return out; +} + +} +} +} diff --git a/cpp/src/scheduler/event/FinishTaskEvent.h b/cpp/src/scheduler/event/FinishTaskEvent.h index 2739bb2fcc..14daa9b532 100644 --- a/cpp/src/scheduler/event/FinishTaskEvent.h +++ b/cpp/src/scheduler/event/FinishTaskEvent.h @@ -18,6 +18,13 @@ public: : Event(EventType::FINISH_TASK, std::move(resource)), task_table_item_(std::move(task_table_item)) {} + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const FinishTaskEvent &event); + public: TaskTableItemPtr task_table_item_; }; diff --git a/cpp/src/scheduler/event/StartUpEvent.h b/cpp/src/scheduler/event/StartUpEvent.h index 04bc462dcc..4b5ec78cd6 100644 --- a/cpp/src/scheduler/event/StartUpEvent.h +++ b/cpp/src/scheduler/event/StartUpEvent.h @@ -17,6 +17,13 @@ public: explicit StartUpEvent(std::weak_ptr resource) : Event(EventType::START_UP, std::move(resource)) {} + + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const StartUpEvent &event); }; } diff --git a/cpp/src/scheduler/event/TaskTableUpdatedEvent.h b/cpp/src/scheduler/event/TaskTableUpdatedEvent.h index 8658316222..f96c30674c 100644 --- a/cpp/src/scheduler/event/TaskTableUpdatedEvent.h +++ b/cpp/src/scheduler/event/TaskTableUpdatedEvent.h @@ -17,6 +17,13 @@ public: explicit TaskTableUpdatedEvent(std::weak_ptr resource) : Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {} + + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event); }; From 4ed498893d1ab2f991a018993c684e72947d7c3b Mon Sep 17 00:00:00 2001 From: wxyu Date: Sun, 18 Aug 2019 20:04:54 +0800 Subject: [PATCH 17/22] MS-376 Add loader and executor enable flag in Resource avoid diskresource execute task Former-commit-id: 489f60f97444c963a3dbfd913672f79b8a9a330e --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/resource/CpuResource.cpp | 11 -------- cpp/src/scheduler/resource/DiskResource.cpp | 3 ++- cpp/src/scheduler/resource/Resource.cpp | 28 +++++++++++++++------ cpp/src/scheduler/resource/Resource.h | 9 ++++--- 5 files changed, 30 insertions(+), 22 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 2f565857ea..a30a34563b 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -27,6 +27,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-373 - Add resource test - MS-374 - Add action definition - MS-375 - Add Dump implementation for Event +- MS-376 - Add loader and executor enable flag in Resource avoid diskresource execute task ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/resource/CpuResource.cpp b/cpp/src/scheduler/resource/CpuResource.cpp index fa6eda6a58..11c7796187 100644 --- a/cpp/src/scheduler/resource/CpuResource.cpp +++ b/cpp/src/scheduler/resource/CpuResource.cpp @@ -17,17 +17,6 @@ CpuResource::CpuResource(std::string name) void CpuResource::LoadFile(TaskPtr task) { task->Load(LoadType::DISK2CPU, 0); - //if (src.type == DISK) { - // fd = open(filename); - // content = fd.read(); - // close(fd); - //} else if (src.type == CPU) { - // memcpy(src, dest, len); - //} else if (src.type == GPU) { - // cudaMemcpyD2H(src, dest); - //} else { - // // unknown type, exception - //} } void CpuResource::Process(TaskPtr task) { diff --git a/cpp/src/scheduler/resource/DiskResource.cpp b/cpp/src/scheduler/resource/DiskResource.cpp index dcc0687ac4..66cb72b062 100644 --- a/cpp/src/scheduler/resource/DiskResource.cpp +++ b/cpp/src/scheduler/resource/DiskResource.cpp @@ -12,7 +12,8 @@ namespace engine { DiskResource::DiskResource(std::string name) - : Resource(std::move(name), ResourceType::DISK) {} + : Resource(std::move(name), ResourceType::DISK, true, false) { +} void DiskResource::LoadFile(TaskPtr task) { diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index d059a04ffa..ed5d57b7dd 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -10,10 +10,15 @@ namespace zilliz { namespace milvus { namespace engine { -Resource::Resource(std::string name, ResourceType type) +Resource::Resource(std::string name, + ResourceType type, + bool enable_loader, + bool enable_executor) : name_(std::move(name)), type_(type), running_(false), + enable_loader_(enable_loader), + enable_executor_(enable_executor), load_flag_(false), exec_flag_(false) { task_table_.RegisterSubscriber([&] { @@ -26,16 +31,24 @@ Resource::Resource(std::string name, ResourceType type) void Resource::Start() { running_ = true; - loader_thread_ = std::thread(&Resource::loader_function, this); - executor_thread_ = std::thread(&Resource::executor_function, this); + if (enable_loader_) { + loader_thread_ = std::thread(&Resource::loader_function, this); + } + if (enable_executor_) { + executor_thread_ = std::thread(&Resource::executor_function, this); + } } void Resource::Stop() { running_ = false; - WakeupLoader(); - WakeupExecutor(); - loader_thread_.join(); - executor_thread_.join(); + if (enable_loader_) { + WakeupLoader(); + loader_thread_.join(); + } + if (enable_executor_) { + WakeupExecutor(); + executor_thread_.join(); + } } TaskTable &Resource::task_table() { @@ -106,6 +119,7 @@ void Resource::executor_function() { auto task_item = pick_task_execute(); if (task_item) { Process(task_item->task); + task_item->state = TaskTableItemState::EXECUTED; if (subscriber_) { auto event = std::make_shared(shared_from_this(), task_item); subscriber_(std::static_pointer_cast(event)); diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index e8445422a2..769661c67b 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -88,7 +88,10 @@ public: WakeupExecutor(); protected: - Resource(std::string name, ResourceType type); + Resource(std::string name, + ResourceType type, + bool enable_loader = true, + bool enable_executor = true); // TODO: SearchContextPtr to TaskPtr /* @@ -148,8 +151,8 @@ private: std::function subscriber_ = nullptr; bool running_; - bool loader_running_ = false; - bool executor_running_ = false; + bool enable_loader_ = true; + bool enable_executor_ = true; std::thread loader_thread_; std::thread executor_thread_; From 07ec15f9cd1960643f65afa5331b9ffbf67b7018 Mon Sep 17 00:00:00 2001 From: wxyu Date: Sun, 18 Aug 2019 20:08:15 +0800 Subject: [PATCH 18/22] MS-377 Improve process thread trigger in ResourceMgr, Scheduler and TaskTable Former-commit-id: f121fba66ae395b03ae31c400216b5e9c301f0cf --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/ResourceMgr.cpp | 64 +++++++------ cpp/src/scheduler/ResourceMgr.h | 5 +- cpp/src/scheduler/Scheduler.cpp | 144 +++++++++++++++++++++--------- cpp/src/scheduler/Scheduler.h | 83 ++++++++--------- cpp/src/scheduler/TaskTable.cpp | 28 +++++- cpp/src/scheduler/TaskTable.h | 5 ++ 7 files changed, 218 insertions(+), 112 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index a30a34563b..69fdd9ac2b 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -28,6 +28,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-374 - Add action definition - MS-375 - Add Dump implementation for Event - MS-376 - Add loader and executor enable flag in Resource avoid diskresource execute task +- MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index a0625c836f..916aaa238a 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -33,11 +33,7 @@ ResourceMgr::Add(ResourcePtr &&resource) { resources_.emplace_back(resource); size_t index = resources_.size() - 1; - resource->RegisterSubscriber([&](EventPtr event) { - queue_.emplace(event); - std::unique_lock lock(event_mutex_); - event_cv_.notify_one(); - }); + resource->RegisterSubscriber(std::bind(&ResourceMgr::PostEvent, this, std::placeholders::_1)); return ret; } @@ -46,27 +42,11 @@ ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connect if (auto observe_a = res1.lock()) { if (auto observe_b = res2.lock()) { observe_a->AddNeighbour(std::static_pointer_cast(observe_b), connection); + observe_b->AddNeighbour(std::static_pointer_cast(observe_a), connection); } } } -void -ResourceMgr::EventProcess() { - while (running_) { - std::unique_lock lock(event_mutex_); - event_cv_.wait(lock, [this] { return !queue_.empty(); }); - - if (!running_) { - break; - } - - auto event = queue_.front(); - queue_.pop(); - if (subscriber_) { - subscriber_(event); - } - } -} void ResourceMgr::Start() { @@ -74,23 +54,33 @@ ResourceMgr::Start() { for (auto &resource : resources_) { resource->Start(); } - worker_thread_ = std::thread(&ResourceMgr::EventProcess, this); - running_ = true; + worker_thread_ = std::thread(&ResourceMgr::event_process, this); } void ResourceMgr::Stop() { - std::lock_guard lck(resources_mutex_); - - running_ = false; + { + std::lock_guard lock(event_mutex_); + running_ = false; + queue_.push(nullptr); + event_cv_.notify_one(); + } worker_thread_.join(); + std::lock_guard lck(resources_mutex_); for (auto &resource : resources_) { resource->Stop(); } } +void +ResourceMgr::PostEvent(const EventPtr &event) { + std::unique_lock lock(event_mutex_); + queue_.emplace(event); + event_cv_.notify_one(); +} + std::string ResourceMgr::Dump() { std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n"; @@ -103,6 +93,26 @@ ResourceMgr::Dump() { return str; } +void +ResourceMgr::event_process() { + while (running_) { + std::unique_lock lock(event_mutex_); + event_cv_.wait(lock, [this] { return !queue_.empty(); }); + + auto event = queue_.front(); + if (event == nullptr) { + break; + } + +// ENGINE_LOG_DEBUG << "ResourceMgr process " << *event; + + queue_.pop(); + if (subscriber_) { + subscriber_(event); + } + } +} + } } } diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index fc7744cd2b..cb2e631935 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -14,6 +14,7 @@ #include #include "resource/Resource.h" +#include "utils/Log.h" namespace zilliz { @@ -59,6 +60,8 @@ public: void Stop(); + void + PostEvent(const EventPtr& event); // TODO: add stats interface(low) @@ -70,7 +73,7 @@ public: private: void - EventProcess(); + event_process(); private: std::queue queue_; diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index c9d92ccb57..06dfa669db 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -4,64 +4,48 @@ * Proprietary and confidential. ******************************************************************************/ +#include #include "Scheduler.h" #include "Cost.h" +#include "action/Action.h" namespace zilliz { namespace milvus { namespace engine { -void -push_task(ResourcePtr &self, ResourcePtr &other) { - auto self_task_table = self->task_table(); - auto other_task_table = other->task_table(); - if (!other_task_table.Empty()) { - CacheMgr cache; - auto indexes = PickToMove(self_task_table, cache, 1); - for (auto index : indexes) { - if (self_task_table.Move(index)) { - auto task = self_task_table.Get(index)->task; - other_task_table.Put(task); - // TODO: mark moved future - other->WakeupLoader(); - other->WakeupExecutor(); - } - } +Scheduler::Scheduler(ResourceMgrWPtr res_mgr) + : running_(false), + res_mgr_(std::move(res_mgr)) { + if (auto mgr = res_mgr_.lock()) { + mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1)); } } -void -schedule(const ResourceWPtr &res) { - if (auto self = res.lock()) { - for (auto &nei : self->GetNeighbours()) { - if (auto n = nei.neighbour_node.lock()) { - auto neighbour = std::static_pointer_cast(n); - push_task(self, neighbour); - } - } +void +Scheduler::Start() { + running_ = true; + worker_thread_ = std::thread(&Scheduler::worker_function, this); +} + +void +Scheduler::Stop() { + { + std::lock_guard lock(event_mutex_); + running_ = false; + event_queue_.push(nullptr); + event_cv_.notify_one(); } + worker_thread_.join(); } void -Scheduler::OnStartUp(const EventPtr &event) { - schedule(event->resource_); -} - -void -Scheduler::OnFinishTask(const EventPtr &event) { - schedule(event->resource_); -} - -void -Scheduler::OnCopyCompleted(const EventPtr &event) { - schedule(event->resource_); -} - -void -Scheduler::OnTaskTableUpdated(const EventPtr &event) { - schedule(event->resource_); +Scheduler::PostEvent(const EventPtr &event) { + std::lock_guard lock(event_mutex_); + event_queue_.push(event); + event_cv_.notify_one(); +// SERVER_LOG_DEBUG << "Scheduler post " << *event; } std::string @@ -69,6 +53,82 @@ Scheduler::Dump() { return std::string(); } +void +Scheduler::worker_function() { + while (running_) { + std::unique_lock lock(event_mutex_); + event_cv_.wait(lock, [this] { return !event_queue_.empty(); }); + auto event = event_queue_.front(); + if (event == nullptr) { + break; + } + +// SERVER_LOG_DEBUG << "Scheduler process " << *event; + event_queue_.pop(); + Process(event); + } +} + +void +Scheduler::Process(const EventPtr &event) { + switch (event->Type()) { + case EventType::START_UP: { + OnStartUp(event); + break; + } + case EventType::COPY_COMPLETED: { + OnCopyCompleted(event); + break; + } + case EventType::FINISH_TASK: { + OnFinishTask(event); + break; + } + case EventType::TASK_TABLE_UPDATED: { + OnTaskTableUpdated(event); + break; + } + default: { + // TODO: logging + break; + } + } +} + + +void +Scheduler::OnStartUp(const EventPtr &event) { + if (auto resource = event->resource_.lock()) { + resource->WakeupLoader(); + } +} + +void +Scheduler::OnFinishTask(const EventPtr &event) { + if (auto resource = event->resource_.lock()) { + resource->WakeupExecutor(); + } +} + +void +Scheduler::OnCopyCompleted(const EventPtr &event) { + if (auto resource = event->resource_.lock()) { + resource->WakeupLoader(); + resource->WakeupExecutor(); + if (resource->Type()== ResourceType::DISK) { + Action::PushTaskToNeighbour(event->resource_); + } + } +} + +void +Scheduler::OnTaskTableUpdated(const EventPtr &event) { +// Action::PushTaskToNeighbour(event->resource_); + if (auto resource = event->resource_.lock()) { + resource->WakeupLoader(); + } +} + } } } diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index 5e50826238..012a479a82 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -13,6 +13,7 @@ #include "resource/Resource.h" #include "ResourceMgr.h" +#include "utils/Log.h" namespace zilliz { @@ -23,20 +24,32 @@ namespace engine { class Scheduler { public: explicit - Scheduler(ResourceMgrWPtr res_mgr) - : running_(false), - res_mgr_(std::move(res_mgr)) { -// res_mgr.Register(); -// res_mgr.Register(); -// res_mgr.Register(); -// res_mgr.Register(); - } + Scheduler(ResourceMgrWPtr res_mgr); + Scheduler(const Scheduler &) = delete; + Scheduler(Scheduler &&) = delete; + + /* + * Start worker thread; + */ void - Start() { - worker_thread_ = std::thread(&Scheduler::worker_thread_, this); - } + Start(); + /* + * Stop worker thread, join it; + */ + void + Stop(); + + /* + * Post event to scheduler event queue; + */ + void + PostEvent(const EventPtr &event); + + /* + * Dump as string; + */ std::string Dump(); @@ -45,24 +58,37 @@ private: /* * Process start up events; + * + * Actions: + * Pull task from neighbours; */ void OnStartUp(const EventPtr &event); /* * Process finish task events; + * + * Actions: + * Pull task from neighbours; */ void OnFinishTask(const EventPtr &event); /* * Process copy completed events; + * + * Actions: + * Mark task source MOVED; + * Pull task from neighbours; */ void OnCopyCompleted(const EventPtr &event); /* - * Process task table updated events; + * Process task table updated events, which happened on task_table->put; + * + * Actions: + * Push task to neighbours; */ void OnTaskTableUpdated(const EventPtr &event); @@ -72,40 +98,13 @@ private: * Dispatch event to event handler; */ void - Process(const EventPtr &event) { - switch (event->Type()) { - case EventType::START_UP: { - OnStartUp(event); - break; - } - case EventType::COPY_COMPLETED: { - OnCopyCompleted(event); - break; - } - case EventType::FINISH_TASK: { - OnFinishTask(event); - break; - } - case EventType::TASK_TABLE_UPDATED: { - OnTaskTableUpdated(event); - break; - } - default: { - break; - } - } - } + Process(const EventPtr &event); /* * Called by worker_thread_; */ void - worker_function() { - while (running_) { - auto event = event_queue_.front(); - Process(event); - } - } + worker_function(); private: bool running_; @@ -113,6 +112,8 @@ private: ResourceMgrWPtr res_mgr_; std::queue event_queue_; std::thread worker_thread_; + std::mutex event_mutex_; + std::condition_variable event_cv_; }; using SchedulerPtr = std::shared_ptr; diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index 9287b47033..7ef033c500 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -7,6 +7,7 @@ #include "TaskTable.h" #include "event/TaskTableUpdatedEvent.h" #include +#include namespace zilliz { @@ -16,7 +17,9 @@ namespace engine { void TaskTable::Put(TaskPtr task) { + std::lock_guard lock(id_mutex_); auto item = std::make_shared(); + item->id = id_++; item->task = std::move(task); item->state = TaskTableItemState::START; table_.push_back(item); @@ -27,8 +30,10 @@ TaskTable::Put(TaskPtr task) { void TaskTable::Put(std::vector &tasks) { + std::lock_guard lock(id_mutex_); for (auto &task : tasks) { auto item = std::make_shared(); + item->id = id_++; item->task = std::move(task); item->state = TaskTableItemState::START; table_.push_back(item); @@ -126,9 +131,30 @@ TaskTable::Executed(uint64_t index) { return false; } +std::string +ToString(TaskTableItemState state) { + switch (state) { + case TaskTableItemState::INVALID: return "INVALID"; + case TaskTableItemState::START: return "START"; + case TaskTableItemState::LOADING: return "LOADING"; + case TaskTableItemState::LOADED: return "LOADED"; + case TaskTableItemState::EXECUTING: return "EXECUTING"; + case TaskTableItemState::EXECUTED: return "EXECUTED"; + case TaskTableItemState::MOVING: return "MOVING"; + case TaskTableItemState::MOVED: return "MOVED"; + default: return ""; + } +} + std::string TaskTable::Dump() { - return std::string(); + std::stringstream ss; + for (auto &item : table_) { + ss << "<" << item->id; + ss << ", " << ToString(item->state); + ss << ">" << std::endl; + } + return ss.str(); } } diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index 70a2899331..8a482d0579 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -49,6 +49,9 @@ class TaskTable { public: TaskTable() = default; + TaskTable(const TaskTable &) = delete; + TaskTable(TaskTable &&) = delete; + inline void RegisterSubscriber(std::function subscriber) { subscriber_ = std::move(subscriber); @@ -167,6 +170,8 @@ public: private: // TODO: map better ? + std::uint64_t id_ = 0; + mutable std::mutex id_mutex_; std::deque table_; std::function subscriber_ = nullptr; }; From 08a0f545da7026f716d22682960a0a65fe1b1664 Mon Sep 17 00:00:00 2001 From: wxyu Date: Sun, 18 Aug 2019 20:10:32 +0800 Subject: [PATCH 19/22] MS-378 Debug and Update normal_test in scheduler unittest Former-commit-id: 7b5add0a2d27185abeec2a1c5df51716aa9fc361 --- cpp/CHANGELOG.md | 1 + cpp/unittest/scheduler/CMakeLists.txt | 4 +++ cpp/unittest/scheduler/normal_test.cpp | 43 ++++++++++++++++------- cpp/unittest/scheduler/tasktable_test.cpp | 2 -- 4 files changed, 36 insertions(+), 14 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 69fdd9ac2b..8232bd77c0 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -29,6 +29,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-375 - Add Dump implementation for Event - MS-376 - Add loader and executor enable flag in Resource avoid diskresource execute task - MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable +- MS-378 - Debug and Update normal_test in scheduler unittest ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/unittest/scheduler/CMakeLists.txt b/cpp/unittest/scheduler/CMakeLists.txt index d47022d317..24210cb84d 100644 --- a/cpp/unittest/scheduler/CMakeLists.txt +++ b/cpp/unittest/scheduler/CMakeLists.txt @@ -14,6 +14,8 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/action scheduler_action_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs) @@ -36,6 +38,8 @@ include_directories(/usr/include/mysql) set(scheduler_test_src ${unittest_srcs} ${test_srcs} + ${scheduler_action_srcs} + ${scheduler_event_srcs} ${scheduler_resource_srcs} ${scheduler_task_srcs} ${scheduler_srcs} diff --git a/cpp/unittest/scheduler/normal_test.cpp b/cpp/unittest/scheduler/normal_test.cpp index af5d1b91f8..1123a3fb7e 100644 --- a/cpp/unittest/scheduler/normal_test.cpp +++ b/cpp/unittest/scheduler/normal_test.cpp @@ -1,13 +1,14 @@ #include "scheduler/ResourceFactory.h" #include "scheduler/ResourceMgr.h" #include "scheduler/Scheduler.h" +#include "scheduler/task/TestTask.h" +#include "utils/Log.h" #include using namespace zilliz::milvus::engine; -TEST(normal_test, DISABLED_test1) { - +TEST(normal_test, test1) { // ResourceMgr only compose resources, provide unified event auto res_mgr = std::make_shared(); auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd")); @@ -23,17 +24,35 @@ TEST(normal_test, DISABLED_test1) { res_mgr->Start(); - auto task1 = std::make_shared(); - auto task2 = std::make_shared(); - if (auto observe = disk.lock()) { - observe->task_table().Put(task1); - observe->task_table().Put(task2); - observe->task_table().Put(task1); - observe->task_table().Put(task1); - } - auto scheduler = new Scheduler(res_mgr); scheduler->Start(); - while (true) sleep(1); + auto task1 = std::make_shared(); + auto task2 = std::make_shared(); + auto task3 = std::make_shared(); + auto task4 = std::make_shared(); + if (auto observe = disk.lock()) { + observe->task_table().Put(task1); + observe->task_table().Put(task2); + observe->task_table().Put(task3); + observe->task_table().Put(task4); + std::cout << "disk:" << std::endl; + std::cout << observe->task_table().Dump() << std::endl; + } + + sleep(5); + + if (auto observe = disk.lock()) { + std::cout << "disk:" << std::endl; + std::cout << observe->task_table().Dump() << std::endl; + } + if (auto observe = cpu.lock()) { + std::cout << "cpu:" << std::endl; + std::cout << observe->task_table().Dump() << std::endl; + } + scheduler->Stop(); + res_mgr->Stop(); + + ASSERT_EQ(task1->load_count_, 1); + ASSERT_EQ(task1->exec_count_, 1); } diff --git a/cpp/unittest/scheduler/tasktable_test.cpp b/cpp/unittest/scheduler/tasktable_test.cpp index a9ad9ebca5..f48db3af2d 100644 --- a/cpp/unittest/scheduler/tasktable_test.cpp +++ b/cpp/unittest/scheduler/tasktable_test.cpp @@ -45,8 +45,6 @@ protected: invalid_task_ = nullptr; task1_ = std::make_shared(); task2_ = std::make_shared(); - - empty_table_ = TaskTable(); } TaskPtr invalid_task_; From 41f3a2ac2bd9015a1e207363d005dfc591253e93 Mon Sep 17 00:00:00 2001 From: wxyu Date: Mon, 19 Aug 2019 10:47:30 +0800 Subject: [PATCH 20/22] MS-379 Add Dump implementation in Resource Former-commit-id: 0ad824b5d582fc7235a0e29919fdf5a2975bf534 --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/Scheduler.cpp | 1 - cpp/src/scheduler/resource/CpuResource.cpp | 4 ++ cpp/src/scheduler/resource/CpuResource.h | 7 +++ cpp/src/scheduler/resource/DiskResource.cpp | 4 ++ cpp/src/scheduler/resource/DiskResource.h | 7 +++ cpp/src/scheduler/resource/GpuResource.cpp | 4 ++ cpp/src/scheduler/resource/GpuResource.h | 7 +++ cpp/src/scheduler/resource/Resource.cpp | 5 +++ cpp/src/scheduler/resource/Resource.h | 7 +++ cpp/unittest/scheduler/normal_test.cpp | 47 ++++++++++++++++----- 11 files changed, 83 insertions(+), 11 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 8232bd77c0..4358c5ddb2 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -30,6 +30,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-376 - Add loader and executor enable flag in Resource avoid diskresource execute task - MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable - MS-378 - Debug and Update normal_test in scheduler unittest +- MS-379 - Add Dump implementation in Resource ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 06dfa669db..191d1957aa 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -4,7 +4,6 @@ * Proprietary and confidential. ******************************************************************************/ -#include #include "Scheduler.h" #include "Cost.h" #include "action/Action.h" diff --git a/cpp/src/scheduler/resource/CpuResource.cpp b/cpp/src/scheduler/resource/CpuResource.cpp index 11c7796187..01fca35ee4 100644 --- a/cpp/src/scheduler/resource/CpuResource.cpp +++ b/cpp/src/scheduler/resource/CpuResource.cpp @@ -11,6 +11,10 @@ namespace zilliz { namespace milvus { namespace engine { +std::ostream &operator<<(std::ostream &out, const CpuResource &resource) { + out << resource.Dump(); + return out; +} CpuResource::CpuResource(std::string name) : Resource(std::move(name), ResourceType::CPU) {} diff --git a/cpp/src/scheduler/resource/CpuResource.h b/cpp/src/scheduler/resource/CpuResource.h index be1340e954..c180ea07d7 100644 --- a/cpp/src/scheduler/resource/CpuResource.h +++ b/cpp/src/scheduler/resource/CpuResource.h @@ -19,6 +19,13 @@ public: explicit CpuResource(std::string name); + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const CpuResource &resource); + protected: void LoadFile(TaskPtr task) override; diff --git a/cpp/src/scheduler/resource/DiskResource.cpp b/cpp/src/scheduler/resource/DiskResource.cpp index 66cb72b062..8612909e44 100644 --- a/cpp/src/scheduler/resource/DiskResource.cpp +++ b/cpp/src/scheduler/resource/DiskResource.cpp @@ -10,6 +10,10 @@ namespace zilliz { namespace milvus { namespace engine { +std::ostream &operator<<(std::ostream &out, const DiskResource &resource) { + out << resource.Dump(); + return out; +} DiskResource::DiskResource(std::string name) : Resource(std::move(name), ResourceType::DISK, true, false) { diff --git a/cpp/src/scheduler/resource/DiskResource.h b/cpp/src/scheduler/resource/DiskResource.h index 39211dbb66..0fb9d625aa 100644 --- a/cpp/src/scheduler/resource/DiskResource.h +++ b/cpp/src/scheduler/resource/DiskResource.h @@ -18,6 +18,13 @@ public: explicit DiskResource(std::string name); + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const DiskResource &resource); + protected: void LoadFile(TaskPtr task) override; diff --git a/cpp/src/scheduler/resource/GpuResource.cpp b/cpp/src/scheduler/resource/GpuResource.cpp index df6827881c..8606bb7856 100644 --- a/cpp/src/scheduler/resource/GpuResource.cpp +++ b/cpp/src/scheduler/resource/GpuResource.cpp @@ -11,6 +11,10 @@ namespace zilliz { namespace milvus { namespace engine { +std::ostream &operator<<(std::ostream &out, const GpuResource &resource) { + out << resource.Dump(); + return out; +} GpuResource::GpuResource(std::string name) : Resource(std::move(name), ResourceType::GPU) {} diff --git a/cpp/src/scheduler/resource/GpuResource.h b/cpp/src/scheduler/resource/GpuResource.h index 84bf163284..1cb38df34f 100644 --- a/cpp/src/scheduler/resource/GpuResource.h +++ b/cpp/src/scheduler/resource/GpuResource.h @@ -18,6 +18,13 @@ public: explicit GpuResource(std::string name); + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const GpuResource &resource); + protected: void LoadFile(TaskPtr task) override; diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index ed5d57b7dd..298d1b7d9f 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -10,6 +10,11 @@ namespace zilliz { namespace milvus { namespace engine { +std::ostream &operator<<(std::ostream &out, const Resource &resource) { + out << resource.Dump(); + return out; +} + Resource::Resource(std::string name, ResourceType type, bool enable_loader, diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index 769661c67b..c32149b46e 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -74,6 +74,13 @@ public: TaskTable & task_table(); + inline virtual std::string + Dump() const { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const Resource &resource); + public: /* * wake up loader; diff --git a/cpp/unittest/scheduler/normal_test.cpp b/cpp/unittest/scheduler/normal_test.cpp index 1123a3fb7e..4d1fa36de8 100644 --- a/cpp/unittest/scheduler/normal_test.cpp +++ b/cpp/unittest/scheduler/normal_test.cpp @@ -36,20 +36,47 @@ TEST(normal_test, test1) { observe->task_table().Put(task2); observe->task_table().Put(task3); observe->task_table().Put(task4); - std::cout << "disk:" << std::endl; - std::cout << observe->task_table().Dump() << std::endl; } +// if (auto disk_r = disk.lock()) { +// if (auto cpu_r = cpu.lock()) { +// if (auto gpu1_r = gpu1.lock()) { +// if (auto gpu2_r = gpu2.lock()) { +// std::cout << "<<<<<<<<<task_table().Dump() << std::endl; +// std::cout << "cpu:" << std::endl; +// std::cout << cpu_r->task_table().Dump() << std::endl; +// std::cout << "gpu1:" << std::endl; +// std::cout << gpu1_r->task_table().Dump() << std::endl; +// std::cout << "gpu2:" << std::endl; +// std::cout << gpu2_r->task_table().Dump() << std::endl; +// std::cout << ">>>>>>>>>>before>>>>>>>>>>" << std::endl; +// } +// } +// } +// } + sleep(5); - if (auto observe = disk.lock()) { - std::cout << "disk:" << std::endl; - std::cout << observe->task_table().Dump() << std::endl; - } - if (auto observe = cpu.lock()) { - std::cout << "cpu:" << std::endl; - std::cout << observe->task_table().Dump() << std::endl; - } +// if (auto disk_r = disk.lock()) { +// if (auto cpu_r = cpu.lock()) { +// if (auto gpu1_r = gpu1.lock()) { +// if (auto gpu2_r = gpu2.lock()) { +// std::cout << "<<<<<<<<<task_table().Dump() << std::endl; +// std::cout << "cpu:" << std::endl; +// std::cout << cpu_r->task_table().Dump() << std::endl; +// std::cout << "gpu1:" << std::endl; +// std::cout << gpu1_r->task_table().Dump() << std::endl; +// std::cout << "gpu2:" << std::endl; +// std::cout << gpu2_r->task_table().Dump() << std::endl; +// std::cout << ">>>>>>>>>>after>>>>>>>>>>" << std::endl; +// } +// } +// } +// } scheduler->Stop(); res_mgr->Stop(); From 0d0b8eeb3f5c24b14fe09f51861ef28321b5b8a0 Mon Sep 17 00:00:00 2001 From: "xj.lin" Date: Mon, 19 Aug 2019 11:11:28 +0800 Subject: [PATCH 21/22] MS372 fix unittest Former-commit-id: 78f0c12c86ed19158b982d1f460e4a2497bb0585 --- cpp/src/db/meta/SqliteMetaImpl.cpp | 14 +++++++------- cpp/unittest/server/cache_test.cpp | 8 ++++++++ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/cpp/src/db/meta/SqliteMetaImpl.cpp b/cpp/src/db/meta/SqliteMetaImpl.cpp index 9411a5e82b..9053139e0b 100644 --- a/cpp/src/db/meta/SqliteMetaImpl.cpp +++ b/cpp/src/db/meta/SqliteMetaImpl.cpp @@ -606,9 +606,9 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id, &TableFileSchema::engine_type_); auto match_tableid = c(&TableFileSchema::table_id_) == table_id; - auto is_raw = c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW; - auto is_toindex = c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX; - auto is_index = c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX; + + std::vector file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX}; + auto match_type = in(&TableFileSchema::file_type_, file_type); TableSchema table_schema; table_schema.table_id_ = table_id; @@ -617,23 +617,23 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id, decltype(ConnectorPtr->select(select_columns)) result; if (partition.empty() && ids.empty()) { - auto filter = where(match_tableid and (is_raw or is_toindex or is_index)); + auto filter = where(match_tableid and match_type); result = ConnectorPtr->select(select_columns, filter); } else if (partition.empty() && !ids.empty()) { auto match_fileid = in(&TableFileSchema::id_, ids); - auto filter = where(match_tableid and match_fileid and (is_raw or is_toindex or is_index)); + auto filter = where(match_tableid and match_fileid and match_type); result = ConnectorPtr->select(select_columns, filter); } else if (!partition.empty() && ids.empty()) { auto match_date = in(&TableFileSchema::date_, partition); - auto filter = where(match_tableid and match_date and (is_raw or is_toindex or is_index)); + auto filter = where(match_tableid and match_date and match_type); result = ConnectorPtr->select(select_columns, filter); } else if (!partition.empty() && !ids.empty()) { auto match_fileid = in(&TableFileSchema::id_, ids); auto match_date = in(&TableFileSchema::date_, partition); - auto filter = where(match_tableid and match_fileid and match_date and (is_raw or is_toindex or is_index)); + auto filter = where(match_tableid and match_fileid and match_date and match_type); result = ConnectorPtr->select(select_columns, filter); } diff --git a/cpp/unittest/server/cache_test.cpp b/cpp/unittest/server/cache_test.cpp index df2a869e5c..4f1d1db4ef 100644 --- a/cpp/unittest/server/cache_test.cpp +++ b/cpp/unittest/server/cache_test.cpp @@ -39,6 +39,14 @@ public: } + engine::VecIndexPtr Clone() override { + return zilliz::milvus::engine::VecIndexPtr(); + } + + int64_t GetDeviceId() override { + return 0; + } + engine::IndexType GetType() override { return engine::IndexType::INVALID; } From 842fa507fb4ee7a9c01647ec85f7d79f5194028c Mon Sep 17 00:00:00 2001 From: wxyu Date: Mon, 19 Aug 2019 11:48:45 +0800 Subject: [PATCH 22/22] MS-380 Update resource loader and executor, work util all finished Former-commit-id: 712a0aceaa4c8d4ebbea40f5d18f524afeb38559 --- cpp/CHANGELOG.md | 1 + .../scheduler/action/PushTaskToNeighbour.cpp | 5 +- cpp/src/scheduler/resource/Resource.cpp | 16 ++- cpp/unittest/scheduler/resource_test.cpp | 98 ++++++++++++++----- 4 files changed, 88 insertions(+), 32 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 4358c5ddb2..e0f3b1de25 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -31,6 +31,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable - MS-378 - Debug and Update normal_test in scheduler unittest - MS-379 - Add Dump implementation in Resource +- MS-380 - Update resource loader and executor, work util all finished ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index c99f490f11..a9b43b3f05 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -12,7 +12,7 @@ namespace milvus { namespace engine { void -push_task(ResourcePtr &self, ResourcePtr &other) { +push_task(const ResourcePtr &self, const ResourcePtr &other) { auto &self_task_table = self->task_table(); auto &other_task_table = other->task_table(); CacheMgr cache; @@ -31,8 +31,7 @@ Action::PushTaskToNeighbour(const ResourceWPtr &res) { if (auto self = res.lock()) { for (auto &neighbour : self->GetNeighbours()) { if (auto n = neighbour.neighbour_node.lock()) { - auto neighbour = std::static_pointer_cast(n); - push_task(self, neighbour); + push_task(self, std::static_pointer_cast(n)); } } } diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 298d1b7d9f..2c46a703c6 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -99,8 +99,11 @@ void Resource::loader_function() { std::unique_lock lock(load_mutex_); load_cv_.wait(lock, [&] { return load_flag_; }); load_flag_ = false; - auto task_item = pick_task_load(); - if (task_item) { + while (true) { + auto task_item = pick_task_load(); + if (task_item == nullptr) { + break; + } LoadFile(task_item->task); // TODO: wrapper loaded task_item->state = TaskTableItemState::LOADED; @@ -109,6 +112,7 @@ void Resource::loader_function() { subscriber_(std::static_pointer_cast(event)); } } + } } @@ -121,8 +125,11 @@ void Resource::executor_function() { std::unique_lock lock(exec_mutex_); exec_cv_.wait(lock, [&] { return exec_flag_; }); exec_flag_ = false; - auto task_item = pick_task_execute(); - if (task_item) { + while (true) { + auto task_item = pick_task_execute(); + if (task_item == nullptr) { + break; + } Process(task_item->task); task_item->state = TaskTableItemState::EXECUTED; if (subscriber_) { @@ -130,6 +137,7 @@ void Resource::executor_function() { subscriber_(std::static_pointer_cast(event)); } } + } } diff --git a/cpp/unittest/scheduler/resource_test.cpp b/cpp/unittest/scheduler/resource_test.cpp index 0395856fea..2f7d58eb57 100644 --- a/cpp/unittest/scheduler/resource_test.cpp +++ b/cpp/unittest/scheduler/resource_test.cpp @@ -25,12 +25,20 @@ protected: disk_resource_ = ResourceFactory::Create("disk"); cpu_resource_ = ResourceFactory::Create("cpu"); gpu_resource_ = ResourceFactory::Create("gpu"); - flag_ = false; + resources_.push_back(disk_resource_); + resources_.push_back(cpu_resource_); + resources_.push_back(gpu_resource_); - auto subscriber = [&](EventPtr event) { - std::unique_lock lock(mutex_); - if (event->Type() == EventType::COPY_COMPLETED || event->Type() == EventType::FINISH_TASK) { - flag_ = true; + auto subscriber = [&](EventPtr event) { + if (event->Type() == EventType::COPY_COMPLETED) { + std::lock_guard lock(load_mutex_); + ++load_count_; + cv_.notify_one(); + } + + if (event->Type() == EventType::FINISH_TASK) { + std::lock_guard lock(load_mutex_); + ++exec_count_; cv_.notify_one(); } }; @@ -52,42 +60,82 @@ protected: } void - Wait() { - std::unique_lock lock(mutex_); - cv_.wait(lock, [&] { return flag_; }); + WaitLoader(uint64_t count) { + std::unique_lock lock(load_mutex_); + cv_.wait(lock, [&] { return load_count_ == count; }); + } + + void + WaitExecutor(uint64_t count) { + std::unique_lock lock(exec_mutex_); + cv_.wait(lock, [&] { return exec_count_ == count; }); } ResourcePtr disk_resource_; ResourcePtr cpu_resource_; ResourcePtr gpu_resource_; - bool flag_; - std::mutex mutex_; + std::vector resources_; + uint64_t load_count_ = 0; + uint64_t exec_count_ = 0; + std::mutex load_mutex_; + std::mutex exec_mutex_; std::condition_variable cv_; }; - TEST_F(ResourceTest, cpu_resource_test) { - auto task = std::make_shared(); - cpu_resource_->task_table().Put(task); + const uint64_t NUM = 100; + std::vector> tasks; + for (uint64_t i = 0; i < NUM; ++i) { + auto task = std::make_shared(); + tasks.push_back(task); + cpu_resource_->task_table().Put(task); + } + cpu_resource_->WakeupLoader(); - Wait(); - ASSERT_EQ(task->load_count_, 1); - flag_ = false; + WaitLoader(NUM); +// std::cout << "after WakeupLoader" << std::endl; +// std::cout << cpu_resource_->task_table().Dump(); + + for (uint64_t i = 0; i < NUM; ++i) { + ASSERT_EQ(tasks[i]->load_count_, 1); + } + cpu_resource_->WakeupExecutor(); - Wait(); - ASSERT_EQ(task->exec_count_, 1); + WaitExecutor(NUM); +// std::cout << "after WakeupExecutor" << std::endl; +// std::cout << cpu_resource_->task_table().Dump(); + + for (uint64_t i = 0; i < NUM; ++i) { + ASSERT_EQ(tasks[i]->exec_count_, 1); + } } TEST_F(ResourceTest, gpu_resource_test) { - auto task = std::make_shared(); - gpu_resource_->task_table().Put(task); + const uint64_t NUM = 100; + std::vector> tasks; + for (uint64_t i = 0; i < NUM; ++i) { + auto task = std::make_shared(); + tasks.push_back(task); + gpu_resource_->task_table().Put(task); + } + gpu_resource_->WakeupLoader(); - Wait(); - ASSERT_EQ(task->load_count_, 1); - flag_ = false; + WaitLoader(NUM); +// std::cout << "after WakeupLoader" << std::endl; +// std::cout << cpu_resource_->task_table().Dump(); + + for (uint64_t i = 0; i < NUM; ++i) { + ASSERT_EQ(tasks[i]->load_count_, 1); + } + gpu_resource_->WakeupExecutor(); - Wait(); - ASSERT_EQ(task->exec_count_, 1); + WaitExecutor(NUM); +// std::cout << "after WakeupExecutor" << std::endl; +// std::cout << cpu_resource_->task_table().Dump(); + + for (uint64_t i = 0; i < NUM; ++i) { + ASSERT_EQ(tasks[i]->exec_count_, 1); + } }