mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
MS-531 Disable next version code
Former-commit-id: d4148022dd1c10a8c9da0c2119e359cc37e4f53f
This commit is contained in:
parent
67c057e782
commit
57eabe260f
@ -108,6 +108,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- MS-527 - Update scheduler_test and enable it
|
||||
- MS-528 - Hide some config used future
|
||||
- MS-530 - Add unittest for SearchTask->Load
|
||||
- MS-531 - Disable next version code
|
||||
|
||||
## New Feature
|
||||
- MS-343 - Implement ResourceMgr
|
||||
|
||||
@ -145,51 +145,52 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
|
||||
break;
|
||||
}
|
||||
case TaskLabelType::SPECIFIED_RESOURCE: {
|
||||
auto self = event->resource_.lock();
|
||||
auto task = load_completed_event->task_table_item_->task;
|
||||
|
||||
// if this resource is disk, assign it to smallest cost resource
|
||||
if (self->type() == ResourceType::DISK) {
|
||||
// step 1: calculate shortest path per resource, from disk to compute resource
|
||||
auto compute_resources = res_mgr_.lock()->GetComputeResources();
|
||||
std::vector<std::vector<std::string>> paths;
|
||||
std::vector<uint64_t> transport_costs;
|
||||
for (auto &res : compute_resources) {
|
||||
std::vector<std::string> path;
|
||||
uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path);
|
||||
transport_costs.push_back(transport_cost);
|
||||
paths.emplace_back(path);
|
||||
}
|
||||
|
||||
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
|
||||
uint64_t min_cost = std::numeric_limits<uint64_t>::max();
|
||||
uint64_t min_cost_idx = 0;
|
||||
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
|
||||
if (compute_resources[i]->TotalTasks() == 0) {
|
||||
min_cost_idx = i;
|
||||
break;
|
||||
}
|
||||
uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec()
|
||||
+ transport_costs[i];
|
||||
if (min_cost > cost) {
|
||||
min_cost = cost;
|
||||
min_cost_idx = i;
|
||||
}
|
||||
}
|
||||
|
||||
// step 3: set path in task
|
||||
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
|
||||
task->path() = task_path;
|
||||
}
|
||||
|
||||
if (self->name() == task->path().Last()) {
|
||||
self->WakeupLoader();
|
||||
} else {
|
||||
auto next_res_name = task->path().Next();
|
||||
auto next_res = res_mgr_.lock()->GetResource(next_res_name);
|
||||
load_completed_event->task_table_item_->Move();
|
||||
next_res->task_table().Put(task);
|
||||
}
|
||||
// support next version
|
||||
// auto self = event->resource_.lock();
|
||||
// auto task = load_completed_event->task_table_item_->task;
|
||||
//
|
||||
// // if this resource is disk, assign it to smallest cost resource
|
||||
// if (self->type() == ResourceType::DISK) {
|
||||
// // step 1: calculate shortest path per resource, from disk to compute resource
|
||||
// auto compute_resources = res_mgr_.lock()->GetComputeResources();
|
||||
// std::vector<std::vector<std::string>> paths;
|
||||
// std::vector<uint64_t> transport_costs;
|
||||
// for (auto &res : compute_resources) {
|
||||
// std::vector<std::string> path;
|
||||
// uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path);
|
||||
// transport_costs.push_back(transport_cost);
|
||||
// paths.emplace_back(path);
|
||||
// }
|
||||
//
|
||||
// // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
|
||||
// uint64_t min_cost = std::numeric_limits<uint64_t>::max();
|
||||
// uint64_t min_cost_idx = 0;
|
||||
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
|
||||
// if (compute_resources[i]->TotalTasks() == 0) {
|
||||
// min_cost_idx = i;
|
||||
// break;
|
||||
// }
|
||||
// uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec()
|
||||
// + transport_costs[i];
|
||||
// if (min_cost > cost) {
|
||||
// min_cost = cost;
|
||||
// min_cost_idx = i;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // step 3: set path in task
|
||||
// Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
|
||||
// task->path() = task_path;
|
||||
// }
|
||||
//
|
||||
// if (self->name() == task->path().Last()) {
|
||||
// self->WakeupLoader();
|
||||
// } else {
|
||||
// auto next_res_name = task->path().Next();
|
||||
// auto next_res = res_mgr_.lock()->GetResource(next_res_name);
|
||||
// load_completed_event->task_table_item_->Move();
|
||||
// next_res->task_table().Put(task);
|
||||
// }
|
||||
break;
|
||||
}
|
||||
case TaskLabelType::BROADCAST: {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user