diff --git a/CHANGELOG.md b/CHANGELOG.md index bcb3f5b70f..00402ea15f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#80 - Print version information into log during server start - \#82 - Move easyloggingpp into "external" directory - \#92 - Speed up CMake build process +- \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss ## Feature - \#115 - Using new structure for tasktable diff --git a/core/src/scheduler/Algorithm.cpp b/core/src/scheduler/Algorithm.cpp index b2156b3f97..fb1742e6e1 100644 --- a/core/src/scheduler/Algorithm.cpp +++ b/core/src/scheduler/Algorithm.cpp @@ -54,7 +54,7 @@ ShortestPath(const ResourcePtr& src, const ResourcePtr& dest, const ResourceMgrP auto cur_neighbours = cur_node->GetNeighbours(); for (auto& neighbour : cur_neighbours) { - auto neighbour_res = std::static_pointer_cast(neighbour.neighbour_node.lock()); + auto neighbour_res = std::static_pointer_cast(neighbour.neighbour_node); dis_matrix[name_id_map.at(res->name())][name_id_map.at(neighbour_res->name())] = neighbour.connection.transport_cost(); } diff --git a/core/src/scheduler/Scheduler.cpp b/core/src/scheduler/Scheduler.cpp index fef5cc1a95..cba847c25e 100644 --- a/core/src/scheduler/Scheduler.cpp +++ b/core/src/scheduler/Scheduler.cpp @@ -26,10 +26,8 @@ namespace milvus { namespace scheduler { -Scheduler::Scheduler(ResourceMgrWPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) { - if (auto mgr = res_mgr_.lock()) { - mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1)); - } +Scheduler::Scheduler(ResourceMgrPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) { + res_mgr_->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1)); event_register_.insert(std::make_pair(static_cast(EventType::START_UP), std::bind(&Scheduler::OnStartUp, this, std::placeholders::_1))); event_register_.insert(std::make_pair(static_cast(EventType::LOAD_COMPLETED), @@ -40,6 +38,10 @@ Scheduler::Scheduler(ResourceMgrWPtr res_mgr) : running_(false), res_mgr_(std::m std::bind(&Scheduler::OnFinishTask, this, std::placeholders::_1))); } +Scheduler::~Scheduler() { + res_mgr_ = nullptr; +} + void Scheduler::Start() { running_ = true; @@ -100,51 +102,45 @@ Scheduler::Process(const EventPtr& event) { void Scheduler::OnLoadCompleted(const EventPtr& event) { auto load_completed_event = std::static_pointer_cast(event); - if (auto resource = event->resource_.lock()) { - resource->WakeupExecutor(); - auto task_table_type = load_completed_event->task_table_item_->task->label()->Type(); - switch (task_table_type) { - case TaskLabelType::DEFAULT: { - Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event); - break; - } - case TaskLabelType::SPECIFIED_RESOURCE: { - Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event); - break; - } - case TaskLabelType::BROADCAST: { - if (resource->HasExecutor() == false) { - load_completed_event->task_table_item_->Move(); - } - Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource); - break; - } - default: { break; } + auto resource = event->resource_; + resource->WakeupExecutor(); + + auto task_table_type = load_completed_event->task_table_item_->task->label()->Type(); + switch (task_table_type) { + case TaskLabelType::DEFAULT: { + Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event); + break; } - resource->WakeupLoader(); + case TaskLabelType::SPECIFIED_RESOURCE: { + Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event); + break; + } + case TaskLabelType::BROADCAST: { + if (resource->HasExecutor() == false) { + load_completed_event->task_table_item_->Move(); + } + Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource); + break; + } + default: { break; } } + resource->WakeupLoader(); } void Scheduler::OnStartUp(const EventPtr& event) { - if (auto resource = event->resource_.lock()) { - resource->WakeupLoader(); - } + event->resource_->WakeupLoader(); } void Scheduler::OnFinishTask(const EventPtr& event) { - if (auto resource = event->resource_.lock()) { - resource->WakeupLoader(); - } + event->resource_->WakeupLoader(); } void Scheduler::OnTaskTableUpdated(const EventPtr& event) { - if (auto resource = event->resource_.lock()) { - resource->WakeupLoader(); - } + event->resource_->WakeupLoader(); } } // namespace scheduler diff --git a/core/src/scheduler/Scheduler.h b/core/src/scheduler/Scheduler.h index 8d9ea83794..9e3a864774 100644 --- a/core/src/scheduler/Scheduler.h +++ b/core/src/scheduler/Scheduler.h @@ -34,7 +34,9 @@ namespace scheduler { class Scheduler : public interface::dumpable { public: - explicit Scheduler(ResourceMgrWPtr res_mgr); + explicit Scheduler(ResourceMgrPtr res_mgr); + + ~Scheduler(); Scheduler(const Scheduler&) = delete; Scheduler(Scheduler&&) = delete; @@ -118,7 +120,7 @@ class Scheduler : public interface::dumpable { std::unordered_map> event_register_; - ResourceMgrWPtr res_mgr_; + ResourceMgrPtr res_mgr_; std::queue event_queue_; std::thread worker_thread_; std::mutex event_mutex_; diff --git a/core/src/scheduler/action/Action.h b/core/src/scheduler/action/Action.h index 51c788f82f..ff72910055 100644 --- a/core/src/scheduler/action/Action.h +++ b/core/src/scheduler/action/Action.h @@ -37,10 +37,11 @@ class Action { PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest); static void - DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, std::shared_ptr event); + DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, + std::shared_ptr event); static void - SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, + SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, std::shared_ptr event); }; diff --git a/core/src/scheduler/action/PushTaskToNeighbour.cpp b/core/src/scheduler/action/PushTaskToNeighbour.cpp index c64e81dcfa..6f74849eac 100644 --- a/core/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/core/src/scheduler/action/PushTaskToNeighbour.cpp @@ -30,7 +30,7 @@ std::vector get_neighbours(const ResourcePtr& self) { std::vector neighbours; for (auto& neighbour_node : self->GetNeighbours()) { - auto node = neighbour_node.neighbour_node.lock(); + auto node = neighbour_node.neighbour_node; if (not node) continue; @@ -46,7 +46,7 @@ std::vector> get_neighbours_with_connetion(const ResourcePtr& self) { std::vector> neighbours; for (auto& neighbour_node : self->GetNeighbours()) { - auto node = neighbour_node.neighbour_node.lock(); + auto node = neighbour_node.neighbour_node; if (not node) continue; @@ -102,7 +102,7 @@ Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) { } void -Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, +Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, std::shared_ptr event) { if (not resource->HasExecutor() && event->task_table_item_->Move()) { auto task = event->task_table_item_->task; @@ -114,11 +114,11 @@ Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, if (auto index_engine = search_task->index_engine_) { auto location = index_engine->GetLocation(); - for (auto i = 0; i < res_mgr.lock()->GetNumGpuResource(); ++i) { + for (auto i = 0; i < res_mgr->GetNumGpuResource(); ++i) { auto index = milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location); if (index != nullptr) { moved = true; - auto dest_resource = res_mgr.lock()->GetResource(ResourceType::GPU, i); + auto dest_resource = res_mgr->GetResource(ResourceType::GPU, i); PushTaskToResource(event->task_table_item_->task, dest_resource); break; } @@ -133,17 +133,17 @@ Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, } void -Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, +Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, std::shared_ptr event) { auto task = event->task_table_item_->task; if (resource->type() == ResourceType::DISK) { // step 1: calculate shortest path per resource, from disk to compute resource - auto compute_resources = res_mgr.lock()->GetComputeResources(); + auto compute_resources = res_mgr->GetComputeResources(); std::vector> paths; std::vector transport_costs; for (auto& res : compute_resources) { std::vector path; - uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path); + uint64_t transport_cost = ShortestPath(resource, res, res_mgr, path); transport_costs.push_back(transport_cost); paths.emplace_back(path); } @@ -187,10 +187,10 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu); bool find_gpu_res = false; - if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) { + if (res_mgr->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) { for (uint64_t i = 0; i < compute_resources.size(); ++i) { if (compute_resources[i]->name() == - res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) { + res_mgr->GetResource(ResourceType::GPU, build_index_gpu)->name()) { find_gpu_res = true; Path task_path(paths[i], paths[i].size() - 1); task->path() = task_path; @@ -208,7 +208,7 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource->WakeupExecutor(); } else { auto next_res_name = task->path().Next(); - auto next_res = res_mgr.lock()->GetResource(next_res_name); + auto next_res = res_mgr->GetResource(next_res_name); // if (event->task_table_item_->Move()) { // next_res->task_table().Put(task); // } diff --git a/core/src/scheduler/event/Event.h b/core/src/scheduler/event/Event.h index 5b1f37fb99..3c29e02225 100644 --- a/core/src/scheduler/event/Event.h +++ b/core/src/scheduler/event/Event.h @@ -30,7 +30,7 @@ class Resource; class Event { public: - explicit Event(EventType type, std::weak_ptr resource) : type_(type), resource_(std::move(resource)) { + explicit Event(EventType type, std::shared_ptr resource) : type_(type), resource_(std::move(resource)) { } inline EventType @@ -46,7 +46,7 @@ class Event { public: EventType type_; - std::weak_ptr resource_; + std::shared_ptr resource_; }; using EventPtr = std::shared_ptr; diff --git a/core/src/scheduler/event/FinishTaskEvent.h b/core/src/scheduler/event/FinishTaskEvent.h index 1b2d8f9818..afaf02de92 100644 --- a/core/src/scheduler/event/FinishTaskEvent.h +++ b/core/src/scheduler/event/FinishTaskEvent.h @@ -29,7 +29,7 @@ namespace scheduler { class FinishTaskEvent : public Event { public: - FinishTaskEvent(std::weak_ptr resource, TaskTableItemPtr task_table_item) + FinishTaskEvent(std::shared_ptr resource, TaskTableItemPtr task_table_item) : Event(EventType::FINISH_TASK, std::move(resource)), task_table_item_(std::move(task_table_item)) { } diff --git a/core/src/scheduler/event/LoadCompletedEvent.h b/core/src/scheduler/event/LoadCompletedEvent.h index 5a701e0dfc..0aa3bf79d6 100644 --- a/core/src/scheduler/event/LoadCompletedEvent.h +++ b/core/src/scheduler/event/LoadCompletedEvent.h @@ -29,7 +29,7 @@ namespace scheduler { class LoadCompletedEvent : public Event { public: - LoadCompletedEvent(std::weak_ptr resource, TaskTableItemPtr task_table_item) + LoadCompletedEvent(std::shared_ptr resource, TaskTableItemPtr task_table_item) : Event(EventType::LOAD_COMPLETED, std::move(resource)), task_table_item_(std::move(task_table_item)) { } diff --git a/core/src/scheduler/event/StartUpEvent.h b/core/src/scheduler/event/StartUpEvent.h index c4abb4e27c..2d8292ea70 100644 --- a/core/src/scheduler/event/StartUpEvent.h +++ b/core/src/scheduler/event/StartUpEvent.h @@ -28,7 +28,7 @@ namespace scheduler { class StartUpEvent : public Event { public: - explicit StartUpEvent(std::weak_ptr resource) : Event(EventType::START_UP, std::move(resource)) { + explicit StartUpEvent(std::shared_ptr resource) : Event(EventType::START_UP, std::move(resource)) { } inline std::string diff --git a/core/src/scheduler/event/TaskTableUpdatedEvent.h b/core/src/scheduler/event/TaskTableUpdatedEvent.h index ed64a42d89..9be27e69b6 100644 --- a/core/src/scheduler/event/TaskTableUpdatedEvent.h +++ b/core/src/scheduler/event/TaskTableUpdatedEvent.h @@ -28,7 +28,7 @@ namespace scheduler { class TaskTableUpdatedEvent : public Event { public: - explicit TaskTableUpdatedEvent(std::weak_ptr resource) + explicit TaskTableUpdatedEvent(std::shared_ptr resource) : Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) { } diff --git a/core/src/scheduler/resource/Node.cpp b/core/src/scheduler/resource/Node.cpp index dcf03a321c..bc0e559175 100644 --- a/core/src/scheduler/resource/Node.cpp +++ b/core/src/scheduler/resource/Node.cpp @@ -58,9 +58,7 @@ Node::Dump() const { void Node::AddNeighbour(const NeighbourNodePtr& neighbour_node, Connection& connection) { std::lock_guard lk(mutex_); - if (auto s = neighbour_node.lock()) { - neighbours_.emplace(std::make_pair(s->id_, Neighbour(neighbour_node, connection))); - } + neighbours_.emplace(std::make_pair(neighbour_node->id_, Neighbour(neighbour_node, connection))); // else do nothing, consider it.. } diff --git a/core/src/scheduler/resource/Node.h b/core/src/scheduler/resource/Node.h index 4539c8c86a..53323fe6e2 100644 --- a/core/src/scheduler/resource/Node.h +++ b/core/src/scheduler/resource/Node.h @@ -31,10 +31,14 @@ namespace scheduler { class Node; -using NeighbourNodePtr = std::weak_ptr; +using NeighbourNodePtr = std::shared_ptr; struct Neighbour { - Neighbour(NeighbourNodePtr nei, Connection conn) : neighbour_node(nei), connection(conn) { + Neighbour(NeighbourNodePtr nei, Connection conn) : neighbour_node(std::move(nei)), connection(std::move(conn)) { + } + + ~Neighbour() { + neighbour_node = nullptr; } NeighbourNodePtr neighbour_node; diff --git a/core/unittest/scheduler/test_event.cpp b/core/unittest/scheduler/test_event.cpp index 07d51e8557..cf627a5d79 100644 --- a/core/unittest/scheduler/test_event.cpp +++ b/core/unittest/scheduler/test_event.cpp @@ -28,7 +28,7 @@ namespace milvus { namespace scheduler { TEST(EventTest, START_UP_EVENT) { - ResourceWPtr res(ResourcePtr(nullptr)); + ResourcePtr res(nullptr); auto event = std::make_shared(res); ASSERT_FALSE(event->Dump().empty()); std::cout << *event; @@ -36,7 +36,7 @@ TEST(EventTest, START_UP_EVENT) { } TEST(EventTest, LOAD_COMPLETED_EVENT) { - ResourceWPtr res(ResourcePtr(nullptr)); + ResourcePtr res(nullptr); auto event = std::make_shared(res, nullptr); ASSERT_FALSE(event->Dump().empty()); std::cout << *event; @@ -44,7 +44,7 @@ TEST(EventTest, LOAD_COMPLETED_EVENT) { } TEST(EventTest, FINISH_TASK_EVENT) { - ResourceWPtr res(ResourcePtr(nullptr)); + ResourcePtr res(nullptr); auto event = std::make_shared(res, nullptr); ASSERT_FALSE(event->Dump().empty()); std::cout << *event; @@ -53,7 +53,7 @@ TEST(EventTest, FINISH_TASK_EVENT) { TEST(EventTest, TASKTABLE_UPDATED_EVENT) { - ResourceWPtr res(ResourcePtr(nullptr)); + ResourcePtr res(nullptr); auto event = std::make_shared(res); ASSERT_FALSE(event->Dump().empty()); std::cout << *event; diff --git a/core/unittest/scheduler/test_node.cpp b/core/unittest/scheduler/test_node.cpp index 9b34b73191..d2c93971ac 100644 --- a/core/unittest/scheduler/test_node.cpp +++ b/core/unittest/scheduler/test_node.cpp @@ -15,15 +15,14 @@ // specific language governing permissions and limitations // under the License. - -#include "scheduler/resource/Node.h" #include +#include "scheduler/resource/Node.h" namespace { namespace ms = milvus::scheduler; -} // namespace +} // namespace class NodeTest : public ::testing::Test { protected: @@ -73,9 +72,11 @@ TEST_F(NodeTest, GET_NEIGHBOURS) { bool n2 = false, n3 = false; auto node1_neighbours = node1_->GetNeighbours(); ASSERT_EQ(node1_neighbours.size(), 2); - for (auto &n : node1_neighbours) { - if (n.neighbour_node.lock() == node2_) n2 = true; - if (n.neighbour_node.lock() == node3_) n3 = true; + for (auto& n : node1_neighbours) { + if (n.neighbour_node == node2_) + n2 = true; + if (n.neighbour_node == node3_) + n3 = true; } ASSERT_TRUE(n2); ASSERT_TRUE(n3); @@ -84,7 +85,7 @@ TEST_F(NodeTest, GET_NEIGHBOURS) { { auto node2_neighbours = node2_->GetNeighbours(); ASSERT_EQ(node2_neighbours.size(), 1); - ASSERT_EQ(node2_neighbours[0].neighbour_node.lock(), node1_); + ASSERT_EQ(node2_neighbours[0].neighbour_node, node1_); } { @@ -100,4 +101,3 @@ TEST_F(NodeTest, DUMP) { std::cout << node2_->Dump(); ASSERT_FALSE(node2_->Dump().empty()); } -