From 5e504b343540ca549d21f9506bfcc0e0edaec817 Mon Sep 17 00:00:00 2001 From: wxyu Date: Mon, 28 Oct 2019 17:27:49 +0800 Subject: [PATCH 1/6] rename functions tasktable, make it accessing likes standard structure Former-commit-id: c0ba41635e710e0807af0fe07d0b6a266f60d044 --- core/src/scheduler/TaskTable.cpp | 5 - core/src/scheduler/TaskTable.h | 57 +++++------ core/src/scheduler/resource/Resource.cpp | 4 +- core/unittest/scheduler/test_scheduler.cpp | 2 +- core/unittest/scheduler/test_tasktable.cpp | 112 ++++++++++----------- 5 files changed, 84 insertions(+), 96 deletions(-) diff --git a/core/src/scheduler/TaskTable.cpp b/core/src/scheduler/TaskTable.cpp index e35c7cd255..bd3dd466a9 100644 --- a/core/src/scheduler/TaskTable.cpp +++ b/core/src/scheduler/TaskTable.cpp @@ -291,11 +291,6 @@ TaskTable::Put(std::vector& tasks) { } } -TaskTableItemPtr -TaskTable::Get(uint64_t index) { - return table_[index]; -} - size_t TaskTable::TaskToExecute() { size_t count = 0; diff --git a/core/src/scheduler/TaskTable.h b/core/src/scheduler/TaskTable.h index 052be66890..898141d028 100644 --- a/core/src/scheduler/TaskTable.h +++ b/core/src/scheduler/TaskTable.h @@ -106,6 +106,11 @@ class TaskTable : public interface::dumpable { TaskTable(const TaskTable&) = delete; TaskTable(TaskTable&&) = delete; + public: + json + Dump() const override; + + public: inline void RegisterSubscriber(std::function subscriber) { subscriber_ = std::move(subscriber); @@ -124,40 +129,35 @@ class TaskTable : public interface::dumpable { void Put(std::vector& tasks); - /* - * Return task table item reference; - */ - TaskTableItemPtr - Get(uint64_t index); - - inline size_t - Capacity() { - return table_.capacity(); - } - - /* - * Return size of task table; - */ - inline size_t - Size() { - return table_.size(); - } - size_t TaskToExecute(); - public: - const TaskTableItemPtr& operator[](uint64_t index) { - return table_[index]; - } - - public: std::vector PickToLoad(uint64_t limit); std::vector PickToExecute(uint64_t limit); + public: + inline const TaskTableItemPtr& operator[](uint64_t index) { + return table_[index]; + } + + inline const TaskTableItemPtr& + at(uint64_t index) { + return table_[index]; + } + + inline size_t + capacity() { + return table_.capacity(); + } + + inline size_t + size() { + return table_.size(); + } + public: /******** Action ********/ @@ -223,13 +223,6 @@ class TaskTable : public interface::dumpable { return table_[index]->Moved(); } - public: - /* - * Dump; - */ - json - Dump() const override; - private: std::uint64_t id_ = 0; CircleQueue table_; diff --git a/core/src/scheduler/resource/Resource.cpp b/core/src/scheduler/resource/Resource.cpp index 2577617dab..8e10592262 100644 --- a/core/src/scheduler/resource/Resource.cpp +++ b/core/src/scheduler/resource/Resource.cpp @@ -132,7 +132,7 @@ Resource::pick_task_load() { for (auto index : indexes) { // try to set one task loading, then return if (task_table_.Load(index)) - return task_table_.Get(index); + return task_table_.at(index); // else try next } return nullptr; @@ -150,7 +150,7 @@ Resource::pick_task_execute() { } if (task_table_.Execute(index)) { - return task_table_.Get(index); + return task_table_.at(index); } // if (task_table_[index]->task->label()->Type() == TaskLabelType::SPECIFIED_RESOURCE) { // if (task_table_.Get(index)->task->path().Current() == task_table_.Get(index)->task->path().Last() diff --git a/core/unittest/scheduler/test_scheduler.cpp b/core/unittest/scheduler/test_scheduler.cpp index aebdfa2af2..b418b7c80e 100644 --- a/core/unittest/scheduler/test_scheduler.cpp +++ b/core/unittest/scheduler/test_scheduler.cpp @@ -165,7 +165,7 @@ TEST_F(SchedulerTest, ON_LOAD_COMPLETED) { } sleep(3); - ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM); + ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().size(), NUM); } TEST_F(SchedulerTest, PUSH_TASK_TO_NEIGHBOUR_RANDOMLY_TEST) { diff --git a/core/unittest/scheduler/test_tasktable.cpp b/core/unittest/scheduler/test_tasktable.cpp index 54f872c2fc..601bd2431d 100644 --- a/core/unittest/scheduler/test_tasktable.cpp +++ b/core/unittest/scheduler/test_tasktable.cpp @@ -183,19 +183,19 @@ TEST_F(TaskTableBaseTest, SUBSCRIBER) { TEST_F(TaskTableBaseTest, PUT_TASK) { empty_table_.Put(task1_); - ASSERT_EQ(empty_table_.Get(0)->task, task1_); + ASSERT_EQ(empty_table_.at(0)->task, task1_); } TEST_F(TaskTableBaseTest, PUT_INVALID_TEST) { empty_table_.Put(invalid_task_); - ASSERT_EQ(empty_table_.Get(0)->task, invalid_task_); + ASSERT_EQ(empty_table_.at(0)->task, invalid_task_); } TEST_F(TaskTableBaseTest, PUT_BATCH) { std::vector tasks{task1_, task2_}; empty_table_.Put(tasks); - ASSERT_EQ(empty_table_.Get(0)->task, task1_); - ASSERT_EQ(empty_table_.Get(1)->task, task2_); + ASSERT_EQ(empty_table_.at(0)->task, task1_); + ASSERT_EQ(empty_table_.at(1)->task, task2_); } TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) { @@ -204,14 +204,14 @@ TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) { } TEST_F(TaskTableBaseTest, SIZE) { - ASSERT_EQ(empty_table_.Size(), 0); + ASSERT_EQ(empty_table_.size(), 0); empty_table_.Put(task1_); - ASSERT_EQ(empty_table_.Size(), 1); + ASSERT_EQ(empty_table_.size(), 1); } TEST_F(TaskTableBaseTest, OPERATOR) { empty_table_.Put(task1_); - ASSERT_EQ(empty_table_.Get(0), empty_table_[0]); + ASSERT_EQ(empty_table_.at(0), empty_table_[0]); } TEST_F(TaskTableBaseTest, PICK_TO_LOAD) { @@ -224,7 +224,7 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD) { auto indexes = empty_table_.PickToLoad(1); ASSERT_EQ(indexes.size(), 1); - ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2); + ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2); } TEST_F(TaskTableBaseTest, PICK_TO_LOAD_LIMIT) { @@ -237,9 +237,9 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD_LIMIT) { auto indexes = empty_table_.PickToLoad(3); ASSERT_EQ(indexes.size(), 3); - ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2); - ASSERT_EQ(indexes[1] % empty_table_.Capacity(), 3); - ASSERT_EQ(indexes[2] % empty_table_.Capacity(), 4); + ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2); + ASSERT_EQ(indexes[1] % empty_table_.capacity(), 3); + ASSERT_EQ(indexes[2] % empty_table_.capacity(), 4); } TEST_F(TaskTableBaseTest, PICK_TO_LOAD_CACHE) { @@ -253,14 +253,14 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD_CACHE) { // first pick, non-cache auto indexes = empty_table_.PickToLoad(1); ASSERT_EQ(indexes.size(), 1); - ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2); + ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2); // second pick, iterate from 2 // invalid state change empty_table_[1]->state = milvus::scheduler::TaskTableItemState::START; indexes = empty_table_.PickToLoad(1); ASSERT_EQ(indexes.size(), 1); - ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2); + ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2); } TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE) { @@ -274,7 +274,7 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE) { auto indexes = empty_table_.PickToExecute(1); ASSERT_EQ(indexes.size(), 1); - ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2); + ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2); } TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_LIMIT) { @@ -289,8 +289,8 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_LIMIT) { auto indexes = empty_table_.PickToExecute(3); ASSERT_EQ(indexes.size(), 2); - ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2); - ASSERT_EQ(indexes[1] % empty_table_.Capacity(), 3); + ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2); + ASSERT_EQ(indexes[1] % empty_table_.capacity(), 3); } TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_CACHE) { @@ -305,14 +305,14 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_CACHE) { // first pick, non-cache auto indexes = empty_table_.PickToExecute(1); ASSERT_EQ(indexes.size(), 1); - ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2); + ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2); // second pick, iterate from 2 // invalid state change empty_table_[1]->state = milvus::scheduler::TaskTableItemState::START; indexes = empty_table_.PickToExecute(1); ASSERT_EQ(indexes.size(), 1); - ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2); + ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2); } /************ TaskTableAdvanceTest ************/ @@ -328,14 +328,14 @@ class TaskTableAdvanceTest : public ::testing::Test { table1_.Put(task); } - table1_.Get(0)->state = milvus::scheduler::TaskTableItemState::INVALID; - table1_.Get(1)->state = milvus::scheduler::TaskTableItemState::START; - table1_.Get(2)->state = milvus::scheduler::TaskTableItemState::LOADING; - table1_.Get(3)->state = milvus::scheduler::TaskTableItemState::LOADED; - table1_.Get(4)->state = milvus::scheduler::TaskTableItemState::EXECUTING; - table1_.Get(5)->state = milvus::scheduler::TaskTableItemState::EXECUTED; - table1_.Get(6)->state = milvus::scheduler::TaskTableItemState::MOVING; - table1_.Get(7)->state = milvus::scheduler::TaskTableItemState::MOVED; + table1_.at(0)->state = milvus::scheduler::TaskTableItemState::INVALID; + table1_.at(1)->state = milvus::scheduler::TaskTableItemState::START; + table1_.at(2)->state = milvus::scheduler::TaskTableItemState::LOADING; + table1_.at(3)->state = milvus::scheduler::TaskTableItemState::LOADED; + table1_.at(4)->state = milvus::scheduler::TaskTableItemState::EXECUTING; + table1_.at(5)->state = milvus::scheduler::TaskTableItemState::EXECUTED; + table1_.at(6)->state = milvus::scheduler::TaskTableItemState::MOVING; + table1_.at(7)->state = milvus::scheduler::TaskTableItemState::MOVED; } milvus::scheduler::TaskTable table1_; @@ -343,114 +343,114 @@ class TaskTableAdvanceTest : public ::testing::Test { TEST_F(TaskTableAdvanceTest, LOAD) { std::vector before_state; - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { before_state.push_back(table1_[i]->state); } - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { table1_.Load(i); } - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { if (before_state[i] == milvus::scheduler::TaskTableItemState::START) { - ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::LOADING); + ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::LOADING); } else { - ASSERT_EQ(table1_.Get(i)->state, before_state[i]); + ASSERT_EQ(table1_.at(i)->state, before_state[i]); } } } TEST_F(TaskTableAdvanceTest, LOADED) { std::vector before_state; - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { before_state.push_back(table1_[i]->state); } - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { table1_.Loaded(i); } - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { if (before_state[i] == milvus::scheduler::TaskTableItemState::LOADING) { - ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::LOADED); + ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::LOADED); } else { - ASSERT_EQ(table1_.Get(i)->state, before_state[i]); + ASSERT_EQ(table1_.at(i)->state, before_state[i]); } } } TEST_F(TaskTableAdvanceTest, EXECUTE) { std::vector before_state; - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { before_state.push_back(table1_[i]->state); } - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { table1_.Execute(i); } - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { if (before_state[i] == milvus::scheduler::TaskTableItemState::LOADED) { - ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::EXECUTING); + ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::EXECUTING); } else { - ASSERT_EQ(table1_.Get(i)->state, before_state[i]); + ASSERT_EQ(table1_.at(i)->state, before_state[i]); } } } TEST_F(TaskTableAdvanceTest, EXECUTED) { std::vector before_state; - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { before_state.push_back(table1_[i]->state); } - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { table1_.Executed(i); } - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { if (before_state[i] == milvus::scheduler::TaskTableItemState::EXECUTING) { - ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::EXECUTED); + ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::EXECUTED); } else { - ASSERT_EQ(table1_.Get(i)->state, before_state[i]); + ASSERT_EQ(table1_.at(i)->state, before_state[i]); } } } TEST_F(TaskTableAdvanceTest, MOVE) { std::vector before_state; - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { before_state.push_back(table1_[i]->state); } - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { table1_.Move(i); } - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { if (before_state[i] == milvus::scheduler::TaskTableItemState::LOADED) { - ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::MOVING); + ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::MOVING); } else { - ASSERT_EQ(table1_.Get(i)->state, before_state[i]); + ASSERT_EQ(table1_.at(i)->state, before_state[i]); } } } TEST_F(TaskTableAdvanceTest, MOVED) { std::vector before_state; - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { before_state.push_back(table1_[i]->state); } - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { table1_.Moved(i); } - for (size_t i = 0; i < table1_.Size(); ++i) { + for (size_t i = 0; i < table1_.size(); ++i) { if (before_state[i] == milvus::scheduler::TaskTableItemState::MOVING) { - ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::MOVED); + ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::MOVED); } else { - ASSERT_EQ(table1_.Get(i)->state, before_state[i]); + ASSERT_EQ(table1_.at(i)->state, before_state[i]); } } } From 53b3b60db2ed87a8c558ae893608fab0ffc18578 Mon Sep 17 00:00:00 2001 From: wxyu Date: Mon, 28 Oct 2019 19:19:42 +0800 Subject: [PATCH 2/6] Using shared_ptr instead of weak_ptr to avoid performance loss Former-commit-id: 250cb7200b6eefdd9cbb9fd631379d59aca2f368 --- CHANGELOG.md | 1 + core/src/scheduler/Algorithm.cpp | 2 +- core/src/scheduler/Scheduler.cpp | 64 +++++++++---------- core/src/scheduler/Scheduler.h | 6 +- core/src/scheduler/action/Action.h | 5 +- .../scheduler/action/PushTaskToNeighbour.cpp | 22 +++---- core/src/scheduler/event/Event.h | 4 +- core/src/scheduler/event/FinishTaskEvent.h | 2 +- core/src/scheduler/event/LoadCompletedEvent.h | 2 +- core/src/scheduler/event/StartUpEvent.h | 2 +- .../scheduler/event/TaskTableUpdatedEvent.h | 2 +- core/src/scheduler/resource/Node.cpp | 4 +- core/src/scheduler/resource/Node.h | 8 ++- core/unittest/scheduler/test_event.cpp | 8 +-- core/unittest/scheduler/test_node.cpp | 16 ++--- 15 files changed, 75 insertions(+), 73 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bcb3f5b70f..00402ea15f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#80 - Print version information into log during server start - \#82 - Move easyloggingpp into "external" directory - \#92 - Speed up CMake build process +- \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss ## Feature - \#115 - Using new structure for tasktable diff --git a/core/src/scheduler/Algorithm.cpp b/core/src/scheduler/Algorithm.cpp index b2156b3f97..fb1742e6e1 100644 --- a/core/src/scheduler/Algorithm.cpp +++ b/core/src/scheduler/Algorithm.cpp @@ -54,7 +54,7 @@ ShortestPath(const ResourcePtr& src, const ResourcePtr& dest, const ResourceMgrP auto cur_neighbours = cur_node->GetNeighbours(); for (auto& neighbour : cur_neighbours) { - auto neighbour_res = std::static_pointer_cast(neighbour.neighbour_node.lock()); + auto neighbour_res = std::static_pointer_cast(neighbour.neighbour_node); dis_matrix[name_id_map.at(res->name())][name_id_map.at(neighbour_res->name())] = neighbour.connection.transport_cost(); } diff --git a/core/src/scheduler/Scheduler.cpp b/core/src/scheduler/Scheduler.cpp index fef5cc1a95..cba847c25e 100644 --- a/core/src/scheduler/Scheduler.cpp +++ b/core/src/scheduler/Scheduler.cpp @@ -26,10 +26,8 @@ namespace milvus { namespace scheduler { -Scheduler::Scheduler(ResourceMgrWPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) { - if (auto mgr = res_mgr_.lock()) { - mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1)); - } +Scheduler::Scheduler(ResourceMgrPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) { + res_mgr_->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1)); event_register_.insert(std::make_pair(static_cast(EventType::START_UP), std::bind(&Scheduler::OnStartUp, this, std::placeholders::_1))); event_register_.insert(std::make_pair(static_cast(EventType::LOAD_COMPLETED), @@ -40,6 +38,10 @@ Scheduler::Scheduler(ResourceMgrWPtr res_mgr) : running_(false), res_mgr_(std::m std::bind(&Scheduler::OnFinishTask, this, std::placeholders::_1))); } +Scheduler::~Scheduler() { + res_mgr_ = nullptr; +} + void Scheduler::Start() { running_ = true; @@ -100,51 +102,45 @@ Scheduler::Process(const EventPtr& event) { void Scheduler::OnLoadCompleted(const EventPtr& event) { auto load_completed_event = std::static_pointer_cast(event); - if (auto resource = event->resource_.lock()) { - resource->WakeupExecutor(); - auto task_table_type = load_completed_event->task_table_item_->task->label()->Type(); - switch (task_table_type) { - case TaskLabelType::DEFAULT: { - Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event); - break; - } - case TaskLabelType::SPECIFIED_RESOURCE: { - Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event); - break; - } - case TaskLabelType::BROADCAST: { - if (resource->HasExecutor() == false) { - load_completed_event->task_table_item_->Move(); - } - Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource); - break; - } - default: { break; } + auto resource = event->resource_; + resource->WakeupExecutor(); + + auto task_table_type = load_completed_event->task_table_item_->task->label()->Type(); + switch (task_table_type) { + case TaskLabelType::DEFAULT: { + Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event); + break; } - resource->WakeupLoader(); + case TaskLabelType::SPECIFIED_RESOURCE: { + Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event); + break; + } + case TaskLabelType::BROADCAST: { + if (resource->HasExecutor() == false) { + load_completed_event->task_table_item_->Move(); + } + Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource); + break; + } + default: { break; } } + resource->WakeupLoader(); } void Scheduler::OnStartUp(const EventPtr& event) { - if (auto resource = event->resource_.lock()) { - resource->WakeupLoader(); - } + event->resource_->WakeupLoader(); } void Scheduler::OnFinishTask(const EventPtr& event) { - if (auto resource = event->resource_.lock()) { - resource->WakeupLoader(); - } + event->resource_->WakeupLoader(); } void Scheduler::OnTaskTableUpdated(const EventPtr& event) { - if (auto resource = event->resource_.lock()) { - resource->WakeupLoader(); - } + event->resource_->WakeupLoader(); } } // namespace scheduler diff --git a/core/src/scheduler/Scheduler.h b/core/src/scheduler/Scheduler.h index 8d9ea83794..9e3a864774 100644 --- a/core/src/scheduler/Scheduler.h +++ b/core/src/scheduler/Scheduler.h @@ -34,7 +34,9 @@ namespace scheduler { class Scheduler : public interface::dumpable { public: - explicit Scheduler(ResourceMgrWPtr res_mgr); + explicit Scheduler(ResourceMgrPtr res_mgr); + + ~Scheduler(); Scheduler(const Scheduler&) = delete; Scheduler(Scheduler&&) = delete; @@ -118,7 +120,7 @@ class Scheduler : public interface::dumpable { std::unordered_map> event_register_; - ResourceMgrWPtr res_mgr_; + ResourceMgrPtr res_mgr_; std::queue event_queue_; std::thread worker_thread_; std::mutex event_mutex_; diff --git a/core/src/scheduler/action/Action.h b/core/src/scheduler/action/Action.h index 51c788f82f..ff72910055 100644 --- a/core/src/scheduler/action/Action.h +++ b/core/src/scheduler/action/Action.h @@ -37,10 +37,11 @@ class Action { PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest); static void - DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, std::shared_ptr event); + DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, + std::shared_ptr event); static void - SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, + SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, std::shared_ptr event); }; diff --git a/core/src/scheduler/action/PushTaskToNeighbour.cpp b/core/src/scheduler/action/PushTaskToNeighbour.cpp index c64e81dcfa..6f74849eac 100644 --- a/core/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/core/src/scheduler/action/PushTaskToNeighbour.cpp @@ -30,7 +30,7 @@ std::vector get_neighbours(const ResourcePtr& self) { std::vector neighbours; for (auto& neighbour_node : self->GetNeighbours()) { - auto node = neighbour_node.neighbour_node.lock(); + auto node = neighbour_node.neighbour_node; if (not node) continue; @@ -46,7 +46,7 @@ std::vector> get_neighbours_with_connetion(const ResourcePtr& self) { std::vector> neighbours; for (auto& neighbour_node : self->GetNeighbours()) { - auto node = neighbour_node.neighbour_node.lock(); + auto node = neighbour_node.neighbour_node; if (not node) continue; @@ -102,7 +102,7 @@ Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) { } void -Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, +Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, std::shared_ptr event) { if (not resource->HasExecutor() && event->task_table_item_->Move()) { auto task = event->task_table_item_->task; @@ -114,11 +114,11 @@ Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, if (auto index_engine = search_task->index_engine_) { auto location = index_engine->GetLocation(); - for (auto i = 0; i < res_mgr.lock()->GetNumGpuResource(); ++i) { + for (auto i = 0; i < res_mgr->GetNumGpuResource(); ++i) { auto index = milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location); if (index != nullptr) { moved = true; - auto dest_resource = res_mgr.lock()->GetResource(ResourceType::GPU, i); + auto dest_resource = res_mgr->GetResource(ResourceType::GPU, i); PushTaskToResource(event->task_table_item_->task, dest_resource); break; } @@ -133,17 +133,17 @@ Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, } void -Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, +Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, std::shared_ptr event) { auto task = event->task_table_item_->task; if (resource->type() == ResourceType::DISK) { // step 1: calculate shortest path per resource, from disk to compute resource - auto compute_resources = res_mgr.lock()->GetComputeResources(); + auto compute_resources = res_mgr->GetComputeResources(); std::vector> paths; std::vector transport_costs; for (auto& res : compute_resources) { std::vector path; - uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path); + uint64_t transport_cost = ShortestPath(resource, res, res_mgr, path); transport_costs.push_back(transport_cost); paths.emplace_back(path); } @@ -187,10 +187,10 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu); bool find_gpu_res = false; - if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) { + if (res_mgr->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) { for (uint64_t i = 0; i < compute_resources.size(); ++i) { if (compute_resources[i]->name() == - res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) { + res_mgr->GetResource(ResourceType::GPU, build_index_gpu)->name()) { find_gpu_res = true; Path task_path(paths[i], paths[i].size() - 1); task->path() = task_path; @@ -208,7 +208,7 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource->WakeupExecutor(); } else { auto next_res_name = task->path().Next(); - auto next_res = res_mgr.lock()->GetResource(next_res_name); + auto next_res = res_mgr->GetResource(next_res_name); // if (event->task_table_item_->Move()) { // next_res->task_table().Put(task); // } diff --git a/core/src/scheduler/event/Event.h b/core/src/scheduler/event/Event.h index 5b1f37fb99..3c29e02225 100644 --- a/core/src/scheduler/event/Event.h +++ b/core/src/scheduler/event/Event.h @@ -30,7 +30,7 @@ class Resource; class Event { public: - explicit Event(EventType type, std::weak_ptr resource) : type_(type), resource_(std::move(resource)) { + explicit Event(EventType type, std::shared_ptr resource) : type_(type), resource_(std::move(resource)) { } inline EventType @@ -46,7 +46,7 @@ class Event { public: EventType type_; - std::weak_ptr resource_; + std::shared_ptr resource_; }; using EventPtr = std::shared_ptr; diff --git a/core/src/scheduler/event/FinishTaskEvent.h b/core/src/scheduler/event/FinishTaskEvent.h index 1b2d8f9818..afaf02de92 100644 --- a/core/src/scheduler/event/FinishTaskEvent.h +++ b/core/src/scheduler/event/FinishTaskEvent.h @@ -29,7 +29,7 @@ namespace scheduler { class FinishTaskEvent : public Event { public: - FinishTaskEvent(std::weak_ptr resource, TaskTableItemPtr task_table_item) + FinishTaskEvent(std::shared_ptr resource, TaskTableItemPtr task_table_item) : Event(EventType::FINISH_TASK, std::move(resource)), task_table_item_(std::move(task_table_item)) { } diff --git a/core/src/scheduler/event/LoadCompletedEvent.h b/core/src/scheduler/event/LoadCompletedEvent.h index 5a701e0dfc..0aa3bf79d6 100644 --- a/core/src/scheduler/event/LoadCompletedEvent.h +++ b/core/src/scheduler/event/LoadCompletedEvent.h @@ -29,7 +29,7 @@ namespace scheduler { class LoadCompletedEvent : public Event { public: - LoadCompletedEvent(std::weak_ptr resource, TaskTableItemPtr task_table_item) + LoadCompletedEvent(std::shared_ptr resource, TaskTableItemPtr task_table_item) : Event(EventType::LOAD_COMPLETED, std::move(resource)), task_table_item_(std::move(task_table_item)) { } diff --git a/core/src/scheduler/event/StartUpEvent.h b/core/src/scheduler/event/StartUpEvent.h index c4abb4e27c..2d8292ea70 100644 --- a/core/src/scheduler/event/StartUpEvent.h +++ b/core/src/scheduler/event/StartUpEvent.h @@ -28,7 +28,7 @@ namespace scheduler { class StartUpEvent : public Event { public: - explicit StartUpEvent(std::weak_ptr resource) : Event(EventType::START_UP, std::move(resource)) { + explicit StartUpEvent(std::shared_ptr resource) : Event(EventType::START_UP, std::move(resource)) { } inline std::string diff --git a/core/src/scheduler/event/TaskTableUpdatedEvent.h b/core/src/scheduler/event/TaskTableUpdatedEvent.h index ed64a42d89..9be27e69b6 100644 --- a/core/src/scheduler/event/TaskTableUpdatedEvent.h +++ b/core/src/scheduler/event/TaskTableUpdatedEvent.h @@ -28,7 +28,7 @@ namespace scheduler { class TaskTableUpdatedEvent : public Event { public: - explicit TaskTableUpdatedEvent(std::weak_ptr resource) + explicit TaskTableUpdatedEvent(std::shared_ptr resource) : Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) { } diff --git a/core/src/scheduler/resource/Node.cpp b/core/src/scheduler/resource/Node.cpp index dcf03a321c..bc0e559175 100644 --- a/core/src/scheduler/resource/Node.cpp +++ b/core/src/scheduler/resource/Node.cpp @@ -58,9 +58,7 @@ Node::Dump() const { void Node::AddNeighbour(const NeighbourNodePtr& neighbour_node, Connection& connection) { std::lock_guard lk(mutex_); - if (auto s = neighbour_node.lock()) { - neighbours_.emplace(std::make_pair(s->id_, Neighbour(neighbour_node, connection))); - } + neighbours_.emplace(std::make_pair(neighbour_node->id_, Neighbour(neighbour_node, connection))); // else do nothing, consider it.. } diff --git a/core/src/scheduler/resource/Node.h b/core/src/scheduler/resource/Node.h index 4539c8c86a..53323fe6e2 100644 --- a/core/src/scheduler/resource/Node.h +++ b/core/src/scheduler/resource/Node.h @@ -31,10 +31,14 @@ namespace scheduler { class Node; -using NeighbourNodePtr = std::weak_ptr; +using NeighbourNodePtr = std::shared_ptr; struct Neighbour { - Neighbour(NeighbourNodePtr nei, Connection conn) : neighbour_node(nei), connection(conn) { + Neighbour(NeighbourNodePtr nei, Connection conn) : neighbour_node(std::move(nei)), connection(std::move(conn)) { + } + + ~Neighbour() { + neighbour_node = nullptr; } NeighbourNodePtr neighbour_node; diff --git a/core/unittest/scheduler/test_event.cpp b/core/unittest/scheduler/test_event.cpp index 07d51e8557..cf627a5d79 100644 --- a/core/unittest/scheduler/test_event.cpp +++ b/core/unittest/scheduler/test_event.cpp @@ -28,7 +28,7 @@ namespace milvus { namespace scheduler { TEST(EventTest, START_UP_EVENT) { - ResourceWPtr res(ResourcePtr(nullptr)); + ResourcePtr res(nullptr); auto event = std::make_shared(res); ASSERT_FALSE(event->Dump().empty()); std::cout << *event; @@ -36,7 +36,7 @@ TEST(EventTest, START_UP_EVENT) { } TEST(EventTest, LOAD_COMPLETED_EVENT) { - ResourceWPtr res(ResourcePtr(nullptr)); + ResourcePtr res(nullptr); auto event = std::make_shared(res, nullptr); ASSERT_FALSE(event->Dump().empty()); std::cout << *event; @@ -44,7 +44,7 @@ TEST(EventTest, LOAD_COMPLETED_EVENT) { } TEST(EventTest, FINISH_TASK_EVENT) { - ResourceWPtr res(ResourcePtr(nullptr)); + ResourcePtr res(nullptr); auto event = std::make_shared(res, nullptr); ASSERT_FALSE(event->Dump().empty()); std::cout << *event; @@ -53,7 +53,7 @@ TEST(EventTest, FINISH_TASK_EVENT) { TEST(EventTest, TASKTABLE_UPDATED_EVENT) { - ResourceWPtr res(ResourcePtr(nullptr)); + ResourcePtr res(nullptr); auto event = std::make_shared(res); ASSERT_FALSE(event->Dump().empty()); std::cout << *event; diff --git a/core/unittest/scheduler/test_node.cpp b/core/unittest/scheduler/test_node.cpp index 9b34b73191..d2c93971ac 100644 --- a/core/unittest/scheduler/test_node.cpp +++ b/core/unittest/scheduler/test_node.cpp @@ -15,15 +15,14 @@ // specific language governing permissions and limitations // under the License. - -#include "scheduler/resource/Node.h" #include +#include "scheduler/resource/Node.h" namespace { namespace ms = milvus::scheduler; -} // namespace +} // namespace class NodeTest : public ::testing::Test { protected: @@ -73,9 +72,11 @@ TEST_F(NodeTest, GET_NEIGHBOURS) { bool n2 = false, n3 = false; auto node1_neighbours = node1_->GetNeighbours(); ASSERT_EQ(node1_neighbours.size(), 2); - for (auto &n : node1_neighbours) { - if (n.neighbour_node.lock() == node2_) n2 = true; - if (n.neighbour_node.lock() == node3_) n3 = true; + for (auto& n : node1_neighbours) { + if (n.neighbour_node == node2_) + n2 = true; + if (n.neighbour_node == node3_) + n3 = true; } ASSERT_TRUE(n2); ASSERT_TRUE(n3); @@ -84,7 +85,7 @@ TEST_F(NodeTest, GET_NEIGHBOURS) { { auto node2_neighbours = node2_->GetNeighbours(); ASSERT_EQ(node2_neighbours.size(), 1); - ASSERT_EQ(node2_neighbours[0].neighbour_node.lock(), node1_); + ASSERT_EQ(node2_neighbours[0].neighbour_node, node1_); } { @@ -100,4 +101,3 @@ TEST_F(NodeTest, DUMP) { std::cout << node2_->Dump(); ASSERT_FALSE(node2_->Dump().empty()); } - From ada0bf86ce2504fffe1edb0ce49282f2074d383c Mon Sep 17 00:00:00 2001 From: wxyu Date: Mon, 28 Oct 2019 19:23:05 +0800 Subject: [PATCH 3/6] solve conflicts Former-commit-id: 538671361c228898d0f2a81fdfdd7d3087bf0721 --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 064ad9b439..cc2461a9c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,6 @@ Please mark all change in change log and use the ticket from JIRA. - \#96 - Remove .a file in milvus/lib for docker-version - \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss ->>>>>>> main/0.5.1 ## Feature - \#115 - Using new structure for tasktable From 2b1de98912a499d8a1d88097e5ce9ea9ca8834c4 Mon Sep 17 00:00:00 2001 From: wxyu Date: Mon, 28 Oct 2019 19:29:44 +0800 Subject: [PATCH 4/6] fix cpplint Former-commit-id: df5bb8526ac0fe0662b10fbfb7daa706900e6758 --- core/src/scheduler/resource/Node.h | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/scheduler/resource/Node.h b/core/src/scheduler/resource/Node.h index 53323fe6e2..177cdd735a 100644 --- a/core/src/scheduler/resource/Node.h +++ b/core/src/scheduler/resource/Node.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include "Connection.h" From 7510f1f7a2e3853bd93498dfd3cb2399a7eadb68 Mon Sep 17 00:00:00 2001 From: wxyu Date: Mon, 28 Oct 2019 20:28:36 +0800 Subject: [PATCH 5/6] remove unused code Former-commit-id: 630cb776ec1a736f78241835fbbc8cc95b68deaa --- core/src/scheduler/SchedInst.cpp | 70 -------------------------------- core/src/scheduler/job/Job.cpp | 21 ++++++++++ 2 files changed, 21 insertions(+), 70 deletions(-) create mode 100644 core/src/scheduler/job/Job.cpp diff --git a/core/src/scheduler/SchedInst.cpp b/core/src/scheduler/SchedInst.cpp index f3f293a0f3..8474e93c1f 100644 --- a/core/src/scheduler/SchedInst.cpp +++ b/core/src/scheduler/SchedInst.cpp @@ -82,79 +82,9 @@ load_simple_config() { } } -void -load_advance_config() { - // try { - // server::ConfigNode &config = server::Config::GetInstance().GetConfig(server::CONFIG_RESOURCE); - // - // if (config.GetChildren().empty()) throw "resource_config null exception"; - // - // auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren(); - // - // if (resources.empty()) throw "Children of resource_config null exception"; - // - // for (auto &resource : resources) { - // auto &resname = resource.first; - // auto &resconf = resource.second; - // auto type = resconf.GetValue(server::CONFIG_RESOURCE_TYPE); - //// auto memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_MEMORY); - // auto device_id = resconf.GetInt64Value(server::CONFIG_RESOURCE_DEVICE_ID); - //// auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER); - // auto enable_loader = true; - // auto enable_executor = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_EXECUTOR); - // auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY); - // auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY); - // auto resource_num = resconf.GetInt64Value(server::CONFIG_RESOURCE_NUM); - // - // auto res = ResMgrInst::GetInstance()->Add(ResourceFactory::Create(resname, - // type, - // device_id, - // enable_loader, - // enable_executor)); - // - // if (res.lock()->type() == ResourceType::GPU) { - // auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY, 300); - // auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY, 300); - // auto resource_num = resconf.GetInt64Value(server::CONFIG_RESOURCE_NUM, 2); - // pinned_memory = 1024 * 1024 * pinned_memory; - // temp_memory = 1024 * 1024 * temp_memory; - // knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(device_id, - // pinned_memory, - // temp_memory, - // resource_num); - // } - // } - // - // knowhere::FaissGpuResourceMgr::GetInstance().InitResource(); - // - // auto connections = config.GetChild(server::CONFIG_RESOURCE_CONNECTIONS).GetChildren(); - // if (connections.empty()) throw "connections config null exception"; - // for (auto &conn : connections) { - // auto &connect_name = conn.first; - // auto &connect_conf = conn.second; - // auto connect_speed = connect_conf.GetInt64Value(server::CONFIG_SPEED_CONNECTIONS); - // auto connect_endpoint = connect_conf.GetValue(server::CONFIG_ENDPOINT_CONNECTIONS); - // - // std::string delimiter = "==="; - // std::string left = connect_endpoint.substr(0, connect_endpoint.find(delimiter)); - // std::string right = connect_endpoint.substr(connect_endpoint.find(delimiter) + 3, - // connect_endpoint.length()); - // - // auto connection = Connection(connect_name, connect_speed); - // ResMgrInst::GetInstance()->Connect(left, right, connection); - // } - // } catch (const char *msg) { - // SERVER_LOG_ERROR << msg; - // // TODO(wxyu): throw exception instead - // exit(-1); - //// throw std::exception(); - // } -} - void StartSchedulerService() { load_simple_config(); - // load_advance_config(); ResMgrInst::GetInstance()->Start(); SchedInst::GetInstance()->Start(); JobMgrInst::GetInstance()->Start(); diff --git a/core/src/scheduler/job/Job.cpp b/core/src/scheduler/job/Job.cpp new file mode 100644 index 0000000000..954ea11f1b --- /dev/null +++ b/core/src/scheduler/job/Job.cpp @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +// Created by wxyu on 2019/10/28. +// + From de2bb68daa557c53bb4ad01f7561bd72222de2bd Mon Sep 17 00:00:00 2001 From: wxyu Date: Mon, 28 Oct 2019 20:34:26 +0800 Subject: [PATCH 6/6] Add unique id for Job Former-commit-id: 1865dbd859f345a3febc3ad76682f928678e59f5 --- CHANGELOG.md | 1 + core/src/db/DBImpl.cpp | 6 ++--- core/src/scheduler/ResourceMgr.h | 1 - core/src/scheduler/job/BuildIndexJob.cpp | 6 +++-- core/src/scheduler/job/BuildIndexJob.h | 2 +- core/src/scheduler/job/DeleteJob.cpp | 6 +++-- core/src/scheduler/job/DeleteJob.h | 2 +- core/src/scheduler/job/Job.cpp | 28 +++++++++++++++++++++--- core/src/scheduler/job/Job.h | 8 ++++--- core/src/scheduler/job/SearchJob.cpp | 6 +++-- core/src/scheduler/job/SearchJob.h | 2 +- 11 files changed, 49 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cc2461a9c2..785b7c89ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#92 - Speed up CMake build process - \#96 - Remove .a file in milvus/lib for docker-version - \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss +- \#122 - Add unique id for Job ## Feature - \#115 - Using new structure for tasktable diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 324d304e2a..6995de3d14 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -136,7 +136,7 @@ DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) { // scheduler will determine when to delete table files auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource(); - scheduler::DeleteJobPtr job = std::make_shared(0, table_id, meta_ptr_, nres); + scheduler::DeleteJobPtr job = std::make_shared(table_id, meta_ptr_, nres); scheduler::JobMgrInst::GetInstance()->Put(job); job->WaitAndDelete(); } else { @@ -439,7 +439,7 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi // step 1: get files to search ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size(); - scheduler::SearchJobPtr job = std::make_shared(0, k, nq, nprobe, vectors); + scheduler::SearchJobPtr job = std::make_shared(k, nq, nprobe, vectors); for (auto& file : files) { scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); job->AddIndexFile(file_ptr); @@ -754,7 +754,7 @@ DBImpl::BackgroundBuildIndex() { Status status; if (!to_index_files.empty()) { - scheduler::BuildIndexJobPtr job = std::make_shared(0, meta_ptr_, options_); + scheduler::BuildIndexJobPtr job = std::make_shared(meta_ptr_, options_); // step 2: put build index task to scheduler for (auto& file : to_index_files) { diff --git a/core/src/scheduler/ResourceMgr.h b/core/src/scheduler/ResourceMgr.h index 4d2361fb3d..31a1063e5d 100644 --- a/core/src/scheduler/ResourceMgr.h +++ b/core/src/scheduler/ResourceMgr.h @@ -75,7 +75,6 @@ class ResourceMgr : public interface::dumpable { return gpu_resources_; } - // TODO(wxyu): why return shared pointer inline std::vector GetAllResources() { return resources_; diff --git a/core/src/scheduler/job/BuildIndexJob.cpp b/core/src/scheduler/job/BuildIndexJob.cpp index 39c08b6b51..4c4c3b5054 100644 --- a/core/src/scheduler/job/BuildIndexJob.cpp +++ b/core/src/scheduler/job/BuildIndexJob.cpp @@ -23,8 +23,8 @@ namespace milvus { namespace scheduler { -BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options) - : Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) { +BuildIndexJob::BuildIndexJob(engine::meta::MetaPtr meta_ptr, engine::DBOptions options) + : Job(JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) { } bool @@ -59,6 +59,8 @@ BuildIndexJob::Dump() const { json ret{ {"number_of_to_index_file", to_index_files_.size()}, }; + auto base = Job::Dump(); + ret.insert(base.begin(), base.end()); return ret; } diff --git a/core/src/scheduler/job/BuildIndexJob.h b/core/src/scheduler/job/BuildIndexJob.h index e3450ee048..9dba5854b6 100644 --- a/core/src/scheduler/job/BuildIndexJob.h +++ b/core/src/scheduler/job/BuildIndexJob.h @@ -41,7 +41,7 @@ using Id2ToTableFileMap = std::unordered_map; class BuildIndexJob : public Job { public: - explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options); + explicit BuildIndexJob(engine::meta::MetaPtr meta_ptr, engine::DBOptions options); public: bool diff --git a/core/src/scheduler/job/DeleteJob.cpp b/core/src/scheduler/job/DeleteJob.cpp index 04a9557177..f2131ffb5b 100644 --- a/core/src/scheduler/job/DeleteJob.cpp +++ b/core/src/scheduler/job/DeleteJob.cpp @@ -22,8 +22,8 @@ namespace milvus { namespace scheduler { -DeleteJob::DeleteJob(JobId id, std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource) - : Job(id, JobType::DELETE), +DeleteJob::DeleteJob(std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource) + : Job(JobType::DELETE), table_id_(std::move(table_id)), meta_ptr_(std::move(meta_ptr)), num_resource_(num_resource) { @@ -52,6 +52,8 @@ DeleteJob::Dump() const { {"number_of_resource", num_resource_}, {"number_of_done", done_resource}, }; + auto base = Job::Dump(); + ret.insert(base.begin(), base.end()); return ret; } diff --git a/core/src/scheduler/job/DeleteJob.h b/core/src/scheduler/job/DeleteJob.h index 93e5aa40cc..a20d67d45a 100644 --- a/core/src/scheduler/job/DeleteJob.h +++ b/core/src/scheduler/job/DeleteJob.h @@ -35,7 +35,7 @@ namespace scheduler { class DeleteJob : public Job { public: - DeleteJob(JobId id, std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource); + DeleteJob(std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource); public: void diff --git a/core/src/scheduler/job/Job.cpp b/core/src/scheduler/job/Job.cpp index 954ea11f1b..1199fe17a6 100644 --- a/core/src/scheduler/job/Job.cpp +++ b/core/src/scheduler/job/Job.cpp @@ -15,7 +15,29 @@ // specific language governing permissions and limitations // under the License. -// -// Created by wxyu on 2019/10/28. -// +#include "Job.h" +namespace milvus { +namespace scheduler { + +namespace { +std::mutex unique_job_mutex; +uint64_t unique_job_id = 0; +} // namespace + +Job::Job(JobType type) : type_(type) { + std::lock_guard lock(unique_job_mutex); + id_ = unique_job_id++; +} + +json +Job::Dump() const { + json ret{ + {"id", id_}, + {"type", type_}, + }; + return ret; +} + +} // namespace scheduler +} // namespace milvus diff --git a/core/src/scheduler/job/Job.h b/core/src/scheduler/job/Job.h index 709db8cffc..949164a8d0 100644 --- a/core/src/scheduler/job/Job.h +++ b/core/src/scheduler/job/Job.h @@ -53,12 +53,14 @@ class Job : public interface::dumpable { return type_; } + json + Dump() const override; + protected: - Job(JobId id, JobType type) : id_(id), type_(type) { - } + explicit Job(JobType type); private: - JobId id_; + JobId id_ = 0; JobType type_; }; diff --git a/core/src/scheduler/job/SearchJob.cpp b/core/src/scheduler/job/SearchJob.cpp index 1143e33add..47c825c122 100644 --- a/core/src/scheduler/job/SearchJob.cpp +++ b/core/src/scheduler/job/SearchJob.cpp @@ -21,8 +21,8 @@ namespace milvus { namespace scheduler { -SearchJob::SearchJob(milvus::scheduler::JobId id, uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors) - : Job(id, JobType::SEARCH), topk_(topk), nq_(nq), nprobe_(nprobe), vectors_(vectors) { +SearchJob::SearchJob(uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors) + : Job(JobType::SEARCH), topk_(topk), nq_(nq), nprobe_(nprobe), vectors_(vectors) { } bool @@ -70,6 +70,8 @@ SearchJob::Dump() const { {"nq", nq_}, {"nprobe", nprobe_}, }; + auto base = Job::Dump(); + ret.insert(base.begin(), base.end()); return ret; } diff --git a/core/src/scheduler/job/SearchJob.h b/core/src/scheduler/job/SearchJob.h index 6c2bd7eea9..1e586090b9 100644 --- a/core/src/scheduler/job/SearchJob.h +++ b/core/src/scheduler/job/SearchJob.h @@ -43,7 +43,7 @@ using ResultSet = std::vector; class SearchJob : public Job { public: - SearchJob(JobId id, uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors); + SearchJob(uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors); public: bool