diff --git a/CHANGELOGS.md b/CHANGELOGS.md index c8ee3b39fd..a0c7ed62c5 100644 --- a/CHANGELOGS.md +++ b/CHANGELOGS.md @@ -19,3 +19,4 @@ Please mark all change in change log and use the ticket from JIRA. - MS-202 - Add Milvus Jenkins project email notification - MS-215 - Add Milvus cluster CI/CD groovy file - MS-277 - Update CUDA Version to V10.1 +- MS-336 - Scheduler interface diff --git a/cpp/src/scheduler/CacheMgr.h b/cpp/src/scheduler/CacheMgr.h new file mode 100644 index 0000000000..321cbbb8a4 --- /dev/null +++ b/cpp/src/scheduler/CacheMgr.h @@ -0,0 +1,24 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +// dummy cache_mgr +class CacheMgr { + +}; + +using CacheMgrPtr = std::shared_ptr; + +} +} +} diff --git a/cpp/src/scheduler/Cost.h b/cpp/src/scheduler/Cost.h new file mode 100644 index 0000000000..dc52eacaa3 --- /dev/null +++ b/cpp/src/scheduler/Cost.h @@ -0,0 +1,46 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include "Task.h" +#include "CacheMgr.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +// TODO: Policy interface +// TODO: collect statistics + +/* + * select tasks to move; + * call from scheduler; + */ +std::vector +PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) {} + + +/* + * select task to load + * call from resource; + * I DONT SURE NEED THIS; + */ +std::vector +PickToLoad(TaskTable task_table, uint64_t limit) {} + +/* + * select task to execute; + * call from resource; + * I DONT SURE NEED THIS; + */ +std::vector +PickToExecute(TaskTable task_table, uint64_t limit) {} + + +} +} +} diff --git a/cpp/src/scheduler/ResourceFactory.h b/cpp/src/scheduler/ResourceFactory.h new file mode 100644 index 0000000000..f327522975 --- /dev/null +++ b/cpp/src/scheduler/ResourceFactory.h @@ -0,0 +1,41 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include + +#include "resource/Resource.h" +#include "resource/CpuResource.h" +#include "resource/GpuResource.h" +#include "resource/DiskResource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class ResourceFactory { +public: + static std::shared_ptr + Create(const std::string &name, const std::string &alias = "") { + if (name == "disk") { + return std::make_shared(alias); + } else if (name == "cpu") { + return std::make_shared(alias); + } else if (name == "gpu") { + return std::make_shared(alias); + } else { + return nullptr; + } + } +}; + + +} +} +} + diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h new file mode 100644 index 0000000000..e1cc9110bf --- /dev/null +++ b/cpp/src/scheduler/ResourceMgr.h @@ -0,0 +1,141 @@ + +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +class ResourceMgr { +public: + ResourceMgr() : running_(false) {} + + /******** Management Interface ********/ + + /* + * Add resource into Resource Management; + * Generate functions on events; + * Functions only modify bool variable, like event trigger; + */ + ResourceWPtr + Add(ResourcePtr &&resource) { + ResourceWPtr ret(resource); + resources_.emplace_back(resource); + +// resource->RegisterOnStartUp([] { +// start_up_event_[index] = true; +// }); +// resource.RegisterOnFinishTask([] { +// finish_task_event_[index] = true; +// }); + return ret; + } + + /* + * Create connection between A and B; + */ + void + Connect(ResourceWPtr &A, ResourceWPtr &B, Connection &connection) { + if (auto observe_a = A.lock()) { + if (auto observe_b = B.lock()) { + observe_a->AddNeighbour(std::static_pointer_cast(observe_b), connection); + } + } + } + + /* + * Synchronous start all resource; + * Last, start event process thread; + */ + void + StartAll() { + for (auto &resource : resources_) { + resource->Start(); + } + worker_thread_ = std::thread(&ResourceMgr::EventProcess, this); + } + + // TODO: add stats interface(low) + +public: + /******** Event Register Interface ********/ + + /* + * Register on start up event; + */ + void + RegisterOnStartUp(std::function &func) { + on_start_up_ = func; + } + + /* + * Register on finish one task event; + */ + void + RegisterOnFinishTask(std::function &func) { + on_finish_task_ = func; + } + + /* + * Register on copy task data completed event; + */ + void + RegisterOnCopyCompleted(std::function &func); + + /* + * Register on task table updated event; + */ + void + RegisterOnTaskTableUpdated(std::function &func); + +public: + /******** Utlitity Functions ********/ + + std::string + Dump(); + +private: + void + EventProcess() { + while (running_) { + for (uint64_t i = 0; i < resources_.size(); ++i) { + if (start_up_event_[i]) { + on_start_up_(resources_[i]); + } + } + } + + } + +private: + bool running_; + + std::vector resources_; + std::thread worker_thread_; + + std::vector start_up_event_; + std::vector finish_task_event_; + std::vector copy_completed_event_; + std::vector task_table_updated_event_; + + std::function on_start_up_; + std::function on_finish_task_; + std::function on_copy_completed_; + std::function on_task_table_updated_; +}; + +using ResourceMgrWPtr = std::weak_ptr; + +} +} +} + diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h new file mode 100644 index 0000000000..4bb840a68d --- /dev/null +++ b/cpp/src/scheduler/Scheduler.h @@ -0,0 +1,155 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include +#include +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +class Event { +public: + explicit + Event(ResourceWPtr &resource) + : resource_(resource) {} + +public: + virtual void + Process() = 0; + +private: + ResourceWPtr resource_; +}; + +using EventPtr = std::shared_ptr; + +class StartUpEvent : public Event { +public: + explicit + StartUpEvent(ResourceWPtr &resource) + : Event(resource) {} + +public: + void + Process() override; +}; + +class FinishTaskEvent : public Event { +public: + explicit + FinishTaskEvent(ResourceWPtr &resource) + : Event(resource) {} + +public: + void + Process() override { +// for (nei : res->neighbours) { +// tasks = cost(nei->task_table(), nei->connection, limit = 3) +// res->task_table()->PutTasks(tasks); +// } +// res->WakeUpExec(); + } +}; + +class CopyCompletedEvent : public Event { +public: + explicit + CopyCompletedEvent(ResourceWPtr &resource) + : Event(resource) {} + +public: + void + Process() override; +}; + +class TaskTableUpdatedEvent : public Event { +public: + explicit + TaskTableUpdatedEvent(ResourceWPtr &resource) + : Event(resource) {} + +public: + void + Process() override; +}; + +class Scheduler { +public: + explicit + Scheduler(ResourceMgrWPtr res_mgr) + : running_(false), + res_mgr_(std::move(res_mgr)) { +// res_mgr.Register(); +// res_mgr.Register(); +// res_mgr.Register(); +// res_mgr.Register(); + } + + void + Start() {} + + /******** Events ********/ + + /* + * Process start up events; + */ + void + OnStartUp(ResourceWPtr &resource) { + // call from res_mgr, non-blocking, if queue size over limit, exception! + auto event = std::make_shared(resource); + event_queue_.push(event); + } + + /* + * Process finish task events; + */ + void + OnFinishTask(ResourceWPtr); + + /* + * Process copy completed events; + */ + void + OnCopyCompleted(ResourceWPtr); + + /* + * Process task table updated events; + */ + void + OnTaskTableUpdated(ResourceWPtr); + + +public: + std::string + Dump(); + + +private: + void + worker_function() { + while (running_) { + auto event = event_queue_.front(); + event->Process(); + } + } + +private: + bool running_; + + ResourceMgrWPtr res_mgr_; + std::queue event_queue_; + std::thread worker_thread_; +}; + +} +} +} + diff --git a/cpp/src/scheduler/Task.h b/cpp/src/scheduler/Task.h new file mode 100644 index 0000000000..e7f33049e5 --- /dev/null +++ b/cpp/src/scheduler/Task.h @@ -0,0 +1,29 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +// dummy task +class Task { +public: + Task(const std::string &name) {} + + void + Execute() {} +}; + +using TaskPtr = std::shared_ptr; + +} +} +} diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h new file mode 100644 index 0000000000..567aa6214d --- /dev/null +++ b/cpp/src/scheduler/TaskTable.h @@ -0,0 +1,164 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include +#include + +#include "Task.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +enum class TaskTableItemState { + INVALID, + START, // idle + LOADING, // loading data from other resource + LOADED, // ready to exec or move + EXECUTING, // executing, locking util executed or failed + EXECUTED, // executed, termination state + MOVING, // moving to another resource, locking util executed or failed + MOVED, // moved, termination state +}; + +struct TaskTableItem { + TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex(), priority(0) {} + + TaskTableItem(const TaskTableItem &src) + : id(src.id), state(src.state), mutex(), priority(src.priority) {} + + uint64_t id; // auto increment from 0; + // TODO: add tag into task + TaskPtr task; // the task; + TaskTableItemState state; // the state; + std::mutex mutex; + + uint8_t priority; // just a number, meaningless; +}; + +class TaskTable { +public: + TaskTable() = default; + + explicit + TaskTable(std::vector &&tasks) {} + + /* + * Put one task; + */ + void + Put(TaskPtr task) {} + + /* + * Put tasks back of task table; + * Called by DBImpl; + */ + void + Put(std::vector &tasks) {} + + /* + * Return task table item reference; + */ + TaskTableItem & + Get(uint64_t index) {} + + /* + * TODO + * Remove sequence task which is DONE or MOVED from front; + * Called by ? + */ + void + Clear() { + // find first task is NOT (done or moved), erase from begin to it; +// auto iterator = table_.begin(); +// while (iterator->state == TaskTableItemState::EXECUTED or +// iterator->state == TaskTableItemState::MOVED) +// iterator++; +// table_.erase(table_.begin(), iterator); + } + + +public: + + /******** Action ********/ + /* + * Move a task; + * Set state moving; + * Called by scheduler; + */ + + // TODO: bool to Status + bool + Move(uint64_t index) { + auto &task = table_[index]; + + std::lock_guard lock(task.mutex); + if (task.state == TaskTableItemState::START) { + task.state = TaskTableItemState::LOADING; + return true; + } + return false; + } + + /* + * Move task finished; + * Set state moved; + * Called by scheduler; + */ + bool + Moved(uint64_t index) {} + + /* + * Load a task; + * Set state loading; + * Called by loader; + */ + bool + Load(uint64_t index) {} + + /* + * Load task finished; + * Set state loaded; + * Called by loader; + */ + bool + Loaded(uint64_t index) {} + + /* + * Execute a task; + * Set state executing; + * Called by executor; + */ + bool + Execute(uint64_t index) {} + + /* + * Execute task finished; + * Set state executed; + * Called by executor; + */ + bool + Executed(uint64_t index) {} + +public: + /* + * Dump; + */ + std::string + Dump(); + +private: + // TODO: map better ? + std::deque table_; +}; + + +} +} +} diff --git a/cpp/src/scheduler/resource/Connection.h b/cpp/src/scheduler/resource/Connection.h new file mode 100644 index 0000000000..0f1088e7fe --- /dev/null +++ b/cpp/src/scheduler/resource/Connection.h @@ -0,0 +1,47 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +class Connection { +public: + Connection(std::string name, double speed) + : name_(std::move(name)), speed_(speed) {} + + const std::string & + get_name() const { + return name_; + } + + const double + get_speed() const { + return speed_; + } + +public: + std::string + Dump() const { + std::stringstream ss; + ss << ""; + return ss.str(); + } + +private: + std::string name_; + double speed_; +}; + + +} +} +} diff --git a/cpp/src/scheduler/resource/CpuResource.h b/cpp/src/scheduler/resource/CpuResource.h new file mode 100644 index 0000000000..995615d1ab --- /dev/null +++ b/cpp/src/scheduler/resource/CpuResource.h @@ -0,0 +1,47 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include + +#include "Resource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class CpuResource : public Resource { +public: + explicit + CpuResource(std::string name) + : Resource(std::move(name), ResourceType::CPU) {} + +protected: + void + LoadFile(TaskPtr task) override { +// if (src.type == DISK) { +// fd = open(filename); +// content = fd.read(); +// close(fd); +// } else if (src.type == CPU) { +// memcpy(src, dest, len); +// } else if (src.type == GPU) { +// cudaMemcpyD2H(src, dest); +// } else { +// // unknown type, exception +// } + } + + void + Process(TaskPtr task) override { + task->Execute(); + } +}; + +} +} +} diff --git a/cpp/src/scheduler/resource/DiskResource.h b/cpp/src/scheduler/resource/DiskResource.h new file mode 100644 index 0000000000..b4ff32e75e --- /dev/null +++ b/cpp/src/scheduler/resource/DiskResource.h @@ -0,0 +1,21 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + + +#include "Resource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class DiskResource : public Resource { +}; + +} +} +} diff --git a/cpp/src/scheduler/resource/GpuResource.h b/cpp/src/scheduler/resource/GpuResource.h new file mode 100644 index 0000000000..be91950b78 --- /dev/null +++ b/cpp/src/scheduler/resource/GpuResource.h @@ -0,0 +1,21 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + + +#include "Resource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class GpuResource : public Resource { +}; + +} +} +} diff --git a/cpp/src/scheduler/resource/Node.h b/cpp/src/scheduler/resource/Node.h new file mode 100644 index 0000000000..a32c14ec51 --- /dev/null +++ b/cpp/src/scheduler/resource/Node.h @@ -0,0 +1,55 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include + +#include "../TaskTable.h" +#include "Connection.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class Node; + +using NeighbourNodePtr = std::weak_ptr; + +struct Neighbour { + NeighbourNodePtr neighbour_node; + Connection connection; +}; + +class Node { +public: + void + AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) { + Neighbour neighbour{.neighbour_node = neighbour_node, .connection = connection}; + neighbours_.push_back(neighbour); + } + + void + DelNeighbour(NeighbourNodePtr &neighbour_ptr); + + bool + IsNeighbour(NeighbourNodePtr &neighbour_ptr); + + std::vector + GetNeighbours(); + +public: + std::string + Dump(); + +private: + std::vector neighbours_; +}; + +} +} +} diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h new file mode 100644 index 0000000000..5b7bb13a80 --- /dev/null +++ b/cpp/src/scheduler/resource/Resource.h @@ -0,0 +1,229 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "../TaskTable.h" +#include "../Task.h" +#include "../Cost.h" +#include "Node.h" +#include "Connection.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +enum class ResourceType { + DISK = 0, + CPU = 1, + GPU = 2 +}; + +class Resource : public Node { +public: + void + Start() { + loader_thread_ = std::thread(&Resource::loader_function, this); + executor_thread_ = std::thread(&Resource::executor_function, this); + } + + void + Stop() { + running_ = false; + WakeupLoader(); + WakeupExecutor(); + } + + TaskTable & + task_table() { + return task_table_; + } + +public: + /* + * wake up executor; + */ + void + WakeupExecutor() { + exec_cv_.notify_one(); + } + + /* + * wake up loader; + */ + void + WakeupLoader() { + load_cv_.notify_one(); + } + +public: + /* + * Event function MUST be a short function, never blocking; + */ + + /* + * Register on start up event; + */ + void + RegisterOnStartUp(std::function func); + + /* + * Register on finish one task event; + */ + void + RegisterOnFinishTask(std::function func); + + /* + * Register on copy task data completed event; + */ + void + RegisterOnCopyCompleted(std::function func); + + /* + * Register on task table updated event; + */ + void + RegisterOnTaskTableUpdated(std::function func); + +protected: + Resource(std::string name, ResourceType type) + : name_(std::move(name)), + type_(type), + on_start_up_(nullptr), + on_finish_task_(nullptr), + on_copy_completed_(nullptr), + on_task_table_updated_(nullptr), + running_(false), + load_flag_(false), + exec_flag_(false) { + } + + // TODO: SearchContextPtr to TaskPtr + /* + * Implementation by inherit class; + * Blocking function; + */ + virtual void + LoadFile(TaskPtr task) = 0; + + /* + * Implementation by inherit class; + * Blocking function; + */ + virtual void + Process(TaskPtr task) = 0; + +private: + /* + * These function should move to cost.h ??? + * COST.H ??? + */ + + /* + * Pick one task to load; + * Order by start time; + */ + TaskPtr + pick_task_load() { + auto tasks = PickToLoad(task_table_, 3); + for (uint64_t i = 0; i < tasks.size(); ++i) { + // try to set one task loading, then return + if (task_table_.Load(i)) + return task_table_.Get(i).task; + // else try next + } + return nullptr; + } + + /* + * Pick one task to execute; + * Pick by start time and priority; + */ + TaskPtr + pick_task_execute() { + auto tasks = PickToExecute(task_table_, 3); + for (uint64_t i = 0; i < tasks.size(); ++i) { + // try to set one task executing, then return + if (task_table_.Execute(i)) + return task_table_.Get(i).task; + // else try next + } + return nullptr; + } + +private: + /* + * Only called by load thread; + */ + void + loader_function() { + while (running_) { + std::unique_lock lock(load_mutex_); + load_cv_.wait(lock, [&] { return load_flag_; }); + auto task = pick_task_load(); + if (task) { + LoadFile(task); + on_copy_completed_(); + } + } + + } + + /* + * Only called by worker thread; + */ + void + executor_function() { + on_start_up_(); + while (running_) { + std::unique_lock lock(exec_mutex_); + exec_cv_.wait(lock, [&] { return exec_flag_; }); + auto task = pick_task_execute(); + if (task) { + Process(task); + on_finish_task_(); + } + } + } + + +private: + std::string name_; + ResourceType type_; + + TaskTable task_table_; + + std::function on_start_up_; + std::function on_finish_task_; + std::function on_copy_completed_; + std::function on_task_table_updated_; + + bool running_; + std::thread loader_thread_; + std::thread executor_thread_; + + bool load_flag_; + bool exec_flag_; + std::mutex load_mutex_; + std::mutex exec_mutex_; + std::condition_variable load_cv_; + std::condition_variable exec_cv_; +}; + +using ResourcePtr = std::shared_ptr; +using ResourceWPtr = std::weak_ptr; + +} +} +} + diff --git a/cpp/unittest/CMakeLists.txt b/cpp/unittest/CMakeLists.txt index aa96fa6e25..287fe51128 100644 --- a/cpp/unittest/CMakeLists.txt +++ b/cpp/unittest/CMakeLists.txt @@ -42,4 +42,5 @@ add_subdirectory(server) add_subdirectory(db) add_subdirectory(knowhere) add_subdirectory(metrics) +add_subdirectory(scheduler) #add_subdirectory(storage) \ No newline at end of file diff --git a/cpp/unittest/scheduler/CMakeLists.txt b/cpp/unittest/scheduler/CMakeLists.txt new file mode 100644 index 0000000000..d62d80f969 --- /dev/null +++ b/cpp/unittest/scheduler/CMakeLists.txt @@ -0,0 +1,36 @@ +#------------------------------------------------------------------------------- +# Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +# Unauthorized copying of this file, via any medium is strictly prohibited. +# Proprietary and confidential. +#------------------------------------------------------------------------------- +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs) +aux_source_directory(./ test_srcs) + +include_directories(/usr/local/cuda/include) +link_directories("/usr/local/cuda/lib64") + +include_directories(/usr/include/mysql) + +#add_definitions(-DBOOST_ERROR_CODE_HEADER_ONLY) + +set(scheduler_test_src + ${scheduler_resource_srcs} + ${scheduler_srcs} + ${test_srcs} + ) + +cuda_add_executable(scheduler_test ${scheduler_test_src}) + +set(scheduler_libs + sqlite + boost_system_static + boost_filesystem_static + lz4 + mysqlpp + ) + +target_link_libraries(scheduler_test ${scheduler_libs} ${unittest_libs}) + +install(TARGETS scheduler_test DESTINATION bin) + diff --git a/cpp/unittest/scheduler/normal_test.cpp b/cpp/unittest/scheduler/normal_test.cpp new file mode 100644 index 0000000000..7318efc9df --- /dev/null +++ b/cpp/unittest/scheduler/normal_test.cpp @@ -0,0 +1,39 @@ +#include "scheduler/ResourceFactory.h" +#include "scheduler/ResourceMgr.h" +#include "scheduler/Scheduler.h" +#include + + +using namespace zilliz::milvus::engine; + +int main() { + + // ResourceMgr only compose resources, provide unified event + auto res_mgr = std::make_shared(); + auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd")); + auto cpu = res_mgr->Add(ResourceFactory::Create("cpu")); + auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu")); + auto gpu2 = res_mgr->Add(ResourceFactory::Create("gpu")); + + auto IO = Connection("IO", 500.0); + auto PCIE = Connection("IO", 11000.0); + res_mgr->Connect(disk, cpu, IO); + res_mgr->Connect(cpu, gpu1, PCIE); + res_mgr->Connect(cpu, gpu2, PCIE); + + res_mgr->StartAll(); + + auto task1 = std::make_shared("123456789"); + auto task2 = std::make_shared("222222222"); + if (auto observe = disk.lock()) { + observe->task_table().Put(task1); + observe->task_table().Put(task2); + observe->task_table().Put(task1); + observe->task_table().Put(task1); + } + + auto scheduler = new Scheduler(res_mgr); + scheduler->Start(); + + while (true) sleep(1); +}