From 07ec15f9cd1960643f65afa5331b9ffbf67b7018 Mon Sep 17 00:00:00 2001 From: wxyu Date: Sun, 18 Aug 2019 20:08:15 +0800 Subject: [PATCH] MS-377 Improve process thread trigger in ResourceMgr, Scheduler and TaskTable Former-commit-id: f121fba66ae395b03ae31c400216b5e9c301f0cf --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/ResourceMgr.cpp | 64 +++++++------ cpp/src/scheduler/ResourceMgr.h | 5 +- cpp/src/scheduler/Scheduler.cpp | 144 +++++++++++++++++++++--------- cpp/src/scheduler/Scheduler.h | 83 ++++++++--------- cpp/src/scheduler/TaskTable.cpp | 28 +++++- cpp/src/scheduler/TaskTable.h | 5 ++ 7 files changed, 218 insertions(+), 112 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index a30a34563b..69fdd9ac2b 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -28,6 +28,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-374 - Add action definition - MS-375 - Add Dump implementation for Event - MS-376 - Add loader and executor enable flag in Resource avoid diskresource execute task +- MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index a0625c836f..916aaa238a 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -33,11 +33,7 @@ ResourceMgr::Add(ResourcePtr &&resource) { resources_.emplace_back(resource); size_t index = resources_.size() - 1; - resource->RegisterSubscriber([&](EventPtr event) { - queue_.emplace(event); - std::unique_lock lock(event_mutex_); - event_cv_.notify_one(); - }); + resource->RegisterSubscriber(std::bind(&ResourceMgr::PostEvent, this, std::placeholders::_1)); return ret; } @@ -46,27 +42,11 @@ ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connect if (auto observe_a = res1.lock()) { if (auto observe_b = res2.lock()) { observe_a->AddNeighbour(std::static_pointer_cast(observe_b), connection); + observe_b->AddNeighbour(std::static_pointer_cast(observe_a), connection); } } } -void -ResourceMgr::EventProcess() { - while (running_) { - std::unique_lock lock(event_mutex_); - event_cv_.wait(lock, [this] { return !queue_.empty(); }); - - if (!running_) { - break; - } - - auto event = queue_.front(); - queue_.pop(); - if (subscriber_) { - subscriber_(event); - } - } -} void ResourceMgr::Start() { @@ -74,23 +54,33 @@ ResourceMgr::Start() { for (auto &resource : resources_) { resource->Start(); } - worker_thread_ = std::thread(&ResourceMgr::EventProcess, this); - running_ = true; + worker_thread_ = std::thread(&ResourceMgr::event_process, this); } void ResourceMgr::Stop() { - std::lock_guard lck(resources_mutex_); - - running_ = false; + { + std::lock_guard lock(event_mutex_); + running_ = false; + queue_.push(nullptr); + event_cv_.notify_one(); + } worker_thread_.join(); + std::lock_guard lck(resources_mutex_); for (auto &resource : resources_) { resource->Stop(); } } +void +ResourceMgr::PostEvent(const EventPtr &event) { + std::unique_lock lock(event_mutex_); + queue_.emplace(event); + event_cv_.notify_one(); +} + std::string ResourceMgr::Dump() { std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n"; @@ -103,6 +93,26 @@ ResourceMgr::Dump() { return str; } +void +ResourceMgr::event_process() { + while (running_) { + std::unique_lock lock(event_mutex_); + event_cv_.wait(lock, [this] { return !queue_.empty(); }); + + auto event = queue_.front(); + if (event == nullptr) { + break; + } + +// ENGINE_LOG_DEBUG << "ResourceMgr process " << *event; + + queue_.pop(); + if (subscriber_) { + subscriber_(event); + } + } +} + } } } diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index fc7744cd2b..cb2e631935 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -14,6 +14,7 @@ #include #include "resource/Resource.h" +#include "utils/Log.h" namespace zilliz { @@ -59,6 +60,8 @@ public: void Stop(); + void + PostEvent(const EventPtr& event); // TODO: add stats interface(low) @@ -70,7 +73,7 @@ public: private: void - EventProcess(); + event_process(); private: std::queue queue_; diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index c9d92ccb57..06dfa669db 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -4,64 +4,48 @@ * Proprietary and confidential. ******************************************************************************/ +#include #include "Scheduler.h" #include "Cost.h" +#include "action/Action.h" namespace zilliz { namespace milvus { namespace engine { -void -push_task(ResourcePtr &self, ResourcePtr &other) { - auto self_task_table = self->task_table(); - auto other_task_table = other->task_table(); - if (!other_task_table.Empty()) { - CacheMgr cache; - auto indexes = PickToMove(self_task_table, cache, 1); - for (auto index : indexes) { - if (self_task_table.Move(index)) { - auto task = self_task_table.Get(index)->task; - other_task_table.Put(task); - // TODO: mark moved future - other->WakeupLoader(); - other->WakeupExecutor(); - } - } +Scheduler::Scheduler(ResourceMgrWPtr res_mgr) + : running_(false), + res_mgr_(std::move(res_mgr)) { + if (auto mgr = res_mgr_.lock()) { + mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1)); } } -void -schedule(const ResourceWPtr &res) { - if (auto self = res.lock()) { - for (auto &nei : self->GetNeighbours()) { - if (auto n = nei.neighbour_node.lock()) { - auto neighbour = std::static_pointer_cast(n); - push_task(self, neighbour); - } - } +void +Scheduler::Start() { + running_ = true; + worker_thread_ = std::thread(&Scheduler::worker_function, this); +} + +void +Scheduler::Stop() { + { + std::lock_guard lock(event_mutex_); + running_ = false; + event_queue_.push(nullptr); + event_cv_.notify_one(); } + worker_thread_.join(); } void -Scheduler::OnStartUp(const EventPtr &event) { - schedule(event->resource_); -} - -void -Scheduler::OnFinishTask(const EventPtr &event) { - schedule(event->resource_); -} - -void -Scheduler::OnCopyCompleted(const EventPtr &event) { - schedule(event->resource_); -} - -void -Scheduler::OnTaskTableUpdated(const EventPtr &event) { - schedule(event->resource_); +Scheduler::PostEvent(const EventPtr &event) { + std::lock_guard lock(event_mutex_); + event_queue_.push(event); + event_cv_.notify_one(); +// SERVER_LOG_DEBUG << "Scheduler post " << *event; } std::string @@ -69,6 +53,82 @@ Scheduler::Dump() { return std::string(); } +void +Scheduler::worker_function() { + while (running_) { + std::unique_lock lock(event_mutex_); + event_cv_.wait(lock, [this] { return !event_queue_.empty(); }); + auto event = event_queue_.front(); + if (event == nullptr) { + break; + } + +// SERVER_LOG_DEBUG << "Scheduler process " << *event; + event_queue_.pop(); + Process(event); + } +} + +void +Scheduler::Process(const EventPtr &event) { + switch (event->Type()) { + case EventType::START_UP: { + OnStartUp(event); + break; + } + case EventType::COPY_COMPLETED: { + OnCopyCompleted(event); + break; + } + case EventType::FINISH_TASK: { + OnFinishTask(event); + break; + } + case EventType::TASK_TABLE_UPDATED: { + OnTaskTableUpdated(event); + break; + } + default: { + // TODO: logging + break; + } + } +} + + +void +Scheduler::OnStartUp(const EventPtr &event) { + if (auto resource = event->resource_.lock()) { + resource->WakeupLoader(); + } +} + +void +Scheduler::OnFinishTask(const EventPtr &event) { + if (auto resource = event->resource_.lock()) { + resource->WakeupExecutor(); + } +} + +void +Scheduler::OnCopyCompleted(const EventPtr &event) { + if (auto resource = event->resource_.lock()) { + resource->WakeupLoader(); + resource->WakeupExecutor(); + if (resource->Type()== ResourceType::DISK) { + Action::PushTaskToNeighbour(event->resource_); + } + } +} + +void +Scheduler::OnTaskTableUpdated(const EventPtr &event) { +// Action::PushTaskToNeighbour(event->resource_); + if (auto resource = event->resource_.lock()) { + resource->WakeupLoader(); + } +} + } } } diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index 5e50826238..012a479a82 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -13,6 +13,7 @@ #include "resource/Resource.h" #include "ResourceMgr.h" +#include "utils/Log.h" namespace zilliz { @@ -23,20 +24,32 @@ namespace engine { class Scheduler { public: explicit - Scheduler(ResourceMgrWPtr res_mgr) - : running_(false), - res_mgr_(std::move(res_mgr)) { -// res_mgr.Register(); -// res_mgr.Register(); -// res_mgr.Register(); -// res_mgr.Register(); - } + Scheduler(ResourceMgrWPtr res_mgr); + Scheduler(const Scheduler &) = delete; + Scheduler(Scheduler &&) = delete; + + /* + * Start worker thread; + */ void - Start() { - worker_thread_ = std::thread(&Scheduler::worker_thread_, this); - } + Start(); + /* + * Stop worker thread, join it; + */ + void + Stop(); + + /* + * Post event to scheduler event queue; + */ + void + PostEvent(const EventPtr &event); + + /* + * Dump as string; + */ std::string Dump(); @@ -45,24 +58,37 @@ private: /* * Process start up events; + * + * Actions: + * Pull task from neighbours; */ void OnStartUp(const EventPtr &event); /* * Process finish task events; + * + * Actions: + * Pull task from neighbours; */ void OnFinishTask(const EventPtr &event); /* * Process copy completed events; + * + * Actions: + * Mark task source MOVED; + * Pull task from neighbours; */ void OnCopyCompleted(const EventPtr &event); /* - * Process task table updated events; + * Process task table updated events, which happened on task_table->put; + * + * Actions: + * Push task to neighbours; */ void OnTaskTableUpdated(const EventPtr &event); @@ -72,40 +98,13 @@ private: * Dispatch event to event handler; */ void - Process(const EventPtr &event) { - switch (event->Type()) { - case EventType::START_UP: { - OnStartUp(event); - break; - } - case EventType::COPY_COMPLETED: { - OnCopyCompleted(event); - break; - } - case EventType::FINISH_TASK: { - OnFinishTask(event); - break; - } - case EventType::TASK_TABLE_UPDATED: { - OnTaskTableUpdated(event); - break; - } - default: { - break; - } - } - } + Process(const EventPtr &event); /* * Called by worker_thread_; */ void - worker_function() { - while (running_) { - auto event = event_queue_.front(); - Process(event); - } - } + worker_function(); private: bool running_; @@ -113,6 +112,8 @@ private: ResourceMgrWPtr res_mgr_; std::queue event_queue_; std::thread worker_thread_; + std::mutex event_mutex_; + std::condition_variable event_cv_; }; using SchedulerPtr = std::shared_ptr; diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index 9287b47033..7ef033c500 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -7,6 +7,7 @@ #include "TaskTable.h" #include "event/TaskTableUpdatedEvent.h" #include +#include namespace zilliz { @@ -16,7 +17,9 @@ namespace engine { void TaskTable::Put(TaskPtr task) { + std::lock_guard lock(id_mutex_); auto item = std::make_shared(); + item->id = id_++; item->task = std::move(task); item->state = TaskTableItemState::START; table_.push_back(item); @@ -27,8 +30,10 @@ TaskTable::Put(TaskPtr task) { void TaskTable::Put(std::vector &tasks) { + std::lock_guard lock(id_mutex_); for (auto &task : tasks) { auto item = std::make_shared(); + item->id = id_++; item->task = std::move(task); item->state = TaskTableItemState::START; table_.push_back(item); @@ -126,9 +131,30 @@ TaskTable::Executed(uint64_t index) { return false; } +std::string +ToString(TaskTableItemState state) { + switch (state) { + case TaskTableItemState::INVALID: return "INVALID"; + case TaskTableItemState::START: return "START"; + case TaskTableItemState::LOADING: return "LOADING"; + case TaskTableItemState::LOADED: return "LOADED"; + case TaskTableItemState::EXECUTING: return "EXECUTING"; + case TaskTableItemState::EXECUTED: return "EXECUTED"; + case TaskTableItemState::MOVING: return "MOVING"; + case TaskTableItemState::MOVED: return "MOVED"; + default: return ""; + } +} + std::string TaskTable::Dump() { - return std::string(); + std::stringstream ss; + for (auto &item : table_) { + ss << "<" << item->id; + ss << ", " << ToString(item->state); + ss << ">" << std::endl; + } + return ss.str(); } } diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index 70a2899331..8a482d0579 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -49,6 +49,9 @@ class TaskTable { public: TaskTable() = default; + TaskTable(const TaskTable &) = delete; + TaskTable(TaskTable &&) = delete; + inline void RegisterSubscriber(std::function subscriber) { subscriber_ = std::move(subscriber); @@ -167,6 +170,8 @@ public: private: // TODO: map better ? + std::uint64_t id_ = 0; + mutable std::mutex id_mutex_; std::deque table_; std::function subscriber_ = nullptr; };