mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
MS-377 Improve process thread trigger in ResourceMgr, Scheduler and TaskTable
Former-commit-id: f121fba66ae395b03ae31c400216b5e9c301f0cf
This commit is contained in:
parent
4ed498893d
commit
07ec15f9cd
@ -28,6 +28,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- MS-374 - Add action definition
|
||||
- MS-375 - Add Dump implementation for Event
|
||||
- MS-376 - Add loader and executor enable flag in Resource avoid diskresource execute task
|
||||
- MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable
|
||||
|
||||
## New Feature
|
||||
- MS-343 - Implement ResourceMgr
|
||||
|
||||
@ -33,11 +33,7 @@ ResourceMgr::Add(ResourcePtr &&resource) {
|
||||
resources_.emplace_back(resource);
|
||||
|
||||
size_t index = resources_.size() - 1;
|
||||
resource->RegisterSubscriber([&](EventPtr event) {
|
||||
queue_.emplace(event);
|
||||
std::unique_lock<std::mutex> lock(event_mutex_);
|
||||
event_cv_.notify_one();
|
||||
});
|
||||
resource->RegisterSubscriber(std::bind(&ResourceMgr::PostEvent, this, std::placeholders::_1));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -46,27 +42,11 @@ ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connect
|
||||
if (auto observe_a = res1.lock()) {
|
||||
if (auto observe_b = res2.lock()) {
|
||||
observe_a->AddNeighbour(std::static_pointer_cast<Node>(observe_b), connection);
|
||||
observe_b->AddNeighbour(std::static_pointer_cast<Node>(observe_a), connection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
ResourceMgr::EventProcess() {
|
||||
while (running_) {
|
||||
std::unique_lock<std::mutex> lock(event_mutex_);
|
||||
event_cv_.wait(lock, [this] { return !queue_.empty(); });
|
||||
|
||||
if (!running_) {
|
||||
break;
|
||||
}
|
||||
|
||||
auto event = queue_.front();
|
||||
queue_.pop();
|
||||
if (subscriber_) {
|
||||
subscriber_(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
ResourceMgr::Start() {
|
||||
@ -74,23 +54,33 @@ ResourceMgr::Start() {
|
||||
for (auto &resource : resources_) {
|
||||
resource->Start();
|
||||
}
|
||||
worker_thread_ = std::thread(&ResourceMgr::EventProcess, this);
|
||||
|
||||
running_ = true;
|
||||
worker_thread_ = std::thread(&ResourceMgr::event_process, this);
|
||||
}
|
||||
|
||||
void
|
||||
ResourceMgr::Stop() {
|
||||
std::lock_guard<std::mutex> lck(resources_mutex_);
|
||||
|
||||
running_ = false;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(event_mutex_);
|
||||
running_ = false;
|
||||
queue_.push(nullptr);
|
||||
event_cv_.notify_one();
|
||||
}
|
||||
worker_thread_.join();
|
||||
|
||||
std::lock_guard<std::mutex> lck(resources_mutex_);
|
||||
for (auto &resource : resources_) {
|
||||
resource->Stop();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
ResourceMgr::PostEvent(const EventPtr &event) {
|
||||
std::unique_lock<std::mutex> lock(event_mutex_);
|
||||
queue_.emplace(event);
|
||||
event_cv_.notify_one();
|
||||
}
|
||||
|
||||
std::string
|
||||
ResourceMgr::Dump() {
|
||||
std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n";
|
||||
@ -103,6 +93,26 @@ ResourceMgr::Dump() {
|
||||
return str;
|
||||
}
|
||||
|
||||
void
|
||||
ResourceMgr::event_process() {
|
||||
while (running_) {
|
||||
std::unique_lock<std::mutex> lock(event_mutex_);
|
||||
event_cv_.wait(lock, [this] { return !queue_.empty(); });
|
||||
|
||||
auto event = queue_.front();
|
||||
if (event == nullptr) {
|
||||
break;
|
||||
}
|
||||
|
||||
// ENGINE_LOG_DEBUG << "ResourceMgr process " << *event;
|
||||
|
||||
queue_.pop();
|
||||
if (subscriber_) {
|
||||
subscriber_(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -14,6 +14,7 @@
|
||||
#include <condition_variable>
|
||||
|
||||
#include "resource/Resource.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
@ -59,6 +60,8 @@ public:
|
||||
void
|
||||
Stop();
|
||||
|
||||
void
|
||||
PostEvent(const EventPtr& event);
|
||||
|
||||
// TODO: add stats interface(low)
|
||||
|
||||
@ -70,7 +73,7 @@ public:
|
||||
|
||||
private:
|
||||
void
|
||||
EventProcess();
|
||||
event_process();
|
||||
|
||||
private:
|
||||
std::queue<EventPtr> queue_;
|
||||
|
||||
@ -4,64 +4,48 @@
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include <iostream>
|
||||
#include "Scheduler.h"
|
||||
#include "Cost.h"
|
||||
#include "action/Action.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
void
|
||||
push_task(ResourcePtr &self, ResourcePtr &other) {
|
||||
auto self_task_table = self->task_table();
|
||||
auto other_task_table = other->task_table();
|
||||
if (!other_task_table.Empty()) {
|
||||
CacheMgr cache;
|
||||
auto indexes = PickToMove(self_task_table, cache, 1);
|
||||
for (auto index : indexes) {
|
||||
if (self_task_table.Move(index)) {
|
||||
auto task = self_task_table.Get(index)->task;
|
||||
other_task_table.Put(task);
|
||||
// TODO: mark moved future
|
||||
other->WakeupLoader();
|
||||
other->WakeupExecutor();
|
||||
}
|
||||
}
|
||||
Scheduler::Scheduler(ResourceMgrWPtr res_mgr)
|
||||
: running_(false),
|
||||
res_mgr_(std::move(res_mgr)) {
|
||||
if (auto mgr = res_mgr_.lock()) {
|
||||
mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1));
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
schedule(const ResourceWPtr &res) {
|
||||
if (auto self = res.lock()) {
|
||||
for (auto &nei : self->GetNeighbours()) {
|
||||
if (auto n = nei.neighbour_node.lock()) {
|
||||
auto neighbour = std::static_pointer_cast<Resource>(n);
|
||||
push_task(self, neighbour);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::Start() {
|
||||
running_ = true;
|
||||
worker_thread_ = std::thread(&Scheduler::worker_function, this);
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::Stop() {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(event_mutex_);
|
||||
running_ = false;
|
||||
event_queue_.push(nullptr);
|
||||
event_cv_.notify_one();
|
||||
}
|
||||
worker_thread_.join();
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::OnStartUp(const EventPtr &event) {
|
||||
schedule(event->resource_);
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::OnFinishTask(const EventPtr &event) {
|
||||
schedule(event->resource_);
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::OnCopyCompleted(const EventPtr &event) {
|
||||
schedule(event->resource_);
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::OnTaskTableUpdated(const EventPtr &event) {
|
||||
schedule(event->resource_);
|
||||
Scheduler::PostEvent(const EventPtr &event) {
|
||||
std::lock_guard<std::mutex> lock(event_mutex_);
|
||||
event_queue_.push(event);
|
||||
event_cv_.notify_one();
|
||||
// SERVER_LOG_DEBUG << "Scheduler post " << *event;
|
||||
}
|
||||
|
||||
std::string
|
||||
@ -69,6 +53,82 @@ Scheduler::Dump() {
|
||||
return std::string();
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::worker_function() {
|
||||
while (running_) {
|
||||
std::unique_lock<std::mutex> lock(event_mutex_);
|
||||
event_cv_.wait(lock, [this] { return !event_queue_.empty(); });
|
||||
auto event = event_queue_.front();
|
||||
if (event == nullptr) {
|
||||
break;
|
||||
}
|
||||
|
||||
// SERVER_LOG_DEBUG << "Scheduler process " << *event;
|
||||
event_queue_.pop();
|
||||
Process(event);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::Process(const EventPtr &event) {
|
||||
switch (event->Type()) {
|
||||
case EventType::START_UP: {
|
||||
OnStartUp(event);
|
||||
break;
|
||||
}
|
||||
case EventType::COPY_COMPLETED: {
|
||||
OnCopyCompleted(event);
|
||||
break;
|
||||
}
|
||||
case EventType::FINISH_TASK: {
|
||||
OnFinishTask(event);
|
||||
break;
|
||||
}
|
||||
case EventType::TASK_TABLE_UPDATED: {
|
||||
OnTaskTableUpdated(event);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
// TODO: logging
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
Scheduler::OnStartUp(const EventPtr &event) {
|
||||
if (auto resource = event->resource_.lock()) {
|
||||
resource->WakeupLoader();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::OnFinishTask(const EventPtr &event) {
|
||||
if (auto resource = event->resource_.lock()) {
|
||||
resource->WakeupExecutor();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::OnCopyCompleted(const EventPtr &event) {
|
||||
if (auto resource = event->resource_.lock()) {
|
||||
resource->WakeupLoader();
|
||||
resource->WakeupExecutor();
|
||||
if (resource->Type()== ResourceType::DISK) {
|
||||
Action::PushTaskToNeighbour(event->resource_);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::OnTaskTableUpdated(const EventPtr &event) {
|
||||
// Action::PushTaskToNeighbour(event->resource_);
|
||||
if (auto resource = event->resource_.lock()) {
|
||||
resource->WakeupLoader();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -13,6 +13,7 @@
|
||||
|
||||
#include "resource/Resource.h"
|
||||
#include "ResourceMgr.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
@ -23,20 +24,32 @@ namespace engine {
|
||||
class Scheduler {
|
||||
public:
|
||||
explicit
|
||||
Scheduler(ResourceMgrWPtr res_mgr)
|
||||
: running_(false),
|
||||
res_mgr_(std::move(res_mgr)) {
|
||||
// res_mgr.Register();
|
||||
// res_mgr.Register();
|
||||
// res_mgr.Register();
|
||||
// res_mgr.Register();
|
||||
}
|
||||
Scheduler(ResourceMgrWPtr res_mgr);
|
||||
|
||||
Scheduler(const Scheduler &) = delete;
|
||||
Scheduler(Scheduler &&) = delete;
|
||||
|
||||
/*
|
||||
* Start worker thread;
|
||||
*/
|
||||
void
|
||||
Start() {
|
||||
worker_thread_ = std::thread(&Scheduler::worker_thread_, this);
|
||||
}
|
||||
Start();
|
||||
|
||||
/*
|
||||
* Stop worker thread, join it;
|
||||
*/
|
||||
void
|
||||
Stop();
|
||||
|
||||
/*
|
||||
* Post event to scheduler event queue;
|
||||
*/
|
||||
void
|
||||
PostEvent(const EventPtr &event);
|
||||
|
||||
/*
|
||||
* Dump as string;
|
||||
*/
|
||||
std::string
|
||||
Dump();
|
||||
|
||||
@ -45,24 +58,37 @@ private:
|
||||
|
||||
/*
|
||||
* Process start up events;
|
||||
*
|
||||
* Actions:
|
||||
* Pull task from neighbours;
|
||||
*/
|
||||
void
|
||||
OnStartUp(const EventPtr &event);
|
||||
|
||||
/*
|
||||
* Process finish task events;
|
||||
*
|
||||
* Actions:
|
||||
* Pull task from neighbours;
|
||||
*/
|
||||
void
|
||||
OnFinishTask(const EventPtr &event);
|
||||
|
||||
/*
|
||||
* Process copy completed events;
|
||||
*
|
||||
* Actions:
|
||||
* Mark task source MOVED;
|
||||
* Pull task from neighbours;
|
||||
*/
|
||||
void
|
||||
OnCopyCompleted(const EventPtr &event);
|
||||
|
||||
/*
|
||||
* Process task table updated events;
|
||||
* Process task table updated events, which happened on task_table->put;
|
||||
*
|
||||
* Actions:
|
||||
* Push task to neighbours;
|
||||
*/
|
||||
void
|
||||
OnTaskTableUpdated(const EventPtr &event);
|
||||
@ -72,40 +98,13 @@ private:
|
||||
* Dispatch event to event handler;
|
||||
*/
|
||||
void
|
||||
Process(const EventPtr &event) {
|
||||
switch (event->Type()) {
|
||||
case EventType::START_UP: {
|
||||
OnStartUp(event);
|
||||
break;
|
||||
}
|
||||
case EventType::COPY_COMPLETED: {
|
||||
OnCopyCompleted(event);
|
||||
break;
|
||||
}
|
||||
case EventType::FINISH_TASK: {
|
||||
OnFinishTask(event);
|
||||
break;
|
||||
}
|
||||
case EventType::TASK_TABLE_UPDATED: {
|
||||
OnTaskTableUpdated(event);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Process(const EventPtr &event);
|
||||
|
||||
/*
|
||||
* Called by worker_thread_;
|
||||
*/
|
||||
void
|
||||
worker_function() {
|
||||
while (running_) {
|
||||
auto event = event_queue_.front();
|
||||
Process(event);
|
||||
}
|
||||
}
|
||||
worker_function();
|
||||
|
||||
private:
|
||||
bool running_;
|
||||
@ -113,6 +112,8 @@ private:
|
||||
ResourceMgrWPtr res_mgr_;
|
||||
std::queue<EventPtr> event_queue_;
|
||||
std::thread worker_thread_;
|
||||
std::mutex event_mutex_;
|
||||
std::condition_variable event_cv_;
|
||||
};
|
||||
|
||||
using SchedulerPtr = std::shared_ptr<Scheduler>;
|
||||
|
||||
@ -7,6 +7,7 @@
|
||||
#include "TaskTable.h"
|
||||
#include "event/TaskTableUpdatedEvent.h"
|
||||
#include <vector>
|
||||
#include <sstream>
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
@ -16,7 +17,9 @@ namespace engine {
|
||||
|
||||
void
|
||||
TaskTable::Put(TaskPtr task) {
|
||||
std::lock_guard<std::mutex> lock(id_mutex_);
|
||||
auto item = std::make_shared<TaskTableItem>();
|
||||
item->id = id_++;
|
||||
item->task = std::move(task);
|
||||
item->state = TaskTableItemState::START;
|
||||
table_.push_back(item);
|
||||
@ -27,8 +30,10 @@ TaskTable::Put(TaskPtr task) {
|
||||
|
||||
void
|
||||
TaskTable::Put(std::vector<TaskPtr> &tasks) {
|
||||
std::lock_guard<std::mutex> lock(id_mutex_);
|
||||
for (auto &task : tasks) {
|
||||
auto item = std::make_shared<TaskTableItem>();
|
||||
item->id = id_++;
|
||||
item->task = std::move(task);
|
||||
item->state = TaskTableItemState::START;
|
||||
table_.push_back(item);
|
||||
@ -126,9 +131,30 @@ TaskTable::Executed(uint64_t index) {
|
||||
return false;
|
||||
}
|
||||
|
||||
std::string
|
||||
ToString(TaskTableItemState state) {
|
||||
switch (state) {
|
||||
case TaskTableItemState::INVALID: return "INVALID";
|
||||
case TaskTableItemState::START: return "START";
|
||||
case TaskTableItemState::LOADING: return "LOADING";
|
||||
case TaskTableItemState::LOADED: return "LOADED";
|
||||
case TaskTableItemState::EXECUTING: return "EXECUTING";
|
||||
case TaskTableItemState::EXECUTED: return "EXECUTED";
|
||||
case TaskTableItemState::MOVING: return "MOVING";
|
||||
case TaskTableItemState::MOVED: return "MOVED";
|
||||
default: return "";
|
||||
}
|
||||
}
|
||||
|
||||
std::string
|
||||
TaskTable::Dump() {
|
||||
return std::string();
|
||||
std::stringstream ss;
|
||||
for (auto &item : table_) {
|
||||
ss << "<" << item->id;
|
||||
ss << ", " << ToString(item->state);
|
||||
ss << ">" << std::endl;
|
||||
}
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -49,6 +49,9 @@ class TaskTable {
|
||||
public:
|
||||
TaskTable() = default;
|
||||
|
||||
TaskTable(const TaskTable &) = delete;
|
||||
TaskTable(TaskTable &&) = delete;
|
||||
|
||||
inline void
|
||||
RegisterSubscriber(std::function<void(void)> subscriber) {
|
||||
subscriber_ = std::move(subscriber);
|
||||
@ -167,6 +170,8 @@ public:
|
||||
|
||||
private:
|
||||
// TODO: map better ?
|
||||
std::uint64_t id_ = 0;
|
||||
mutable std::mutex id_mutex_;
|
||||
std::deque<TaskTableItemPtr> table_;
|
||||
std::function<void(void)> subscriber_ = nullptr;
|
||||
};
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user