mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
MS-422 Support DeleteTask in Multi-GpuResource case
Former-commit-id: eea952afdf004ba46b9a55b58818ece35ecc9179
This commit is contained in:
parent
5bbc291bb4
commit
b725f8872f
@ -58,6 +58,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- MS-418 - Update server_config.template file, set CPU compute only default
|
||||
- MS-419 - Move index_file_size from IndexParam to TableSchema
|
||||
- MS-421 - Add TaskLabel in scheduler
|
||||
- MS-422 - Support DeleteTask in Multi-GpuResource case
|
||||
|
||||
## New Feature
|
||||
- MS-343 - Implement ResourceMgr
|
||||
|
||||
@ -54,7 +54,7 @@ ResourceMgr::Connect(const std::string &name1, const std::string &name2, Connect
|
||||
auto res2 = get_resource_by_name(name2);
|
||||
if (res1 && res2) {
|
||||
res1->AddNeighbour(std::static_pointer_cast<Node>(res2), connection);
|
||||
res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
|
||||
// res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -108,12 +108,25 @@ Scheduler::OnFinishTask(const EventPtr &event) {
|
||||
|
||||
void
|
||||
Scheduler::OnCopyCompleted(const EventPtr &event) {
|
||||
auto load_completed_event = std::static_pointer_cast<CopyCompletedEvent>(event);
|
||||
if (auto resource = event->resource_.lock()) {
|
||||
resource->WakeupExecutor();
|
||||
if (resource->Type() == ResourceType::DISK) {
|
||||
Action::PushTaskToNeighbour(event->resource_);
|
||||
} else {
|
||||
Action::PushTaskToNeighbourHasExecutor(event->resource_);
|
||||
|
||||
auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
|
||||
switch (task_table_type) {
|
||||
case TaskLabelType::DEFAULT: {
|
||||
if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) {
|
||||
Action::PushTaskToNeighbourRandomly(load_completed_event->task_table_item_->task, resource);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TaskLabelType::BROADCAST: {
|
||||
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -14,23 +14,11 @@ namespace engine {
|
||||
|
||||
class Action {
|
||||
public:
|
||||
/*
|
||||
* Push task to neighbour;
|
||||
*/
|
||||
static void
|
||||
PushTaskToNeighbour(const ResourceWPtr &self);
|
||||
PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self);
|
||||
|
||||
/*
|
||||
* Push task to neighbour that has executor;
|
||||
*/
|
||||
static void
|
||||
PushTaskToNeighbourHasExecutor(const ResourceWPtr &self);
|
||||
|
||||
/*
|
||||
* Pull task From neighbour;
|
||||
*/
|
||||
static void
|
||||
PullTaskFromNeighbour(const ResourceWPtr &self);
|
||||
PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self);
|
||||
};
|
||||
|
||||
|
||||
|
||||
@ -1,24 +0,0 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include "Action.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
void
|
||||
Action::PullTaskFromNeighbour(const ResourceWPtr &self) {
|
||||
// TODO: implement
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -13,54 +13,9 @@ namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
void
|
||||
next(std::list<ResourcePtr> &neighbours, std::list<ResourcePtr>::iterator &it) {
|
||||
it++;
|
||||
if (neighbours.end() == it) {
|
||||
it = neighbours.begin();
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: this function called with only on tasks, so it will always push task to first neighbour
|
||||
void
|
||||
push_task_round_robin(TaskTable &self_task_table, std::list<ResourcePtr> &neighbours) {
|
||||
CacheMgr cache;
|
||||
auto it = neighbours.begin();
|
||||
if (it == neighbours.end()) return;
|
||||
auto indexes = PickToMove(self_task_table, cache, self_task_table.Size());
|
||||
|
||||
for (auto index : indexes) {
|
||||
if (self_task_table.Move(index)) {
|
||||
auto task = self_task_table.Get(index)->task;
|
||||
// task = task->Clone();
|
||||
(*it)->task_table().Put(task);
|
||||
next(neighbours, it);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
push_task_randomly(TaskTable &self_task_table, std::vector<ResourcePtr> &neighbours) {
|
||||
std::random_device rd;
|
||||
std::mt19937 mt(rd());
|
||||
std::uniform_int_distribution<uint64_t> dist(0, neighbours.size() - 1);
|
||||
CacheMgr cache;
|
||||
|
||||
auto indexes = PickToMove(self_task_table, cache, self_task_table.Size());
|
||||
for (auto index : indexes) {
|
||||
if (self_task_table.Move(index)) {
|
||||
auto task = self_task_table.Get(index)->task;
|
||||
neighbours[dist(mt)]->task_table().Put(task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Action::PushTaskToNeighbour(const ResourceWPtr &res) {
|
||||
auto self = res.lock();
|
||||
if (not self) return;
|
||||
|
||||
std::list<ResourcePtr> neighbours;
|
||||
std::vector<ResourcePtr>
|
||||
get_neighbours(const ResourcePtr &self) {
|
||||
std::vector<ResourcePtr> neighbours;
|
||||
for (auto &neighbour_node : self->GetNeighbours()) {
|
||||
auto node = neighbour_node.neighbour_node.lock();
|
||||
if (not node) continue;
|
||||
@ -68,30 +23,27 @@ Action::PushTaskToNeighbour(const ResourceWPtr &res) {
|
||||
auto resource = std::static_pointer_cast<Resource>(node);
|
||||
neighbours.emplace_back(resource);
|
||||
}
|
||||
return neighbours;
|
||||
}
|
||||
|
||||
push_task_round_robin(self->task_table(), neighbours);
|
||||
|
||||
void
|
||||
Action::PushTaskToNeighbourRandomly(const TaskPtr &task,
|
||||
const ResourcePtr &self) {
|
||||
auto neighbours = get_neighbours(self);
|
||||
std::random_device rd;
|
||||
std::mt19937 mt(rd());
|
||||
std::uniform_int_distribution<uint64_t> dist(0, neighbours.size() - 1);
|
||||
|
||||
neighbours[dist(mt)]->task_table().Put(task);
|
||||
}
|
||||
|
||||
void
|
||||
Action::PushTaskToNeighbourHasExecutor(const ResourceWPtr &res) {
|
||||
auto self = res.lock();
|
||||
if (not self) return;
|
||||
|
||||
std::list<ResourcePtr> l_neighbours;
|
||||
std::vector<ResourcePtr> v_neighbours;
|
||||
for (auto &neighbour_node : self->GetNeighbours()) {
|
||||
auto node = neighbour_node.neighbour_node.lock();
|
||||
if (not node) continue;
|
||||
|
||||
auto resource = std::static_pointer_cast<Resource>(node);
|
||||
if (resource->HasExecutor()) {
|
||||
l_neighbours.push_back(resource);
|
||||
v_neighbours.push_back(resource);
|
||||
}
|
||||
Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) {
|
||||
auto neighbours = get_neighbours(self);
|
||||
for (auto &neighbour : neighbours) {
|
||||
neighbour->task_table().Put(task);
|
||||
}
|
||||
|
||||
// push_task_round_robin(self->task_table(), l_neighbours);
|
||||
push_task_randomly(self->task_table(), v_neighbours);
|
||||
}
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user