MS-380 Update resource loader and executor, work util all finished

Former-commit-id: 712a0aceaa4c8d4ebbea40f5d18f524afeb38559
This commit is contained in:
wxyu 2019-08-19 11:48:45 +08:00
parent 41f3a2ac2b
commit 842fa507fb
4 changed files with 88 additions and 32 deletions

View File

@ -31,6 +31,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable - MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable
- MS-378 - Debug and Update normal_test in scheduler unittest - MS-378 - Debug and Update normal_test in scheduler unittest
- MS-379 - Add Dump implementation in Resource - MS-379 - Add Dump implementation in Resource
- MS-380 - Update resource loader and executor, work util all finished
## New Feature ## New Feature
- MS-343 - Implement ResourceMgr - MS-343 - Implement ResourceMgr

View File

@ -12,7 +12,7 @@ namespace milvus {
namespace engine { namespace engine {
void void
push_task(ResourcePtr &self, ResourcePtr &other) { push_task(const ResourcePtr &self, const ResourcePtr &other) {
auto &self_task_table = self->task_table(); auto &self_task_table = self->task_table();
auto &other_task_table = other->task_table(); auto &other_task_table = other->task_table();
CacheMgr cache; CacheMgr cache;
@ -31,8 +31,7 @@ Action::PushTaskToNeighbour(const ResourceWPtr &res) {
if (auto self = res.lock()) { if (auto self = res.lock()) {
for (auto &neighbour : self->GetNeighbours()) { for (auto &neighbour : self->GetNeighbours()) {
if (auto n = neighbour.neighbour_node.lock()) { if (auto n = neighbour.neighbour_node.lock()) {
auto neighbour = std::static_pointer_cast<Resource>(n); push_task(self, std::static_pointer_cast<Resource>(n));
push_task(self, neighbour);
} }
} }
} }

View File

@ -99,8 +99,11 @@ void Resource::loader_function() {
std::unique_lock<std::mutex> lock(load_mutex_); std::unique_lock<std::mutex> lock(load_mutex_);
load_cv_.wait(lock, [&] { return load_flag_; }); load_cv_.wait(lock, [&] { return load_flag_; });
load_flag_ = false; load_flag_ = false;
auto task_item = pick_task_load(); while (true) {
if (task_item) { auto task_item = pick_task_load();
if (task_item == nullptr) {
break;
}
LoadFile(task_item->task); LoadFile(task_item->task);
// TODO: wrapper loaded // TODO: wrapper loaded
task_item->state = TaskTableItemState::LOADED; task_item->state = TaskTableItemState::LOADED;
@ -109,6 +112,7 @@ void Resource::loader_function() {
subscriber_(std::static_pointer_cast<Event>(event)); subscriber_(std::static_pointer_cast<Event>(event));
} }
} }
} }
} }
@ -121,8 +125,11 @@ void Resource::executor_function() {
std::unique_lock<std::mutex> lock(exec_mutex_); std::unique_lock<std::mutex> lock(exec_mutex_);
exec_cv_.wait(lock, [&] { return exec_flag_; }); exec_cv_.wait(lock, [&] { return exec_flag_; });
exec_flag_ = false; exec_flag_ = false;
auto task_item = pick_task_execute(); while (true) {
if (task_item) { auto task_item = pick_task_execute();
if (task_item == nullptr) {
break;
}
Process(task_item->task); Process(task_item->task);
task_item->state = TaskTableItemState::EXECUTED; task_item->state = TaskTableItemState::EXECUTED;
if (subscriber_) { if (subscriber_) {
@ -130,6 +137,7 @@ void Resource::executor_function() {
subscriber_(std::static_pointer_cast<Event>(event)); subscriber_(std::static_pointer_cast<Event>(event));
} }
} }
} }
} }

View File

@ -25,12 +25,20 @@ protected:
disk_resource_ = ResourceFactory::Create("disk"); disk_resource_ = ResourceFactory::Create("disk");
cpu_resource_ = ResourceFactory::Create("cpu"); cpu_resource_ = ResourceFactory::Create("cpu");
gpu_resource_ = ResourceFactory::Create("gpu"); gpu_resource_ = ResourceFactory::Create("gpu");
flag_ = false; resources_.push_back(disk_resource_);
resources_.push_back(cpu_resource_);
resources_.push_back(gpu_resource_);
auto subscriber = [&](EventPtr event) { auto subscriber = [&](EventPtr event) {
std::unique_lock<std::mutex> lock(mutex_); if (event->Type() == EventType::COPY_COMPLETED) {
if (event->Type() == EventType::COPY_COMPLETED || event->Type() == EventType::FINISH_TASK) { std::lock_guard<std::mutex> lock(load_mutex_);
flag_ = true; ++load_count_;
cv_.notify_one();
}
if (event->Type() == EventType::FINISH_TASK) {
std::lock_guard<std::mutex> lock(load_mutex_);
++exec_count_;
cv_.notify_one(); cv_.notify_one();
} }
}; };
@ -52,42 +60,82 @@ protected:
} }
void void
Wait() { WaitLoader(uint64_t count) {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(load_mutex_);
cv_.wait(lock, [&] { return flag_; }); cv_.wait(lock, [&] { return load_count_ == count; });
}
void
WaitExecutor(uint64_t count) {
std::unique_lock<std::mutex> lock(exec_mutex_);
cv_.wait(lock, [&] { return exec_count_ == count; });
} }
ResourcePtr disk_resource_; ResourcePtr disk_resource_;
ResourcePtr cpu_resource_; ResourcePtr cpu_resource_;
ResourcePtr gpu_resource_; ResourcePtr gpu_resource_;
bool flag_; std::vector<ResourcePtr> resources_;
std::mutex mutex_; uint64_t load_count_ = 0;
uint64_t exec_count_ = 0;
std::mutex load_mutex_;
std::mutex exec_mutex_;
std::condition_variable cv_; std::condition_variable cv_;
}; };
TEST_F(ResourceTest, cpu_resource_test) { TEST_F(ResourceTest, cpu_resource_test) {
auto task = std::make_shared<TestTask>(); const uint64_t NUM = 100;
cpu_resource_->task_table().Put(task); std::vector<std::shared_ptr<TestTask>> tasks;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>();
tasks.push_back(task);
cpu_resource_->task_table().Put(task);
}
cpu_resource_->WakeupLoader(); cpu_resource_->WakeupLoader();
Wait(); WaitLoader(NUM);
ASSERT_EQ(task->load_count_, 1); // std::cout << "after WakeupLoader" << std::endl;
flag_ = false; // std::cout << cpu_resource_->task_table().Dump();
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->load_count_, 1);
}
cpu_resource_->WakeupExecutor(); cpu_resource_->WakeupExecutor();
Wait(); WaitExecutor(NUM);
ASSERT_EQ(task->exec_count_, 1); // std::cout << "after WakeupExecutor" << std::endl;
// std::cout << cpu_resource_->task_table().Dump();
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->exec_count_, 1);
}
} }
TEST_F(ResourceTest, gpu_resource_test) { TEST_F(ResourceTest, gpu_resource_test) {
auto task = std::make_shared<TestTask>(); const uint64_t NUM = 100;
gpu_resource_->task_table().Put(task); std::vector<std::shared_ptr<TestTask>> tasks;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>();
tasks.push_back(task);
gpu_resource_->task_table().Put(task);
}
gpu_resource_->WakeupLoader(); gpu_resource_->WakeupLoader();
Wait(); WaitLoader(NUM);
ASSERT_EQ(task->load_count_, 1); // std::cout << "after WakeupLoader" << std::endl;
flag_ = false; // std::cout << cpu_resource_->task_table().Dump();
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->load_count_, 1);
}
gpu_resource_->WakeupExecutor(); gpu_resource_->WakeupExecutor();
Wait(); WaitExecutor(NUM);
ASSERT_EQ(task->exec_count_, 1); // std::cout << "after WakeupExecutor" << std::endl;
// std::cout << cpu_resource_->task_table().Dump();
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->exec_count_, 1);
}
} }