mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-01 00:15:30 +08:00
Merge branch 'branch-0.4.0' into 'branch-0.4.0'
MS-357 Add minimum schedule function See merge request megasearch/milvus!367 Former-commit-id: 981542c442629b4428e31ade75b4bb6a1a9e5de1
This commit is contained in:
commit
8cc143239f
@ -16,6 +16,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- MS-350 - Remove knowhere submodule
|
||||
- MS-354 - Add task class and interface in scheduler
|
||||
- MS-355 - Add copy interface in ExcutionEngine
|
||||
- MS-357 - Add minimum schedule function
|
||||
|
||||
## New Feature
|
||||
- MS-343 - Implement ResourceMgr
|
||||
|
||||
@ -12,7 +12,7 @@ namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
std::vector<uint64_t>
|
||||
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) {
|
||||
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit) {
|
||||
std::vector<uint64_t> indexes;
|
||||
return indexes;
|
||||
}
|
||||
|
||||
@ -23,7 +23,7 @@ namespace engine {
|
||||
* call from scheduler;
|
||||
*/
|
||||
std::vector<uint64_t>
|
||||
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit);
|
||||
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit);
|
||||
|
||||
|
||||
/*
|
||||
|
||||
@ -5,6 +5,7 @@
|
||||
******************************************************************************/
|
||||
|
||||
#include "Scheduler.h"
|
||||
#include "Cost.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
@ -12,33 +13,55 @@ namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
void
|
||||
StartUpEvent::Process() {
|
||||
push_task(ResourcePtr &self, ResourcePtr &other) {
|
||||
auto self_task_table = self->task_table();
|
||||
auto other_task_table = other->task_table();
|
||||
if (!other_task_table.Empty()) {
|
||||
CacheMgr cache;
|
||||
auto indexes = PickToMove(self_task_table, cache, 1);
|
||||
for (auto index : indexes) {
|
||||
if (self_task_table.Move(index)) {
|
||||
auto task = self_task_table.Get(index).task;
|
||||
other_task_table.Put(task);
|
||||
// TODO: mark moved future
|
||||
other->WakeupLoader();
|
||||
other->WakeupExecutor();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
schedule(const ResourceWPtr &res) {
|
||||
if (auto self = res.lock()) {
|
||||
for (auto &nei : self->GetNeighbours()) {
|
||||
if (auto n = nei.neighbour_node.lock()) {
|
||||
auto neighbour = std::static_pointer_cast<Resource>(n);
|
||||
push_task(self, neighbour);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
StartUpEvent::Process() {
|
||||
schedule(resource_);
|
||||
}
|
||||
|
||||
void
|
||||
FinishTaskEvent::Process() {
|
||||
// for (nei : res->neighbours) {
|
||||
// tasks = cost(nei->task_table(), nei->connection, limit = 3)
|
||||
// res->task_table()->PutTasks(tasks);
|
||||
// }
|
||||
// res->WakeUpExec();
|
||||
schedule(resource_);
|
||||
}
|
||||
|
||||
void
|
||||
CopyCompletedEvent::Process() {
|
||||
|
||||
schedule(resource_);
|
||||
}
|
||||
|
||||
void
|
||||
TaskTableUpdatedEvent::Process() {
|
||||
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
Scheduler::Start() {
|
||||
worker_thread_ = std::thread(&Scheduler::worker_thread_, this);
|
||||
schedule(resource_);
|
||||
}
|
||||
|
||||
std::string
|
||||
@ -46,14 +69,6 @@ Scheduler::Dump() {
|
||||
return std::string();
|
||||
}
|
||||
|
||||
void
|
||||
Scheduler::worker_function() {
|
||||
while (running_) {
|
||||
auto event = event_queue_.front();
|
||||
event->Process();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -27,7 +27,7 @@ public:
|
||||
virtual void
|
||||
Process() = 0;
|
||||
|
||||
private:
|
||||
protected:
|
||||
ResourceWPtr resource_;
|
||||
};
|
||||
|
||||
@ -86,7 +86,9 @@ public:
|
||||
}
|
||||
|
||||
void
|
||||
Start();
|
||||
Start() {
|
||||
worker_thread_ = std::thread(&Scheduler::worker_thread_, this);
|
||||
}
|
||||
|
||||
public:
|
||||
/******** Events ********/
|
||||
@ -138,7 +140,12 @@ private:
|
||||
* Called by worker_thread_;
|
||||
*/
|
||||
void
|
||||
worker_function();
|
||||
worker_function() {
|
||||
while (running_) {
|
||||
auto event = event_queue_.front();
|
||||
event->Process();
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
bool running_;
|
||||
|
||||
@ -75,7 +75,22 @@ public:
|
||||
*/
|
||||
void
|
||||
Clear();
|
||||
|
||||
|
||||
/*
|
||||
* Return true if task table empty, otherwise false;
|
||||
*/
|
||||
inline bool
|
||||
Empty() {
|
||||
return table_.empty();
|
||||
}
|
||||
|
||||
/*
|
||||
* Return size of task table;
|
||||
*/
|
||||
inline size_t
|
||||
Size() {
|
||||
return table_.size();
|
||||
}
|
||||
|
||||
public:
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user